Source code for cowidev.vax.batch.united_states

from glob import glob
import os

import pandas as pd

from cowidev import PATHS
from cowidev.utils import clean_date_series
from cowidev.utils.utils import check_known_columns
from cowidev.vax.utils.utils import build_vaccine_timeline
from cowidev.vax.utils.base import CountryVaxBase


[docs]class UnitedStates(CountryVaxBase): def __init__(self): self.source_url = "https://data.cdc.gov/api/views/rh2h-3yt2/rows.csv?accessType=DOWNLOAD" self.source_url_ref = ( "https://data.cdc.gov/Vaccinations/COVID-19-Vaccination-Trends-in-the-United-States-N/rh2h-3yt2" ) self.source_url_age = "https://data.cdc.gov/resource/km4m-vcsb.json" self.location = "United States" ### Main processing ###
[docs] def read(self) -> pd.DataFrame: df = pd.read_csv(self.source_url) check_known_columns( df, [ "Date", "MMWR_week", "Location", "Administered_Daily", "Administered_Cumulative", "Administered_7_Day_Rolling_Average", "Admin_Dose_1_Daily", "Admin_Dose_1_Cumulative", "Admin_Dose_1_Day_Rolling_Average", "date_type", "Administered_daily_change_report", "Administered_daily_change_report_7dayroll", "Series_Complete_Daily", "Series_Complete_Cumulative", "Series_Complete_Day_Rolling_Average", "Booster_Daily", "Booster_Cumulative", "Booster_7_Day_Rolling_Average", "Series_Complete_Pop_Pct", "Administered_Dose1_Pop_Pct", "Additional_Doses_Vax_Pct", "Second_Booster_50Plus_Daily", "Second_Booster_50Plus_Vax_Pct", "Second_Booster_50Plus_7_Day_Rolling_Average", "Second_Booster_50Plus_Cumulative", ], ) return df[ [ "Date", "Location", "Administered_Cumulative", "Admin_Dose_1_Cumulative", "date_type", "Series_Complete_Cumulative", "Booster_Cumulative", "Second_Booster_50Plus_Cumulative", ] ]
[docs] def pipe_filter_rows(self, df: pd.DataFrame) -> pd.DataFrame: return df[(df.Location == "US") & (df.date_type == "Admin")]
[docs] def pipe_clean_data(self, df: pd.DataFrame) -> pd.DataFrame: df = ( df.assign(Date=clean_date_series(df.Date, format_input="%m/%d/%Y"), Location="United States") .drop(columns=["date_type"]) .rename( columns={ "Date": "date", "Location": "location", "Administered_Cumulative": "total_vaccinations", "Admin_Dose_1_Cumulative": "people_vaccinated", "Series_Complete_Cumulative": "people_fully_vaccinated", "Booster_Cumulative": "total_boosters", "Second_Booster_50Plus_Cumulative": "total_boosters_2", } ) .sort_values("date") ) df = df[df.total_vaccinations > 0].drop_duplicates(subset=["date"], keep=False) df = df.assign(total_boosters=df.total_boosters + df.total_boosters_2) return df
[docs] def pipe_add_source(self, df: pd.DataFrame) -> pd.DataFrame: return df.assign(source_url=self.source_url_ref)
[docs] def pipe_add_vaccines(self, df: pd.DataFrame) -> pd.DataFrame: schedule = { "Pfizer/BioNTech": "2020-12-01", "Moderna": "2020-12-23", "Johnson&Johnson": "2021-03-05", } return build_vaccine_timeline(df, schedule)
[docs] def pipeline(self, df: pd.DataFrame) -> pd.DataFrame: return ( df.pipe(self.pipe_filter_rows) .pipe(self.pipe_clean_data) .pipe(self.pipe_add_source) .pipe(self.pipe_add_vaccines) )
### Manufacturer processing ###
[docs] def read_manufacturer(self) -> pd.DataFrame: vaccine_cols = [ "Administered_Pfizer", "Administered_Moderna", "Administered_Janssen", ] dfs = [] for file in glob(os.path.join(PATHS.INTERNAL_INPUT_CDC_VAX_DIR, "cdc_data_*.csv")): try: df = pd.read_csv(file) for vc in vaccine_cols: if vc not in df.columns: df[vc] = pd.NA df = df[["Date", "LongName"] + vaccine_cols] dfs.append(df) except Exception: continue df = pd.concat(dfs) return df
[docs] def pipeline_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame: # Renaming df = ( df[df.LongName == "United States"] .sort_values("Date") .rename( columns={ "Date": "date", "LongName": "location", "Administered_Pfizer": "Pfizer/BioNTech", "Administered_Moderna": "Moderna", "Administered_Janssen": "Johnson&Johnson", } ) ) # Melting df = df.melt(["date", "location"], var_name="vaccine", value_name="total_vaccinations") # Filter datapoint msk = (df.date == "2022-03-16") & (df.vaccine == "Johnson&Johnson") if (df.loc[msk, "total_vaccinations"] == 516219).all(): df = df[-msk] else: raise Exception("Please check value for J&J and date 2022-03-16 in manufacturer data") # Dropna df = df.dropna(subset=["total_vaccinations"]) # Make monotonic df = df.pipe(self.make_monotonic, "vaccine") return df
[docs] def export(self): # Main df = self.read().pipe(self.pipeline) # Manufacturer df_manufacturer = self.read_manufacturer().pipe(self.pipeline_manufacturer) # Export self.export_datafile( df, df_manufacturer=df_manufacturer, meta_manufacturer={ "source_name": "Centers for Disease Control and Prevention", "source_url": "https://covid.cdc.gov/covid-data-tracker/COVIDData/getAjaxData?id=vaccination_data", }, )
[docs]def main(): UnitedStates().export()