# Source code for cowidev.jhu.process

import pandas as pd
import numpy as np
import os
from datetime import datetime

from cowidev.megafile.steps.test import get_testing
from cowidev.jhu.load import (
    load_population,
    load_eu_country_names,
    LOCATIONS_BY_CONTINENT,
    LOCATIONS_BY_WB_INCOME_GROUP,
)


ZERO_DAY = "2020-01-21"
zero_day = datetime.strptime(ZERO_DAY, "%Y-%m-%d")

LARGE_DATA_CORRECTIONS = [
    ("Australia", "2022-04-01", "deaths"),
    ("Austria", "2022-04-21", "deaths"),
    ("Austria", "2022-04-22", "deaths"),
    ("Brazil", "2021-09-18", "cases"),
    ("Chile", "2020-07-17", "deaths"),
    ("Chile", "2022-03-21", "deaths"),
    ("China", "2020-04-17", "deaths"),
    ("Denmark", "2021-12-21", "deaths"),
    ("Ecuador", "2020-09-07", "deaths"),
    ("Ecuador", "2021-07-20", "deaths"),
    ("Finland", "2022-03-07", "deaths"),
    ("Iceland", "2022-05-17", "deaths"),
    ("India", "2021-06-10", "deaths"),
    ("Mexico", "2020-10-05", "deaths"),
    ("Mexico", "2021-06-01", "deaths"),
    ("Moldova", "2021-12-31", "deaths"),
    ("Norway", "2022-03-17", "deaths"),
    ("Oman", "2022-06-16", "deaths"),
    ("South Africa", "2021-11-23", "cases"),
    ("South Africa", "2022-01-06", "deaths"),
    ("Spain", "2020-06-19", "deaths"),
    ("Turkey", "2020-12-10", "cases"),
    ("United Kingdom", "2022-01-31", "cases"),
    ("United Kingdom", "2022-02-01", "deaths"),
    ("United Kingdom", "2022-04-06", "deaths"),
    ("United Kingdom", "2022-08-22", "deaths"),
    ("Vietnam", "2022-08-04", "cases"),
    ("Vietnam", "2022-08-06", "cases"),
]
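
# Each tuple above is (location, date, metric); discard_rows() at the bottom of
# this module sets the matching new_cases / new_deaths observation to NA so that
# known one-off reporting corrections don't appear as artificial daily spikes.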

# ==============
# Data injection
# ==============
def standardize_data(df):
    df = (
        df[["date", "location", "new_cases", "new_deaths", "total_cases", "total_deaths"]]
        .pipe(discard_rows)
        .pipe(inject_owid_aggregates)
        .pipe(inject_weekly_growth)
        .pipe(inject_biweekly_growth)
        .pipe(inject_doubling_days)
        .pipe(
            inject_per_million,
            [
                "new_cases",
                "new_deaths",
                "total_cases",
                "total_deaths",
                "weekly_cases",
                "weekly_deaths",
                "biweekly_cases",
                "biweekly_deaths",
            ],
        )
        .pipe(inject_rolling_avg)
        .pipe(inject_cfr)
        .pipe(inject_days_since)
        .pipe(inject_exemplars)
        .sort_values(by=["location", "date"])
    )
    return df

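# A minimal usage sketch (the input frame here is hypothetical; the real caller
# passes the full JHU frame, and the pipeline pulls population, EU membership
# and testing locations from the cowidev loaders imported above):
#
#   >>> raw = pd.DataFrame({
#   ...     "date": ["2020-03-01", "2020-03-02"],
#   ...     "location": ["Testland", "Testland"],
#   ...     "new_cases": [10, 20],
#   ...     "new_deaths": [0, 1],
#   ...     "total_cases": [10, 30],
#   ...     "total_deaths": [0, 1],
#   ... })
#   >>> standardized = standardize_data(raw)  # adds aggregates, growth, per-million, CFR, ...
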
# Useful for adding it to regions.csv
def inject_population(df):
    return df.merge(load_population(), how="left", on="location")

def drop_population(df):
    return df.drop(columns=["population_year", "population"])

def inject_per_million(df, measures):
    df = inject_population(df)
    for measure in measures:
        pop_measure = measure + "_per_million"
        series = df[measure] / (df["population"] / 1e6)
        df[pop_measure] = series.round(decimals=3)
    return drop_population(df)

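# Worked example of the per-million arithmetic: a location with population
# 6,000,000 that reports new_cases = 1200 gets
# new_cases_per_million = 1200 / (6_000_000 / 1e6) = 1200 / 6 = 200.0
# (rounded to 3 decimals).
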
# ===================================
# OWID continents + custom aggregates
# ===================================

aggregates_spec = {
    "World": {"include": None, "exclude": None},
    "World excl. China": {"exclude": ["China"]},
    "World excl. China and South Korea": {"exclude": ["China", "South Korea"]},
    "World excl. China, South Korea, Japan and Singapore": {
        "exclude": ["China", "South Korea", "Japan", "Singapore"]
    },
    # European Union
    "European Union": {"include": load_eu_country_names()},
    # OWID continents
    **{
        continent: {"include": locations, "exclude": None}
        for continent, locations in LOCATIONS_BY_CONTINENT.items()
    },
    # Asia without China
    "Asia excl. China": {"include": list(set(LOCATIONS_BY_CONTINENT["Asia"]) - set(["China"]))},
    # World Bank income groups
    **{
        income_group: {"include": locations, "exclude": None}
        for income_group, locations in LOCATIONS_BY_WB_INCOME_GROUP.items()
    },
}

def _sum_aggregate(df, name, include=None, exclude=None):
    df = df.copy()
    if include:
        df = df[df["location"].isin(include)]
    if exclude:
        df = df[~df["location"].isin(exclude)]
    df = df.groupby("date").sum().reset_index()
    df["location"] = name
    return df

def inject_owid_aggregates(df):
    return pd.concat(
        [
            df,
            *[_sum_aggregate(df, name, **params) for name, params in aggregates_spec.items()],
        ],
        sort=True,
        ignore_index=True,
    )

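# A sketch of how a single aggregate is built (toy frame with hypothetical
# locations; exact column order depends on the pandas version):
#
#   >>> toy = pd.DataFrame({
#   ...     "date": ["2020-03-01", "2020-03-01"],
#   ...     "location": ["A", "B"],
#   ...     "new_cases": [3, 4],
#   ... })
#   >>> _sum_aggregate(toy, "A only", include=["A"])["new_cases"].item()
#   3
#   >>> _sum_aggregate(toy, "World")["new_cases"].item()
#   7
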
# =======================
# Total/daily calculation
# =======================

def inject_total_daily_cols(df, measures):
    # Must sort in order to have the cumsum() and diff() in the right direction
    df = df.copy().sort_values(by=["location", "date"])
    for measure in measures:
        total_col = "total_%s" % measure
        daily_col = "new_%s" % measure
        if total_col not in df.columns and daily_col in df.columns:
            df[total_col] = df.groupby("location")[daily_col].cumsum().astype("Int64")
        elif daily_col not in df.columns and total_col in df.columns:
            df[daily_col] = df.groupby("location")[total_col].diff().astype("Int64")
    return df

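# Example of the total/daily round trip: for a single location sorted by date,
# new_cases [5, 10, 20] yields total_cases [5, 15, 35] via cumsum(), while
# total_cases [100, 130, 130] yields new_cases [<NA>, 30, 0] via diff()
# (the first diff of each location is always missing).
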
# ======================
# 'Days since' variables
# ======================

days_since_spec = {
    "days_since_100_total_cases": {
        "value_col": "total_cases",
        "value_threshold": 100,
        "positive_only": False,
    },
    "days_since_5_total_deaths": {
        "value_col": "total_deaths",
        "value_threshold": 5,
        "positive_only": False,
    },
    "days_since_1_total_cases_per_million": {
        "value_col": "total_cases_per_million",
        "value_threshold": 1,
        "positive_only": False,
    },
    "days_since_0_1_total_deaths_per_million": {
        "value_col": "total_deaths_per_million",
        "value_threshold": 0.1,
        "positive_only": False,
    },
}

def _get_date_of_threshold(df, col, threshold):
    try:
        return df["date"][df[col] >= threshold].iloc[0]
    except IndexError:
        # The threshold was never reached for this location
        return None

def _date_diff(a, b, positive_only=False):
    if pd.isnull(a) or pd.isnull(b):
        return None
    diff = (a - b).days
    if positive_only and diff < 0:
        return None
    return diff

def _days_since(df, spec):
    ref_date = pd.to_datetime(_get_date_of_threshold(df, spec["value_col"], spec["value_threshold"]))
    return (
        pd.to_datetime(df["date"])
        .map(lambda date: _date_diff(date, ref_date, spec["positive_only"]))
        .astype("Int64")
    )

def inject_days_since(df):
    df = df.copy()
    for col, spec in days_since_spec.items():
        df[col] = (
            df[["date", "location", spec["value_col"]]]
            .groupby("location")
            .apply(lambda df_group: _days_since(df_group, spec))
            .reset_index(level=0, drop=True)
        )
    return df

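# Example: for days_since_100_total_cases, the first date on which a location's
# total_cases >= 100 becomes day 0; earlier dates get negative values and later
# dates positive ones (positive_only is False for every spec above). A location
# that never crosses the threshold gets <NA> throughout, because
# _get_date_of_threshold() returns None.
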
# ===================
# Case Fatality Ratio
# ===================

def _apply_row_cfr_100(row):
    if pd.notnull(row["total_cases"]) and row["total_cases"] >= 100:
        return row["cfr"]
    return pd.NA

def inject_cfr(df):
    cfr_series = (df["total_deaths"] / df["total_cases"]) * 100
    df["cfr"] = cfr_series.round(decimals=3)
    df["cfr_100_cases"] = df.apply(_apply_row_cfr_100, axis=1)
    return df

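# Worked example: total_deaths = 50 and total_cases = 1000 gives
# cfr = (50 / 1000) * 100 = 5.0 (%). Since total_cases >= 100, cfr_100_cases is
# also 5.0; with fewer than 100 cases it would be <NA>, which keeps the noisy
# early-outbreak ratios out of the dedicated column.
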
# ================
# Rolling averages
# ================

rolling_avg_spec = {
    "new_cases_7_day_avg_right": {
        "col": "new_cases",
        "window": 7,
        "min_periods": 6,
        "center": False,
    },
    "new_deaths_7_day_avg_right": {
        "col": "new_deaths",
        "window": 7,
        "min_periods": 6,
        "center": False,
    },
    "new_cases_per_million_7_day_avg_right": {
        "col": "new_cases_per_million",
        "window": 7,
        "min_periods": 6,
        "center": False,
    },
    "new_deaths_per_million_7_day_avg_right": {
        "col": "new_deaths_per_million",
        "window": 7,
        "min_periods": 6,
        "center": False,
    },
}

def inject_rolling_avg(df):
    df = df.copy().sort_values(by="date")
    for col, spec in rolling_avg_spec.items():
        df[col] = df[spec["col"]].astype("float")
        df[col] = (
            df.groupby("location")[col]
            .rolling(
                window=spec["window"],
                min_periods=spec["min_periods"],
                center=spec["center"],
            )
            .mean()
            .round(decimals=3)
            .reset_index(level=0, drop=True)
        )
    return df

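# Example: with window=7 and min_periods=6, the right-aligned average is
# reported as soon as 6 of the trailing 7 daily values are present, so a single
# missing day does not blank the series; two or more missing days in the window
# do.
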
# ===========================
# Variables to find exemplars
# ===========================

def inject_exemplars(df):
    df = inject_population(df)

    # Inject days since 100th case IF population ≥ 5M
    def mapper_days_since(row):
        if pd.notnull(row["population"]) and row["population"] >= 5e6:
            return row["days_since_100_total_cases"]
        return pd.NA

    df["days_since_100_total_cases_and_5m_pop"] = df.apply(mapper_days_since, axis=1)

    # Inject boolean when all exemplar conditions hold
    # Use int because the Grapher doesn't handle non-ints very well
    countries_with_testing_data = set(get_testing()["location"])

    def mapper_bool(row):
        if (
            pd.notnull(row["days_since_100_total_cases"])
            and pd.notnull(row["population"])
            and row["days_since_100_total_cases"] >= 21
            and row["population"] >= 5e6
            and row["location"] in countries_with_testing_data
        ):
            return 1
        return 0

    df["5m_pop_and_21_days_since_100_cases_and_testing"] = df.apply(mapper_bool, axis=1)
    return drop_population(df)

# =========================
# Doubling days calculation
# =========================

doubling_days_spec = {
    "doubling_days_total_cases_3_day_period": {
        "value_col": "total_cases",
        "periods": 3,
    },
    "doubling_days_total_cases_7_day_period": {
        "value_col": "total_cases",
        "periods": 7,
    },
    "doubling_days_total_deaths_3_day_period": {
        "value_col": "total_deaths",
        "periods": 3,
    },
    "doubling_days_total_deaths_7_day_period": {
        "value_col": "total_deaths",
        "periods": 7,
    },
}

def pct_change_to_doubling_days(pct_change, periods):
    if pd.notnull(pct_change) and pct_change != 0:
        doubling_days = periods * np.log(2) / np.log(1 + pct_change)
        return np.round(doubling_days, decimals=2)
    return pd.NA

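# Worked example: if total_cases exactly doubled over the window, pct_change is
# 1.0 and doubling_days = periods * ln(2) / ln(2) = periods. For a 26% rise
# over a 3-day window: 3 * ln(2) / ln(1.26) ≈ 9.0, i.e. roughly nine days to
# double at that rate.
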
def inject_doubling_days(df):
    for col, spec in doubling_days_spec.items():
        value_col = spec["value_col"]
        periods = spec["periods"]
        # A count of 0 cannot double: mask it so pct_change() yields NA
        df.loc[df[value_col] == 0, value_col] = np.nan
        df[col] = (
            df.groupby("location", as_index=False)[[value_col]]
            .pct_change(periods=periods, fill_method=None)[value_col]
            .map(lambda pct: pct_change_to_doubling_days(pct, periods))
        )
    return df

# ====================================
# Weekly & biweekly growth calculation
# ====================================

def _inject_growth(df, prefix, periods):
    cases_colname = "%s_cases" % prefix
    deaths_colname = "%s_deaths" % prefix
    cases_growth_colname = "%s_pct_growth_cases" % prefix
    deaths_growth_colname = "%s_pct_growth_deaths" % prefix

    df[[cases_colname, deaths_colname]] = (
        df[["location", "new_cases", "new_deaths"]]
        .groupby("location")[["new_cases", "new_deaths"]]
        .rolling(window=periods, min_periods=periods - 1, center=False)
        .sum()
        .reset_index(level=0, drop=True)
    )

    df[[cases_growth_colname, deaths_growth_colname]] = (
        df[["location", cases_colname, deaths_colname]]
        .groupby("location")[[cases_colname, deaths_colname]]
        .pct_change(periods=periods, fill_method=None)
        .round(3)
        .replace([np.inf, -np.inf], pd.NA)
        * 100
    )

    df.loc[df.new_cases.isnull(), cases_colname] = np.nan
    df.loc[df.new_deaths.isnull(), deaths_colname] = np.nan

    return df

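# Example: with prefix="weekly" and periods=7, weekly_cases is the trailing
# 7-day sum of new_cases (reported once at least 6 of the 7 days are known),
# and weekly_pct_growth_cases compares that sum with the one ending 7 days
# earlier: 140 cases this week against 70 the week before gives 100.0 (%).
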
def inject_weekly_growth(df):
    return _inject_growth(df, "weekly", 7)

def inject_biweekly_growth(df):
    return _inject_growth(df, "biweekly", 14)

# ============
# Export logic
# ============

KEYS = ["date", "location"]

BASE_MEASURES = [
    "new_cases",
    "new_deaths",
    "total_cases",
    "total_deaths",
    "weekly_cases",
    "weekly_deaths",
    "biweekly_cases",
    "biweekly_deaths",
]

PER_MILLION_MEASURES = ["%s_per_million" % m for m in BASE_MEASURES]

DAYS_SINCE_MEASURES = list(days_since_spec.keys())

# Should keep these append-only in case someone external depends on the order
FULL_DATA_COLS = [*KEYS, *BASE_MEASURES]

GRAPHER_COL_NAMES = {
    "location": "Country",
    "date": "Year",
    # Absolute values
    "new_cases": "Daily new confirmed cases of COVID-19",
    "new_deaths": "Daily new confirmed deaths due to COVID-19",
    "total_cases": "Total confirmed cases of COVID-19",
    "total_deaths": "Total confirmed deaths due to COVID-19",
    # Per million
    "new_cases_per_million": "Daily new confirmed cases of COVID-19 per million people",
    "new_deaths_per_million": "Daily new confirmed deaths due to COVID-19 per million people",
    "total_cases_per_million": "Total confirmed cases of COVID-19 per million people",
    "total_deaths_per_million": "Total confirmed deaths due to COVID-19 per million people",
    # Days since
    "days_since_100_total_cases": "Days since the total confirmed cases of COVID-19 reached 100",
    "days_since_5_total_deaths": "Days since the total confirmed deaths of COVID-19 reached 5",
    "days_since_1_total_cases_per_million": (
        "Days since the total confirmed cases of COVID-19 per million people reached 1"
    ),
    "days_since_0_1_total_deaths_per_million": (
        "Days since the total confirmed deaths of COVID-19 per million people reached 0.1"
    ),
    # Rolling averages
    "new_cases_7_day_avg_right": "Daily new confirmed cases due to COVID-19 (rolling 7-day average, right-aligned)",
    "new_deaths_7_day_avg_right": "Daily new confirmed deaths due to COVID-19 (rolling 7-day average, right-aligned)",
    # Rolling averages - per million
    "new_cases_per_million_7_day_avg_right": (
        "Daily new confirmed cases of COVID-19 per million people (rolling 7-day average, right-aligned)"
    ),
    "new_deaths_per_million_7_day_avg_right": (
        "Daily new confirmed deaths due to COVID-19 per million people (rolling 7-day average, right-aligned)"
    ),
    # Case fatality ratio
    "cfr": "Case fatality rate of COVID-19 (%)",
    "cfr_100_cases": "Case fatality rate of COVID-19 (%) (Only observations with ≥100 cases)",
    # Exemplars variables
    "days_since_100_total_cases_and_5m_pop": (
        "Days since the total confirmed cases of COVID-19 reached 100 (with population ≥ 5M)"
    ),
    "5m_pop_and_21_days_since_100_cases_and_testing": (
        "Has population ≥ 5M AND had ≥100 cases ≥21 days ago AND has testing data"
    ),
    # Weekly aggregates
    "weekly_cases": "Weekly cases",
    "weekly_deaths": "Weekly deaths",
    "weekly_pct_growth_cases": "Weekly case growth (%)",
    "weekly_pct_growth_deaths": "Weekly death growth (%)",
    # Biweekly aggregates
    "biweekly_cases": "Biweekly cases",
    "biweekly_deaths": "Biweekly deaths",
    "biweekly_pct_growth_cases": "Biweekly case growth (%)",
    "biweekly_pct_growth_deaths": "Biweekly death growth (%)",
    # Weekly aggregates per capita
    "weekly_cases_per_million": "Weekly cases per million people",
    "weekly_deaths_per_million": "Weekly deaths per million people",
    # Biweekly aggregates per capita
    "biweekly_cases_per_million": "Biweekly cases per million people",
    "biweekly_deaths_per_million": "Biweekly deaths per million people",
}

def existsin(l1, l2):
    # Keep the elements of l1 that also appear in l2, preserving l1's order
    return [x for x in l1 if x in l2]

def standard_export(df, output_path, grapher_name):
    # Grapher
    df_grapher = df.copy()
    df_grapher["date"] = pd.to_datetime(df_grapher["date"]).map(lambda date: (date - zero_day).days)
    df_grapher = (
        df_grapher[GRAPHER_COL_NAMES.keys()]
        .rename(columns=GRAPHER_COL_NAMES)
        .to_csv(os.path.join(output_path, "%s.csv" % grapher_name), index=False)
    )

    # Table & public extracts for external users
    # Excludes aggregates
    excluded_aggregates = list(
        set(aggregates_spec.keys())
        - set(
            [
                "World",
                "North America",
                "South America",
                "Europe",
                "Africa",
                "Asia",
                "Oceania",
                "European Union",
                "High income",
                "Upper middle income",
                "Lower middle income",
                "Low income",
            ]
        )
    )
    df_table = df[~df["location"].isin(excluded_aggregates)]

    # full_data.csv
    full_data_cols = existsin(FULL_DATA_COLS, df_table.columns)
    df_table[full_data_cols].dropna(subset=BASE_MEASURES, how="all").to_csv(
        os.path.join(output_path, "full_data.csv"), index=False
    )

    # Pivot variables (wide format)
    for col_name in [*BASE_MEASURES, *PER_MILLION_MEASURES]:
        df_pivot = df_table.pivot(index="date", columns="location", values=col_name)
        # Move World to first column
        cols = df_pivot.columns.tolist()
        cols.insert(0, cols.pop(cols.index("World")))
        df_pivot[cols].to_csv(os.path.join(output_path, "%s.csv" % col_name))

    return True

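# For reference, a single standard_export() call writes, under output_path:
# "<grapher_name>.csv" (Grapher-shaped, dates encoded as days since ZERO_DAY),
# "full_data.csv" (long format, with only the whitelisted aggregates kept), and
# one wide date-by-location CSV per base and per-million measure, with the
# World column moved to the front.
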
# =====
# Other
# =====

def hide_recent_zeros(df: pd.DataFrame) -> pd.DataFrame:
    last_reported_date = df.date.max()

    last_positive_cases_date = df.loc[df.new_cases > 0, "date"].max()
    if pd.isnull(last_positive_cases_date):
        return df
    if last_positive_cases_date != last_reported_date:
        last_known_cases = df.loc[df.date == last_positive_cases_date, "new_cases"].item()
        if last_known_cases >= 100 and (last_reported_date - last_positive_cases_date).days < 7:
            df.loc[df.date > last_positive_cases_date, "new_cases"] = np.nan

    last_positive_deaths_date = df.loc[df.new_deaths > 0, "date"].max()
    if pd.isnull(last_positive_deaths_date):
        return df
    if last_positive_deaths_date != last_reported_date:
        last_known_deaths = df.loc[df.date == last_positive_deaths_date, "new_deaths"].item()
        if last_known_deaths >= 10 and (last_reported_date - last_positive_deaths_date).days < 7:
            df.loc[df.date > last_positive_deaths_date, "new_deaths"] = np.nan

    return df

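# A sketch of the trailing-zero masking (toy single-location frame; the date
# subtraction above assumes the column already holds datetimes):
#
#   >>> toy = pd.DataFrame({
#   ...     "date": pd.to_datetime(["2022-01-01", "2022-01-02", "2022-01-03"]),
#   ...     "new_cases": [500.0, 500.0, 0.0],
#   ...     "new_deaths": [0.0, 0.0, 0.0],
#   ... })
#   >>> hide_recent_zeros(toy)["new_cases"].tolist()
#   [500.0, 500.0, nan]
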
def discard_rows(df):
    # For all rows where new_cases or new_deaths is negative, we keep the cumulative value but set
    # the daily change to NA. This also sets the 7-day rolling average to NA for the next 7 days.
    df.loc[df.new_cases < 0, "new_cases"] = np.nan
    df.loc[df.new_deaths < 0, "new_deaths"] = np.nan

    # Custom data corrections
    for ldc in LARGE_DATA_CORRECTIONS:
        df.loc[(df.location == ldc[0]) & (df.date.astype(str) == ldc[1]), f"new_{ldc[2]}"] = np.nan

    # If the last known value is at least 100 cases or 10 deaths but the latest reported value is 0,
    # set that value to NA in case it's a temporary reporting error. (Up to 7 days in the past)
    df = df.sort_values(["location", "date"]).groupby("location").apply(hide_recent_zeros)
    return df