Source code for cowidev.cmd.vax.generate.utils

import os
import itertools
from datetime import datetime
from math import isnan
import glob
import json
from shutil import copyfile

import pandas as pd
from pandas.api.types import is_numeric_dtype

from cowidev import PATHS
from cowidev.utils.clean.dates import localdate
from cowidev.utils.utils import pd_series_diff_values
from cowidev.utils.clean import clean_date
from cowidev.utils.log import get_logger
from cowidev.vax.utils.checks import VACCINES_ACCEPTED


logger = get_logger()


class DatasetGenerator:
    def __init__(self, logger):
        self.logger = logger
        self.aggregates = build_aggregates()
        self._countries_covered = None

    @property
    def column_names_int(self):
        return [
            "total_vaccinations",
            "people_vaccinated",
            "people_fully_vaccinated",
            "total_boosters",
            "daily_vaccinations_raw",
            "daily_vaccinations",
            "daily_vaccinations_per_million",
            "new_vaccinations_smoothed",
            "new_vaccinations_smoothed_per_million",
            "new_vaccinations",
            "new_people_vaccinated_smoothed",
        ]

    def pipeline_automated(self, df: pd.DataFrame) -> pd.DataFrame:
        """Generate DataFrame for automated states."""
        return df.sort_values(by=["automated", "location"], ascending=[False, True])[
            ["location", "automated"]
        ].reset_index(drop=True)

    def pipeline_locations(
        self, df_vax: pd.DataFrame, df_metadata: pd.DataFrame, df_iso: pd.DataFrame
    ) -> pd.DataFrame:
        """Generate DataFrame for locations."""

        def _pretty_vaccine(vaccines):
            return ", ".join(sorted(v.strip() for v in vaccines.split(",")))

        df_vax = (
            df_vax.sort_values(by=["location", "date"])
            .drop_duplicates(subset=["location"], keep="last")
            .rename(
                columns={
                    "date": "last_observation_date",
                    "source_url": "source_website",
                }
            )
        )
        if len(df_metadata) != len(df_vax):
            loc_miss = pd_series_diff_values(df_metadata.location, df_vax.location)
            raise ValueError(f"Mismatch between vaccination data and metadata! Unknown location {loc_miss}.")
        return (
            df_vax.assign(vaccines=df_vax.vaccine.apply(_pretty_vaccine))  # Keep only last vaccine set
            .merge(df_metadata, on="location")
            .merge(df_iso, on="location")
        )[
            [
                "location",
                "iso_code",
                "vaccines",
                "last_observation_date",
                "source_name",
                "source_website",
            ]
        ]

    def pipe_daily_vaccinations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Get daily vaccinations."""
        logger.info("Adding daily metrics")
        df = df.sort_values(by=["location", "date"])
        df = df.assign(new_vaccinations=df.groupby("location").total_vaccinations.diff())
        # Mask diffs spanning more than one day, as these are not daily figures
        df.loc[df.date.diff().dt.days > 1, "new_vaccinations"] = None
        return df
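
    # Illustrative sketch (not part of the original module): behaviour on a single
    # location with a reporting gap, assuming `generator` is an already-constructed
    # DatasetGenerator instance.
    #
    #   >>> df = pd.DataFrame({
    #   ...     "location": ["A", "A", "A"],
    #   ...     "date": pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-04"]),
    #   ...     "total_vaccinations": [100, 150, 230],
    #   ... })
    #   >>> generator.pipe_daily_vaccinations(df).new_vaccinations.tolist()
    #   [nan, 50.0, nan]  # the value after the 2-day gap is masked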

    def _add_interpolate_base(self, df: pd.DataFrame, colname_cum: str, colname_diff: str) -> pd.DataFrame:
        dt_min = df.dropna(subset=[colname_cum]).date.min()
        dt_max = df.dropna(subset=[colname_cum]).date.max()
        df_nan = df[(df.date < dt_min) | (df.date > dt_max)]
        # Add missing dates within the observed range
        df = df.merge(
            pd.Series(pd.date_range(dt_min, dt_max), name="date"),
            how="right",
        ).sort_values(by="date")
        # Interpolate the cumulative series and derive daily values
        df[colname_diff] = (
            df[colname_cum]
            .interpolate(method="linear")
            .diff()
        )
        # Re-append the rows outside the observed range
        df = pd.concat([df, df_nan], ignore_index=True).sort_values("date")
        df = df.assign(location=df.location.dropna().iloc[0])
        return df
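
    # Illustrative sketch (not part of the original module): interpolation over a missing
    # date, assuming `generator` is an already-constructed DatasetGenerator instance.
    #
    #   >>> df = pd.DataFrame({
    #   ...     "location": ["A", "A"],
    #   ...     "date": pd.to_datetime(["2021-01-01", "2021-01-03"]),
    #   ...     "total_vaccinations": [100, 200],
    #   ... })
    #   >>> out = generator._add_interpolate_base(df, "total_vaccinations", "new_vaccinations_interpolated")
    #   >>> out.new_vaccinations_interpolated.tolist()
    #   [nan, 50.0, 50.0]  # 2021-01-02 is inserted and the cumulative series interpolated to 150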

    def _add_interpolate(self, df: pd.DataFrame):
        return df.pipe(self._add_interpolate_base, "total_vaccinations", "new_vaccinations_interpolated").pipe(
            self._add_interpolate_base, "people_vaccinated", "new_people_vaccinated_interpolated"
        )

    def pipe_interpolate(self, df: pd.DataFrame) -> pd.DataFrame:
        """Interpolate missing dates."""
        logger.info("Interpolating daily metrics")
        df = df.groupby("location").apply(self._add_interpolate).reset_index(drop=True)
        return df

    def _add_smoothed(self, df: pd.DataFrame) -> pd.DataFrame:
        # NEW VACCINATIONS
        df["new_vaccinations_smoothed"] = (
            df["new_vaccinations_interpolated"]
            .rolling(7, min_periods=1)
            .mean()
            .apply(lambda x: round(x) if not isnan(x) else x)
        )
        df.loc[df.new_vaccinations_interpolated.isna(), "new_vaccinations_smoothed"] = None
        # NEW PEOPLE VACCINATED
        df["new_people_vaccinated_smoothed"] = (
            df["new_people_vaccinated_interpolated"]
            .rolling(7, min_periods=1)
            .mean()
            .apply(lambda x: round(x) if not isnan(x) else x)
        )
        df.loc[df.new_people_vaccinated_interpolated.isna(), "new_people_vaccinated_smoothed"] = None
        return df
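
    # Illustrative sketch (not part of the original module): rolling(7, min_periods=1).mean()
    # averages over up to the last 7 rows, so the earliest rows use fewer points:
    #
    #   >>> pd.Series([10.0, 20.0, 30.0]).rolling(7, min_periods=1).mean().tolist()
    #   [10.0, 15.0, 20.0]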

    def pipe_smoothed(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Adding smoothed variables")
        df = df.groupby("location").apply(self._add_smoothed).reset_index(drop=True)
        return df

    def _get_aggregate(self, df, agg_name, included_locs, excluded_locs):
        # Take rows that matter
        agg = df[~df.location.isin(self.aggregates.keys())]  # remove aggregated rows
        if excluded_locs is not None:
            agg = agg[~agg.location.isin(excluded_locs)]
        elif included_locs is not None:
            agg = agg[agg.location.isin(included_locs)]
        # Get full location-date grid
        agg = (
            pd.DataFrame(
                itertools.product(agg.location.unique(), agg.date.unique()),
                columns=[agg.location.name, agg.date.name],
            )
            .merge(agg, on=["date", "location"], how="outer")
            .sort_values(by=["location", "date"])
        )
        # NaN handling: forward-fill; zero-fill if the whole metric is NaN
        cols = [
            "total_vaccinations",
            "people_vaccinated",
            "people_fully_vaccinated",
            "total_boosters",
            "new_vaccinations",
            "new_vaccinations_smoothed",
            "new_people_vaccinated_smoothed",
        ]
        grouper = agg.groupby("location")
        for col in cols:
            agg[col] = grouper[col].apply(lambda x: x.fillna(0) if x.isnull().all() else x.fillna(method="ffill"))
        # Aggregate
        agg = agg.groupby("date").sum().reset_index().assign(location=agg_name)
        agg = agg[agg.date.dt.date < datetime.now().date()]
        return agg

    def _get_aggregate_new(self, df, agg_name, included_locs, excluded_locs):
        # Take rows that matter
        agg = df[~df.location.isin(self.aggregates.keys())]  # remove aggregated rows
        if excluded_locs is not None:
            agg = agg[~agg.location.isin(excluded_locs)]
        elif included_locs is not None:
            agg = agg[agg.location.isin(included_locs)]
        # Get full location-date grid
        agg = (
            pd.DataFrame(
                itertools.product(agg.location.unique(), agg.date.unique()),
                columns=[agg.location.name, agg.date.name],
            )
            .merge(agg, on=["date", "location"], how="outer")
            .sort_values(by=["location", "date"])
        )
        # Cumulative metrics: forward-fill; zero-fill if the whole metric is NaN
        cols_ffill = [
            "total_vaccinations",
            "people_vaccinated",
            "people_fully_vaccinated",
            "total_boosters",
        ]
        # Daily metrics: zero-fill
        cols_0fill = [
            "new_vaccinations",
            "new_vaccinations_interpolated",
            "new_vaccinations_smoothed",
            "new_people_vaccinated_interpolated",
            "new_people_vaccinated_smoothed",
        ]
        grouper = agg.groupby("location")
        for col in cols_ffill:
            agg[col] = grouper[col].apply(lambda x: x.fillna(0) if x.isnull().all() else x.fillna(method="ffill"))
        for col in cols_0fill:
            agg[col] = grouper[col].apply(lambda x: x.fillna(0))
        # Use the interpolated series, otherwise the aggregate would be too much of an underestimate
        agg["new_vaccinations"] = agg["new_vaccinations_interpolated"].apply(lambda x: x if isnan(x) else round(x))
        # Aggregate
        agg = agg.groupby("date").sum().reset_index().assign(location=agg_name)
        # Blank the most recent week of daily metrics (recent reporting may be incomplete)
        mask = agg.date.dt.date > localdate(minus_days=7, as_datetime=True).date()
        columns = [
            "new_vaccinations",
            "new_vaccinations_interpolated",
            "new_vaccinations_smoothed",
            "new_people_vaccinated_interpolated",
            "new_people_vaccinated_smoothed",
        ]
        agg.loc[mask, columns] = None
        return agg
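
    # Illustrative sketch (not part of the original module): the per-location fill rule
    # applied to cumulative columns. An all-NaN series is zeroed so it does not block the
    # final groupby-sum; otherwise gaps are forward-filled:
    #
    #   >>> s = pd.Series([100.0, None, 130.0])
    #   >>> (s.fillna(0) if s.isnull().all() else s.fillna(method="ffill")).tolist()
    #   [100.0, 100.0, 130.0]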

    def pipe_aggregates(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info(f"Building aggregate regions {list(self.aggregates.keys())}")
        aggs = []
        for agg_name in self.aggregates:
            aggs.append(
                self._get_aggregate_new(
                    df=df,
                    agg_name=agg_name,
                    included_locs=self.aggregates[agg_name]["included_locs"],
                    excluded_locs=self.aggregates[agg_name]["excluded_locs"],
                )
            )
        df = pd.concat([df] + aggs, ignore_index=True)
        return df

    def get_population(self, df_subnational: pd.DataFrame) -> pd.DataFrame:
        # Build population dataframe
        column_rename = {"entity": "location", "population": "population"}
        pop = pd.read_csv(PATHS.INTERNAL_INPUT_UN_POPULATION_FILE, usecols=column_rename.keys()).rename(
            columns=column_rename
        )
        pop = pd.concat([pop, df_subnational], ignore_index=True)
        # The US population denominator is more complex to calculate, as the US CDC pulls
        # together data from multiple territories and federal agencies to build national figures.
        # To ensure that we use the correct territorial boundaries in the denominator, we use the
        # population figure provided by the CDC in its data.
        pop.loc[pop.location == "United States", "population"] = 332008832
        return pop

    def pipe_capita(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Adding per-capita variables")
        # Get data
        df_subnational = pd.read_csv(
            PATHS.INTERNAL_INPUT_OWID_POPULATION_SUB_FILE, usecols=["location", "population"]
        )
        pop = self.get_population(df_subnational)
        df = df.merge(pop, on="location", validate="many_to_one", how="left")
        if df.population.isna().any():
            missing_locs = df[df.population.isna()].location.unique()
            raise ValueError(f"Missing population data for {missing_locs}")
        # Get covered countries (exclude subnational entities and aggregate regions)
        locations = df.location.unique()
        ncountries = df_subnational.location.tolist() + list(self.aggregates.keys())
        self._countries_covered = [loc for loc in locations if loc not in ncountries]
        # Obtain per-capita metrics
        df = df.assign(
            total_vaccinations_per_hundred=(df.total_vaccinations * 100 / df.population).round(2),
            people_vaccinated_per_hundred=(df.people_vaccinated * 100 / df.population).round(2),
            people_fully_vaccinated_per_hundred=(df.people_fully_vaccinated * 100 / df.population).round(2),
            total_boosters_per_hundred=(df.total_boosters * 100 / df.population).round(2),
            new_vaccinations_smoothed_per_million=(df.new_vaccinations_smoothed * 1000000 / df.population).round(),
            new_people_vaccinated_smoothed_per_hundred=(df.new_people_vaccinated_smoothed * 100 / df.population).round(
                3
            ),
        )
        # Replace zeros with NA for these metrics and blank the corresponding per-capita values
        df.loc[:, "people_fully_vaccinated"] = df.people_fully_vaccinated.replace({0: pd.NA})
        df.loc[df.people_fully_vaccinated.isnull(), "people_fully_vaccinated_per_hundred"] = pd.NA
        df.loc[:, "total_boosters"] = df.total_boosters.replace({0: pd.NA})
        df.loc[df.total_boosters.isnull(), "total_boosters_per_hundred"] = pd.NA
        df["people_unvaccinated"] = df.population - df.people_vaccinated
        df.loc[df.people_unvaccinated < 0, "people_unvaccinated"] = 0
        return df.drop(columns=["population"])
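
    # Illustrative sketch (not part of the original module): the per-capita arithmetic,
    # with made-up numbers. For a location with population 1,000,000:
    #
    #   total_vaccinations = 250,000      -> total_vaccinations_per_hundred = 250000 * 100 / 1000000 = 25.0
    #   new_vaccinations_smoothed = 5,000 -> new_vaccinations_smoothed_per_million = 5000 * 1000000 / 1000000 = 5000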

    def pipe_vax_checks(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Sanity checks")
        # Config
        skip_countries = ["Pitcairn"]
        # Sanity checks
        df_to_check = df[~df.location.isin(skip_countries)]
        if not (df_to_check.total_vaccinations.dropna() >= 0).all():
            raise ValueError("Negative values found! Check values in `total_vaccinations`.")
        if not (df_to_check.new_vaccinations_smoothed.dropna() >= 0).all():
            raise ValueError("Negative values found! Check values in `new_vaccinations_smoothed`.")
        if not (msk := (x := df_to_check.new_vaccinations_smoothed_per_million.dropna()) <= 120000).all():
            example = df_to_check.loc[x[~msk].index, ["date", "location", "new_vaccinations_smoothed_per_million"]]
            raise ValueError(
                f"Huge values found! Check values in `new_vaccinations_smoothed_per_million`: \n{example}"
            )
        return df

    def pipe_to_int(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Converting INT columns to int")
        # Ensure nullable Int64 types for the count columns present in the frame
        cols = df.columns
        count_cols = [col for col in self.column_names_int if col in cols]
        df[count_cols] = df[count_cols].astype("Int64")
        return df

    def pipe_drop_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Removing columns")
        return df.drop(columns=["new_vaccinations_interpolated", "new_people_vaccinated_interpolated"])

    def pipeline_vaccinations(self, df: pd.DataFrame) -> pd.DataFrame:
        df = (
            df[
                [
                    "date",
                    "location",
                    "total_vaccinations",
                    "people_vaccinated",
                    "people_fully_vaccinated",
                    "total_boosters",
                ]
            ]
            .pipe(self.pipe_daily_vaccinations)
            .pipe(self.pipe_interpolate)
            .pipe(self.pipe_smoothed)
            .pipe(self.pipe_aggregates)
            .pipe(self.pipe_capita)
            .pipe(self.pipe_vax_checks)
            .pipe(self.pipe_to_int)
            .pipe(self.pipe_drop_columns)
            .sort_values(by=["location", "date"])
        )
        return df

    def pipe_vaccinations_csv(self, df: pd.DataFrame, df_iso: pd.DataFrame) -> pd.DataFrame:
        return df.merge(df_iso, on="location").rename(
            columns={
                "new_vaccinations_smoothed": "daily_vaccinations",
                "new_vaccinations_smoothed_per_million": "daily_vaccinations_per_million",
                "new_vaccinations": "daily_vaccinations_raw",
                "new_people_vaccinated_smoothed": "daily_people_vaccinated",
                "new_people_vaccinated_smoothed_per_hundred": "daily_people_vaccinated_per_hundred",
            }
        )[
            [
                "location",
                "iso_code",
                "date",
                "total_vaccinations",
                "people_vaccinated",
                "people_fully_vaccinated",
                "total_boosters",
                "daily_vaccinations_raw",
                "daily_vaccinations",
                "total_vaccinations_per_hundred",
                "people_vaccinated_per_hundred",
                "people_fully_vaccinated_per_hundred",
                "total_boosters_per_hundred",
                "daily_vaccinations_per_million",
                "daily_people_vaccinated",
                "daily_people_vaccinated_per_hundred",
            ]
        ]

    def pipe_vaccinations_json(self, df: pd.DataFrame) -> list:
        location_iso_codes = df[["location", "iso_code"]].drop_duplicates().values.tolist()
        metrics = [column for column in df.columns if column not in {"location", "iso_code"}]
        df = df.assign(date=df.date.apply(clean_date))
        # stack() drops NaN metrics, so each per-date record contains only the fields reported on that date
        return [
            {
                "country": location,
                "iso_code": iso_code,
                "data": [
                    {**x[i]}
                    for i, x in df.loc[(df.location == location) & (df.iso_code == iso_code), metrics]
                    .stack()
                    .groupby(level=0)
                ],
            }
            for location, iso_code in location_iso_codes
        ]
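
    # Illustrative sketch (not part of the original module): the structure returned, one
    # object per (location, iso_code) pair. Country names and values below are made up.
    #
    #   [
    #       {
    #           "country": "Albania",
    #           "iso_code": "ALB",
    #           "data": [
    #               {"date": "2021-01-13", "total_vaccinations": 60, ...},
    #               ...
    #           ],
    #       },
    #       ...
    #   ]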

    def pipe_manufacturer_select_cols(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[
            [
                "location",
                "date",
                "vaccine",
                "total_vaccinations",
            ]
        ].sort_values(["location", "date", "vaccine"])

    def pipe_manufacturer_add_eu(self, df: pd.DataFrame) -> pd.DataFrame:
        eu_countries = pd.read_csv(PATHS.INTERNAL_INPUT_OWID_EU_FILE, usecols=["Country"], squeeze=True).tolist()
        eu_manufacturer = (
            df[df.location.isin(eu_countries)]
            .pivot(index=["location", "vaccine"], columns="date", values="total_vaccinations")
            .reset_index()
            .melt(id_vars=["location", "vaccine"], var_name="date", value_name="total_vaccinations")
            .sort_values("date")
        )
        eu_manufacturer["total_vaccinations"] = (
            eu_manufacturer.groupby(["location", "vaccine"]).ffill().total_vaccinations
        )
        eu_manufacturer = (
            eu_manufacturer[eu_manufacturer.date.astype(str) >= "2020-12-27"]
            .groupby(["date", "vaccine"], as_index=False)
            .sum()
            .assign(location="European Union")
        )
        return pd.concat([df, eu_manufacturer])

    def pipe_manufacturer_filter_dates(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[df.date.astype(str) >= "2020-12-01"]

    def pipe_manufacturer_checks(self, df: pd.DataFrame) -> pd.DataFrame:
        vaccines_wrong = set(df.vaccine).difference(VACCINES_ACCEPTED)
        if vaccines_wrong:
            raise ValueError(f"Invalid vaccines found in manufacturer file! {vaccines_wrong}")
        return df

    def pipeline_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame:
        return (
            df.pipe(self.pipe_manufacturer_select_cols)
            .pipe(self.pipe_manufacturer_add_eu)
            .pipe(self.pipe_manufacturer_filter_dates)
            .pipe(self.pipe_manufacturer_checks)
            .pipe(self.pipe_to_int)
        )

    def pipe_age_checks(self, df: pd.DataFrame) -> pd.DataFrame:
        if df[["location", "date", "age_group_min"]].isnull().sum().sum() != 0:
            raise ValueError(
                "Unexpected NaN values found in one (or several) fields from `location`, `date`, `age_group_min`"
            )
        if not (
            is_numeric_dtype(df.people_vaccinated_per_hundred)
            and is_numeric_dtype(df.people_fully_vaccinated_per_hundred)
            and is_numeric_dtype(df.people_with_booster_per_hundred)
        ):
            raise TypeError("Metrics should be numeric! E.g., 50.23")
        return df

    def pipe_metrics_format(self, df: pd.DataFrame) -> pd.DataFrame:
        cols_metrics = [
            "people_vaccinated_per_hundred",
            "people_fully_vaccinated_per_hundred",
            "people_with_booster_per_hundred",
        ]
        df[cols_metrics] = df[cols_metrics].round(2)
        return df

    def pipe_age_group(self, df: pd.DataFrame) -> pd.DataFrame:
        # Build the age_group label, e.g. "25-49", or "80+" when there is no upper bound
        age_min = df.age_group_min.astype(str)
        age_max = df.age_group_max.astype("Int64").apply(lambda x: str(x) if not pd.isna(x) else "+")
        age_group = (age_min + "-" + age_max).replace(to_replace=r"-\+", value="+", regex=True)
        return df.assign(age_group=age_group)
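
    # Illustrative sketch (not part of the original module): label construction with
    # made-up bounds, assuming `generator` is an already-constructed DatasetGenerator.
    #
    #   >>> df = pd.DataFrame({"age_group_min": [25, 80], "age_group_max": [49, None]})
    #   >>> generator.pipe_age_group(df).age_group.tolist()
    #   ['25-49', '80+']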

    def pipe_age_output(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.dropna(
            subset=[
                "people_vaccinated_per_hundred",
                "people_fully_vaccinated_per_hundred",
                "people_with_booster_per_hundred",
            ],
            how="all",
        )[
            [
                "location",
                "date",
                "age_group",
                "people_vaccinated_per_hundred",
                "people_fully_vaccinated_per_hundred",
                "people_with_booster_per_hundred",
            ]
        ].sort_values(["location", "date", "age_group"])

    def pipeline_age(self, df: pd.DataFrame) -> pd.DataFrame:
        return (
            df.pipe(self.pipe_age_checks)
            .pipe(self.pipe_metrics_format)
            .pipe(self.pipe_age_group)
            .pipe(self.pipe_age_output)
        )

    def add_booster_share(self, df: pd.DataFrame) -> pd.DataFrame:
        shape_before = df.shape
        global_boosters = df[df.location == "World"][
            ["location", "date", "total_vaccinations", "total_boosters"]
        ].sort_values("date")
        global_boosters[["total_vaccinations", "total_boosters"]] = global_boosters[
            ["total_vaccinations", "total_boosters"]
        ].astype(float)
        # Share of boosters: daily boosters over daily total doses, smoothed over 14 days
        global_boosters["share_of_boosters"] = (
            (
                (global_boosters.total_boosters - global_boosters.total_boosters.shift(1))
                / (global_boosters.total_vaccinations - global_boosters.total_vaccinations.shift(1))
            )
            .rolling(14)
            .mean()
            .round(4)
        )
        global_boosters = global_boosters.drop(columns=["total_vaccinations", "total_boosters"])
        df = pd.merge(df, global_boosters, how="left", on=["location", "date"], validate="one_to_one")
        assert (
            df.shape[0] == shape_before[0] and df.shape[1] == shape_before[1] + 1
        ), "Adding share_of_boosters has changed the shape of the dataframe in an unintended way!"
        return df
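
    # Illustrative sketch (not part of the original module), with made-up numbers: a day
    # on which global boosters grew by 1.0M while total doses grew by 4.0M contributes
    # 1.0 / 4.0 = 0.25 to that 14-day window's mean share_of_boosters.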

    def pipe_grapher(
        self,
        df: pd.DataFrame,
        date_ref: datetime = datetime(2020, 1, 21),
        fillna: bool = False,
        fillna_0: bool = True,
    ) -> pd.DataFrame:
        # Grapher expects a `Year` column holding the number of days since `date_ref`
        df = (
            df.rename(
                columns={
                    "date": "Year",
                    "location": "Country",
                }
            ).assign(Year=(df.date - date_ref).dt.days)
        ).copy()
        columns_first = ["Country", "Year"]
        columns_rest = [col for col in df.columns if col not in columns_first]
        col_order = columns_first + columns_rest
        df = df[col_order].sort_values(["Country", "Year"])
        if fillna:
            filled = df.groupby(["Country"])[columns_rest].fillna(method="ffill")
            if fillna_0:
                df[columns_rest] = filled.fillna(0)
            else:
                df[columns_rest] = filled
        return df
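
    # Illustrative sketch (not part of the original module): the `Year` encoding is days
    # elapsed since date_ref. With the default reference of 2020-01-21 (a leap year):
    #
    #   >>> (pd.Timestamp("2021-01-21") - datetime(2020, 1, 21)).days
    #   366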

    def pipe_manufacturer_pivot(self, df: pd.DataFrame) -> pd.DataFrame:
        # Each (location, date, vaccine) combination must appear exactly once before pivoting
        x = df.groupby(["location", "date", "vaccine"]).count().sort_values("total_vaccinations")
        mask = x.total_vaccinations != 1
        if mask.sum() != 0:
            raise ValueError(f"Check entries {x[mask]}")
        return df.pivot(index=["location", "date"], columns="vaccine", values="total_vaccinations").reset_index()

    def pipeline_manufacturer_grapher(self, df: pd.DataFrame) -> pd.DataFrame:
        return (
            df.pipe(self.pipe_manufacturer_pivot)
            .pipe(self.pipe_grapher, date_ref=datetime(2021, 1, 1), fillna=True)
            .pipe(self.pipe_to_int)
        )

    def pipe_age_pivot(self, df: pd.DataFrame) -> pd.DataFrame:
        duplicates = df[df.duplicated(subset=["date", "location", "age_group"])]
        if len(duplicates) > 0:
            raise Exception("There are duplicate combinations of location-date-age_group in the age dataset!")
        df = df.pivot(
            index=["location", "date"],
            columns="age_group",
        ).reset_index()
        # Ensure column order
        columns = pd.MultiIndex.from_tuples(sorted(df.columns, key=lambda x: x[0] + x[1]))
        df = df[columns]
        columns_wrong_1 = df.people_vaccinated_per_hundred.columns.difference(
            df.people_fully_vaccinated_per_hundred.columns
        )
        columns_wrong_2 = df.people_fully_vaccinated_per_hundred.columns.difference(
            df.people_with_booster_per_hundred.columns
        )
        if columns_wrong_1.any() or columns_wrong_2.any():
            raise ValueError("There is a mismatch between the age groups across the per-hundred metrics")
        return df

    def pipe_age_partly(self, df: pd.DataFrame) -> pd.DataFrame:
        # Add partly vaccinated (at least one dose, but not fully vaccinated)
        y = (df["people_vaccinated_per_hundred"] - df["people_fully_vaccinated_per_hundred"]).round(2)
        cols = pd.MultiIndex.from_tuples([("people_partly_vaccinated_per_hundred", yy) for yy in y.columns])
        y.columns = cols
        df[cols] = y
        return df

    def pipe_age_flatten(self, df: pd.DataFrame) -> pd.DataFrame:
        # Flatten columns
        new_cols = []
        for col in df.columns:
            if col[0] == "people_vaccinated_per_hundred":
                new_cols.append(f"{col[1]}_start")
            elif col[0] == "people_fully_vaccinated_per_hundred":
                new_cols.append(f"{col[1]}_fully")
            elif col[0] == "people_partly_vaccinated_per_hundred":
                new_cols.append(f"{col[1]}_partly")
            elif col[0] == "people_with_booster_per_hundred":
                new_cols.append(f"{col[1]}_booster")
            else:
                new_cols.append(col[0])
        df.columns = new_cols
        return df
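
    # Illustrative sketch (not part of the original module): how the two-level columns
    # are flattened, for a hypothetical "25-49" age group:
    #
    #   ("people_vaccinated_per_hundred", "25-49")        -> "25-49_start"
    #   ("people_fully_vaccinated_per_hundred", "25-49")  -> "25-49_fully"
    #   ("people_partly_vaccinated_per_hundred", "25-49") -> "25-49_partly"
    #   ("people_with_booster_per_hundred", "25-49")      -> "25-49_booster"
    #   ("location", "")                                  -> "location"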

    def pipeline_age_grapher(self, df: pd.DataFrame) -> pd.DataFrame:
        return (
            df.pipe(self.pipe_age_pivot)
            .pipe(self.pipe_age_partly)
            .pipe(self.pipe_age_flatten)
            .pipe(
                self.pipe_grapher,
                date_ref=datetime(2021, 1, 1),
                fillna=True,
                fillna_0=False,
            )
        )

    def export(
        self,
        df_automated: pd.DataFrame,
        df_locations: pd.DataFrame,
        df_vaccinations: pd.DataFrame,
        df_manufacturer: pd.DataFrame,
        df_age: pd.DataFrame,
        json_vaccinations: dict,
        df_grapher: pd.DataFrame,
        df_manufacturer_grapher: pd.DataFrame,
        df_age_grapher: pd.DataFrame,
        # html_table: str,
    ):
        # Export
        files = [
            (df_automated, PATHS.INTERNAL_OUTPUT_VAX_AUTOM_FILE),
            (df_locations, PATHS.DATA_VAX_META_FILE),
            (df_vaccinations, PATHS.DATA_VAX_MAIN_FILE),
            (df_manufacturer, PATHS.DATA_VAX_MANUFACT_FILE),
            (df_age, PATHS.DATA_VAX_AGE_FILE),
            (json_vaccinations, PATHS.DATA_VAX_MAIN_JSON_FILE),
            (df_grapher, os.path.join(PATHS.INTERNAL_GRAPHER_DIR, "COVID-19 - Vaccinations.csv")),
            (
                df_manufacturer_grapher,
                os.path.join(
                    PATHS.INTERNAL_GRAPHER_DIR,
                    "COVID-19 - Vaccinations by manufacturer.csv",
                ),
            ),
            (
                df_age_grapher,
                os.path.join(
                    PATHS.INTERNAL_GRAPHER_DIR,
                    "COVID-19 - Vaccinations by age group.csv",
                ),
            ),
            # (html_table, PATHS.DATA_INTERNAL_VAX_TABLE),
        ]
        for obj, path in files:
            if path.endswith(".csv"):
                obj.to_csv(path, index=False)
            elif path.endswith(".json"):
                with open(path, "w") as f:
                    json.dump(obj, f, indent=2)
            elif path.endswith(".html"):
                with open(path, "w") as f:
                    f.write(obj)
            else:
                raise ValueError("Format not supported. Currently only csv, json and html are accepted!")

    def _cp_locations_files(self):
        copyfile(PATHS.INTERNAL_OUTPUT_VAX_META_MANUFACT_FILE, PATHS.DATA_VAX_META_MANUFACT_FILE)
        copyfile(PATHS.INTERNAL_OUTPUT_VAX_META_AGE_FILE, PATHS.DATA_VAX_META_AGE_FILE)

    def run(self):
        logger.info("-- Generating dataset... --")
        logger.info("1/10 Loading input data...")
        try:
            df_metadata = pd.read_csv(PATHS.INTERNAL_OUTPUT_VAX_META_FILE)
            df_vaccinations = pd.concat(
                [pd.read_csv(path, parse_dates=["date"]) for path in glob.glob(PATHS.DATA_VAX_COUNTRY_DIR + "/*.csv")]
            ).sort_values(by=["location", "date"])
        except FileNotFoundError:
            raise FileNotFoundError(
                "Internal files not found! Make sure to run the `process-data` step prior to running"
                " `generate-dataset`."
            )
        df_iso = pd.read_csv(PATHS.INTERNAL_INPUT_ISO_FILE)
        files_manufacturer = glob.glob(os.path.join(PATHS.INTERNAL_OUTPUT_VAX_MANUFACT_DIR, "*.csv"))
        df_manufacturer = pd.concat(
            (pd.read_csv(filepath, parse_dates=["date"]) for filepath in files_manufacturer),
            ignore_index=True,
        )
        files_age = glob.glob(os.path.join(PATHS.INTERNAL_OUTPUT_VAX_AGE_DIR, "*.csv"))
        df_age = pd.concat(
            (pd.read_csv(filepath, parse_dates=["date"]) for filepath in files_age),
            ignore_index=True,
        )
        # Metadata
        logger.info("2/10 Generating `automated_state` table...")
        df_automated = df_metadata.pipe(self.pipeline_automated)  # Export to AUTOMATED_STATE_FILE
        logger.info("3/10 Generating `locations` table...")
        df_locations = df_vaccinations.pipe(self.pipeline_locations, df_metadata, df_iso)  # Export to LOCATIONS_FILE
        # Vaccinations
        logger.info("4/10 Generating `vaccinations` table...")
        df_vaccinations_base = df_vaccinations.pipe(self.pipeline_vaccinations)
        df_vaccinations = df_vaccinations_base.pipe(self.pipe_vaccinations_csv, df_iso)
        logger.info("5/10 Generating `vaccinations` json...")
        json_vaccinations = df_vaccinations.pipe(self.pipe_vaccinations_json)
        # Manufacturer
        logger.info("6/10 Generating `manufacturer` table...")
        df_manufacturer = df_manufacturer.pipe(self.pipeline_manufacturer)
        # Age
        logger.info("7/10 Generating `age` table...")
        df_age = df_age.pipe(self.pipeline_age)
        # Grapher
        logger.info("8/10 Generating `grapher` tables...")
        df_grapher = df_vaccinations_base.pipe(self.add_booster_share).pipe(self.pipe_grapher)
        df_manufacturer_grapher = df_manufacturer.pipe(self.pipeline_manufacturer_grapher)
        df_age_grapher = df_age.pipe(self.pipeline_age_grapher)
        # df_age_grapher_fully = df_age.pipe(self.pipeline_age_grapher, "people_fully_vaccinated_per_hundred")
        # HTML (currently disabled)
        logger.info("9/10 Generating HTML...")
        # html_table = df_locations.pipe(self.pipe_locations_to_html)
        # Export
        logger.info("10/10 Exporting files...")
        self.export(
            df_automated=df_automated,
            df_locations=df_locations,
            df_vaccinations=df_vaccinations,
            df_manufacturer=df_manufacturer,
            df_age=df_age,
            json_vaccinations=json_vaccinations,
            df_grapher=df_grapher,
            df_manufacturer_grapher=df_manufacturer_grapher,
            df_age_grapher=df_age_grapher,
            # html_table=html_table,
        )
        self._cp_locations_files()
        # Timestamp
        timestamp_filename = PATHS.DATA_TIMESTAMP_VAX_FILE
        with open(timestamp_filename, "w") as timestamp_file:
            timestamp_file.write(datetime.utcnow().replace(microsecond=0).isoformat())


def build_aggregates():
    continent_countries = pd.read_csv(PATHS.INTERNAL_INPUT_OWID_CONT_FILE, usecols=["Entity", "Unnamed: 3"])
    eu_countries = pd.read_csv(PATHS.INTERNAL_INPUT_OWID_EU_FILE, usecols=["Country"], squeeze=True).tolist()
    income_groups = pd.concat(
        [
            pd.read_csv(PATHS.INTERNAL_INPUT_WB_INCOME_FILE, usecols=["Country", "Income group"]),
            pd.read_csv(PATHS.INTERNAL_INPUT_OWID_INCOME_FILE, usecols=["Country", "Income group"]),
        ],
        ignore_index=True,
    )
    aggregates = {
        "World": {
            "excluded_locs": ["England", "Northern Ireland", "Scotland", "Wales"],
            "included_locs": None,
        },
        "European Union": {
            "excluded_locs": None,
            "included_locs": eu_countries,
        },
        "World excl. China": {
            "excluded_locs": ["China"],
            "included_locs": None,
        },
    }
    for continent in [
        "Asia",
        "Africa",
        "Europe",
        "North America",
        "Oceania",
        "South America",
    ]:
        aggregates[continent] = {
            "excluded_locs": None,
            "included_locs": (
                continent_countries.loc[continent_countries["Unnamed: 3"] == continent, "Entity"].tolist()
            ),
        }
    for group in income_groups["Income group"].unique():
        aggregates[group] = {
            "excluded_locs": None,
            "included_locs": (income_groups.loc[income_groups["Income group"] == group, "Country"].tolist()),
        }
    return aggregates
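
# Illustrative sketch (not part of the original module): the mapping returned by
# build_aggregates(), keyed by aggregate name (member lists abridged):
#
#   {
#       "World": {"excluded_locs": ["England", ...], "included_locs": None},
#       "European Union": {"excluded_locs": None, "included_locs": [...]},
#       "World excl. China": {"excluded_locs": ["China"], "included_locs": None},
#       "Asia": {"excluded_locs": None, "included_locs": [...]},
#       ...
#   }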