Source code for cowidev.vax.batch.canada

import numpy as np
import pandas as pd

from cowidev.utils.utils import check_known_columns
from cowidev.utils.web import request_json
from cowidev.utils.web.download import read_csv_from_url
from cowidev.vax.utils.base import CountryVaxBase
from cowidev.vax.utils.checks import validate_vaccines
from cowidev.vax.utils.utils import build_vaccine_timeline


class Canada(CountryVaxBase):
    """Vaccination data pipeline for Canada.

    Three inputs are read:

    * a JSON report feed (``source_url``) with daily ``change_*`` figures,
      used to build the main time series;
    * a CSV broken down by age and sex (``source_url_a``);
    * a CSV broken down by vaccine product and dose number (``source_url_m``).

    ``export()`` reads all three, transforms them and passes the results to
    ``export_datafile`` (inherited; defined outside this file).
    """

    location: str = "Canada"
    source_name: str = "Public Health Agency of Canada"
    # Endpoints actually downloaded by the read_* methods.
    source_url: str = "https://api.covid19tracker.ca/reports"
    source_url_a: str = (
        "https://health-infobase.canada.ca/src/data/covidLive/vaccination-coverage-byAgeAndSex-overTimeDownload.csv"
    )
    source_url_m: str = (
        "https://health-infobase.canada.ca/src/data/covidLive/vaccination-administration-bydosenumber2.csv"
    )
    # Human-facing pages; the age/manufacturer ones are passed as metadata in export().
    source_url_ref: str = "https://covid19tracker.ca/vaccinationtracker.html"
    source_url_age: str = "https://health-infobase.canada.ca/covid-19/vaccination-coverage/"
    source_url_man: str = "https://health-infobase.canada.ca/covid-19/vaccine-administration/"
    # Source column -> output column for the age file.
    cols_age: dict = {
        "week_end": "date",
        "age": "age",
        "numtotal_atleast1dose": "people_vaccinated",
        "numtotal_fully": "people_fully_vaccinated",
        "numtotal_additional": "people_with_booster",
    }
    # Source column -> output column for the manufacturer file. Every per-dose
    # column maps to "total_vaccinations"; pipeline_manufacturer() sums them.
    cols_man: dict = {
        "week_end": "date",
        "product_name": "vaccine",
        "numtotal_dose1_admin": "total_vaccinations",
        "numtotal_dose2_admin": "total_vaccinations",
        "numtotal_dose3_admin": "total_vaccinations",
        "numtotal_dose4_admin": "total_vaccinations",
        "numtotal_dose5+_admin": "total_vaccinations",
        "numtotal_dosenotreported_admin": "total_vaccinations",
    }
    # Matches age labels such as "05–11" or "80+"; group 1 is the lower bound,
    # group 2 the upper bound (missing for open-ended groups).
    age_pattern: str = r"0?(\d{1,2})(?:–0?(\d{1,2})|\+)"
    # Product name in the source -> harmonised vaccine name. Entries mapped to
    # None become missing values and are removed by dropna() in
    # pipeline_manufacturer().
    vaccine_mapping: dict = {
        "AstraZeneca Vaxzevria/COVISHIELD": "Oxford/AstraZeneca",
        "Janssen": "Johnson&Johnson",
        "Janssen Jcovden": "Johnson&Johnson",
        "Medicago Covifenz": "Medicago",
        "Moderna Spikevax": "Moderna",
        "Moderna Spikevax (ages 6 months-5 years)": "Moderna",
        "Not reported": None,
        "Novavax": "Novavax",
        "Novavax Nuvaxovid": "Novavax",
        "Pfizer-BioNTech Comirnaty": "Pfizer/BioNTech",
        "Pfizer-BioNTech Comirnaty pediatric 5-11 years": "Pfizer/BioNTech",
        "Pfizer-BioNTech Comirnaty (ages 5-11 years)": "Pfizer/BioNTech",
        "Pfizer-BioNTech Comirnaty (ages 12 years and older)": "Pfizer/BioNTech",
        "Total": None,
        "Unknown": None,
    }
    # Retry budget used by pipe_make_monotonic().
    max_filtered_dates: int = 3
    max_removed_rows: int = 22

    def read(self) -> pd.DataFrame:
        """Download the report feed and return the vaccination change columns.

        Returns:
            DataFrame with ``date`` and the four ``change_*`` vaccination
            columns. ``check_known_columns`` is called first with the full list
            of expected columns.
        """
        data = request_json(self.source_url)
        df = pd.DataFrame.from_records(data["data"])
        check_known_columns(
            df,
            [
                "date",
                "change_cases",
                "change_fatalities",
                "change_tests",
                "change_hospitalizations",
                "change_criticals",
                "change_recoveries",
                "change_vaccinations",
                "change_vaccinated",
                "change_boosters_1",
                "change_boosters_2",
                "change_vaccines_distributed",
                "total_cases",
                "total_fatalities",
                "total_tests",
                "total_hospitalizations",
                "total_criticals",
                "total_recoveries",
                "total_vaccinations",
                "total_vaccinated",
                "total_boosters_1",
                "total_boosters_2",
                "total_vaccines_distributed",
            ],
        )
        return df[["date", "change_vaccinations", "change_vaccinated", "change_boosters_1", "change_boosters_2"]]

    def read_age(self) -> pd.DataFrame:
        """Download the age/sex CSV and return it unmodified after a column check."""
        df = read_csv_from_url(self.source_url_a)
        check_known_columns(
            df,
            [
                "pruid",
                "prename",
                "prfname",
                "week_end",
                "sex",
                "age",
                "numtotal_atleast1dose",
                "numtotal_partially",
                "numtotal_fully",
                "numtotal_additional",
                "numtotal_2nd_additional",
                "proptotal_atleast1dose",
                "proptotal_partially",
                "proptotal_fully",
                "proptotal_additional",
                "proptotal_2nd_additional",
            ],
        )
        return df

    def read_manufacturer(self) -> pd.DataFrame:
        """Download the per-product CSV and return it unmodified after a column check."""
        df = read_csv_from_url(self.source_url_m)
        check_known_columns(
            df,
            [
                "week_end",
                "pruid",
                "prename",
                "prfname",
                "product_name",
                "numtotal_totaldoses_admin",
                "numtotal_dose1_admin",
                "numtotal_dose2_admin",
                "numtotal_dose3_admin",
                "numtotal_dose4_admin",
                "numtotal_dose5+_admin",
                "numtotal_dosenotreported_admin",
                "numdelta_dose1",
                "numdelta_dose2",
                "numdelta_dose3",
                "numdelta_dose4",
                "numdelta_dose5+",
                "numdelta_notreported",
                "num2weekdelta_dose1",
                "num2weekdelta_dose2",
                "num2weekdelta_dose3",
                "num2weekdelta_dose4",
                "num2weekdelta_dose5+",
                "num2weekdelta_notreported",
                "num4weekdelta_dose1",
                "num4weekdelta_dose2",
                "num4weekdelta_dose3",
                "num4weekdelta_dose4",
                "num4weekdelta_dose5+",
                "num4weekdelta_notreported",
            ],
        )
        return df

    def pipeline_age(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform the raw age/sex table into the age-group output.

        Args:
            df: Frame as returned by ``read_age()``.

        Returns:
            Frame restricted to ``pruid == 1``, ``sex == "All sexes"`` and age
            labels matching ``age_pattern``, with renamed columns, parsed
            ``age_group_min``/``age_group_max``, float metrics, whatever
            ``pipe_age_per_capita`` (inherited) adds, and a ``location`` column.
        """
        # Filter rows & columns
        # NOTE(review): pruid == 1 is presumably the country-level record - confirm.
        keep = (df.pruid == 1) & (df.sex == "All sexes") & df.age.str.match(self.age_pattern)
        df = df[keep]
        # list(...) rather than a dict_keys view: an explicit list-like indexer.
        df = df[list(self.cols_age)].rename(columns=self.cols_age)
        # Parse age groups; open-ended groups ("80+") get an empty upper bound
        df[["age_group_min", "age_group_max"]] = df.age.str.extract(self.age_pattern).fillna("")
        # Convert data types and calculate per capita metrics
        metrics = df.filter(like="people_").columns
        df[metrics] = df[metrics].astype("float").fillna(0)
        df = df.pipe(self.pipe_age_per_capita)
        return df.assign(location=self.location)

    def pipeline_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform the raw per-product table into doses per date and vaccine.

        Args:
            df: Frame as returned by ``read_manufacturer()``.

        Returns:
            Frame with ``date``, ``vaccine``, ``total_vaccinations`` and
            ``location``; one row per (date, harmonised vaccine name).
        """
        # Filter rows; missing values become 0 so they do not affect the sums
        selected = df[df.pruid == 1].fillna(0)
        # Split the column mapping into the per-dose columns (all mapped to
        # "total_vaccinations") and the remaining identifier columns.
        dose_cols = [src for src, dst in self.cols_man.items() if dst == "total_vaccinations"]
        id_cols = {src: dst for src, dst in self.cols_man.items() if dst != "total_vaccinations"}
        df = selected[list(id_cols)].rename(columns=id_cols)
        # Calculate total vaccinations as the row-wise sum over all dose columns.
        # This replaces the former `groupby(df.columns, axis=1).sum()`, whose
        # `axis=1` argument is deprecated in pandas 2.1+.
        df = df.assign(total_vaccinations=selected[dose_cols].astype(float).sum(axis=1))
        # Check and map vaccine names
        validate_vaccines(df, self.vaccine_mapping)
        # Names mapped to None turn into missing values and are dropped here
        df = df[df.total_vaccinations > 0].replace(self.vaccine_mapping).dropna()
        # Several source products can map to one vaccine name: merge them
        df = df.groupby(["date", "vaccine"], as_index=False).sum()
        return df.assign(location=self.location)

    def pipe_get_totals(self, df: pd.DataFrame) -> pd.DataFrame:
        """Turn daily ``change_*`` columns into cumulative ``total_*`` columns.

        Missing values are treated as 0, rows are sorted by ``date``, and rows
        whose cumulative ``total_vaccinations`` is not positive are dropped.
        """
        df = df.fillna(0).sort_values("date")
        metrics = df.filter(like="change_").columns
        df[metrics] = df[metrics].cumsum()
        # Literal (non-regex) replacement, independent of the pandas default.
        df.columns = df.columns.str.replace("change_", "total_", regex=False)
        return df[df.total_vaccinations > 0]

    def pipe_rename_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename ``total_vaccinated`` to ``people_fully_vaccinated``."""
        return df.rename(columns={"total_vaccinated": "people_fully_vaccinated"})

    def pipe_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``total_boosters`` and ``people_vaccinated``.

        ``total_boosters`` is the sum of both booster columns;
        ``people_vaccinated`` is total doses minus fully vaccinated people
        minus boosters.
        """
        total_boosters = df.total_boosters_1 + df.total_boosters_2
        return df.assign(
            people_vaccinated=df.total_vaccinations - df.people_fully_vaccinated - total_boosters,
            total_boosters=total_boosters,
        )

    def pipe_vaccine_timeline(self, df: pd.DataFrame, df_man: pd.DataFrame) -> pd.DataFrame:
        """Attach vaccine names using each vaccine's first date in ``df_man``.

        Args:
            df: Main time series.
            df_man: Output of ``pipeline_manufacturer()``.
        """
        vaccine_timeline = df_man[["date", "vaccine"]].groupby("vaccine").min().date.to_dict()
        vaccine_timeline["Pfizer/BioNTech"] = "2020-12-14"  # Vaccination start date
        return df.pipe(build_vaccine_timeline, vaccine_timeline)

    def pipe_filter_dp(self, df: pd.DataFrame) -> pd.DataFrame:
        """Blank out ``people_vaccinated`` on 2022-07-29, -30 and -31.

        Works on a copy, so the frame passed in is left untouched.
        """
        df = df.copy()
        df.loc[df.date.isin(["2022-07-29", "2022-07-30", "2022-07-31"]), "people_vaccinated"] = np.nan
        return df

    def pipe_make_monotonic(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply ``make_monotonic`` (inherited), dropping trailing rows on failure.

        Each time ``make_monotonic`` raises, the last row is removed and the
        call is retried, at most ``max_filtered_dates`` times.

        Raises:
            Exception: Whatever ``make_monotonic`` raised, once the retry
                budget is used up.
        """
        num_filtered_dates = 0
        while True:
            try:
                return df.pipe(self.make_monotonic, max_removed_rows=self.max_removed_rows)
            except Exception:
                if num_filtered_dates >= self.max_filtered_dates:
                    raise
                # Filter the last dates if `make_monotonic()` fails
                df = df.iloc[:-1]
                num_filtered_dates += 1

    def pipeline(self, df: pd.DataFrame, df_man: pd.DataFrame) -> pd.DataFrame:
        """Run the full transformation of the main time series.

        Args:
            df: Frame as returned by ``read()``.
            df_man: Output of ``pipeline_manufacturer()``, used for the vaccine
                timeline.

        Returns:
            Frame restricted to the output columns listed below.
        """
        return (
            df.pipe(self.pipe_get_totals)
            .pipe(self.pipe_rename_columns)
            .pipe(self.pipe_metrics)
            .pipe(self.pipe_vaccine_timeline, df_man)
            .pipe(self.pipe_filter_dp)
            .pipe(self.pipe_metadata)
            .pipe(self.pipe_make_monotonic)[
                [
                    "location",
                    "date",
                    "vaccine",
                    "source_url",
                    "total_vaccinations",
                    "people_vaccinated",
                    "people_fully_vaccinated",
                    "total_boosters",
                ]
            ]
        )

    def export(self):
        """Read the three sources, transform them and export the results."""
        # Read
        df, df_age, df_man = self.read(), self.read_age(), self.read_manufacturer()
        # Transform
        df_age = df_age.pipe(self.pipeline_age)
        df_man = df_man.pipe(self.pipeline_manufacturer)
        df = df.pipe(self.pipeline, df_man)
        # Export
        self.export_datafile(
            df=df,
            df_age=df_age,
            df_manufacturer=df_man,
            meta_age={"source_name": self.source_name, "source_url": self.source_url_age},
            meta_manufacturer={"source_name": self.source_name, "source_url": self.source_url_man},
        )
def main():
    """Entry point: build the Canada pipeline and run its export."""
    scraper = Canada()
    scraper.export()