Source code for cowidev.vax.batch.switzerland

from datetime import datetime

import pandas as pd

from cowidev.utils.web import request_json, get_soup
from cowidev.utils.clean import clean_date_series
from cowidev.vax.utils.checks import validate_vaccines
from cowidev.vax.utils.base import CountryVaxBase
from cowidev.vax.utils.utils import build_vaccine_timeline


class Switzerland(CountryVaxBase):
    """Vaccination data pipeline for Switzerland and Liechtenstein.

    Downloads the doses, people, per-vaccine and per-age CSV files, reshapes
    them and hands the results to ``export_datafile`` (not defined in this
    class; presumably inherited from ``CountryVaxBase`` -- confirm).
    """

    location = "Switzerland"

    def __init__(self):
        self.source_url = "https://opendata.swiss/en/dataset/covid-19-schweiz"
        # Vaccine identifiers found in the source files -> names used in the output.
        self.vaccine_mapping = {
            "pfizer_biontech": "Pfizer/BioNTech",
            "moderna": "Moderna",
            "johnson_johnson": "Johnson&Johnson",
            "novavax": "Novavax",
            "moderna_bivalent": "Moderna",
        }

    def read(self) -> tuple:
        """Download every raw input.

        Returns:
            A ``(df, df_manufacturer, df_age)`` tuple: the concatenated
            doses/people rows, the per-vaccine rows and the per-age rows.
        """
        doses_url, people_url, manufacturer_url = self._get_file_url()
        df, df_manufacturer = self._parse_data(doses_url, people_url, manufacturer_url)
        df_age = self.read_age()
        return df, df_manufacturer, df_age

    def read_age(self) -> pd.DataFrame:
        """Locate the age-group CSV from the dataset page and load it."""
        soup = get_soup(self.source_url)
        # NOTE(review): `_parse_age_link` is not defined in this class as shown;
        # presumably it lives in a base class or was dropped -- confirm.
        url = self._parse_age_link(soup)
        return pd.read_csv(url)

    def _get_file_url(self) -> tuple:
        """Look up the CSV URLs in the API context document.

        Returns:
            A ``(doses_url, people_url, manufacturer_url)`` tuple.
        """
        response = request_json("https://www.covid19.admin.ch/api/data/context")
        context = response["sources"]["individual"]["csv"]
        doses_url = context["vaccDosesAdministered"]
        people_url = context["vaccPersonsV2"]
        manufacturer_url = context["weeklyVacc"]["byVaccine"]["vaccDosesAdministered"]
        return doses_url, people_url, manufacturer_url

    def _parse_data(self, doses_url, people_url, manufacturer_url):
        """Load the doses, people and manufacturer CSV files.

        Args:
            doses_url: Location of the administered-doses CSV.
            people_url: Location of the vaccinated-persons CSV.
            manufacturer_url: Location of the per-vaccine doses CSV.

        Returns:
            A ``(df, manufacturer)`` tuple, where ``df`` is the doses rows
            followed by the ``total_population`` people rows.

        Raises:
            AssertionError: If the set of ``type`` values in the people file
                differs from the expected one.
        """
        doses = pd.read_csv(
            doses_url,
            usecols=["geoRegion", "date", "sumTotal", "type"],
        )
        people = pd.read_csv(
            people_url,
            usecols=["geoRegion", "date", "sumTotal", "type", "age_group"],
        )
        accepted_types = {
            "COVID19AtLeastOneDosePersons",
            "COVID19FullyVaccPersons",
            "COVID19PartiallyVaccPersons",
            "COVID19FirstBoosterPersons",
            "COVID19NotVaccPersons",
            "COVID19SecondBoosterPersons",
        }
        if set(people.type) != accepted_types:
            # Raised explicitly instead of via `assert` so that the check is
            # not stripped when Python runs with -O; same exception type and
            # message as before.
            raise AssertionError("New type found! Check people.type")
        # Keep only the whole-population rows; age breakdowns are handled separately.
        people = people[people.age_group == "total_population"].drop(columns=["age_group"])
        manufacturer = pd.read_csv(
            manufacturer_url,
            usecols=["date", "geoRegion", "vaccine", "sumTotal"],
        )
        return pd.concat([doses, people], ignore_index=True), manufacturer

    def save_vaccine_timeline(self, df_manuf: pd.DataFrame) -> None:
        """Store in ``self.vaccine_timeline`` the first date each vaccine has a positive total.

        The result is a ``{vaccine name: earliest date}`` dict, keyed by the
        mapped vaccine names. Nothing is returned.
        """
        self.vaccine_timeline = (
            df_manuf[df_manuf.sumTotal > 0][["vaccine", "date"]]
            .replace(self.vaccine_mapping)
            .groupby("vaccine")
            .min()
            .to_dict()["date"]
        )

    def pipe_filter_country(self, df: pd.DataFrame, country_code: str) -> pd.DataFrame:
        """Keep the rows of one ``geoRegion`` and drop that column."""
        return df[df.geoRegion == country_code].drop(columns=["geoRegion"])

    def pipe_unique_rows(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` unchanged after checking each (date, type) pair appears once.

        Raises:
            ValueError: If some (date, type) pair has more than one ``sumTotal`` value.
        """
        counts = df.groupby(["date", "type"]).count().reset_index()
        if not counts[counts.sumTotal > 1].empty:
            raise ValueError("Duplicated rows in either `people` or `doses` dataframes!")
        return df

    def pipe_pivot(self, df: pd.DataFrame) -> pd.DataFrame:
        """Reshape to one row per date with one column per ``type``, sorted by date."""
        return df.pivot(index=["date"], columns="type", values="sumTotal").reset_index().sort_values("date")

    def pipe_rename_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename the source ``type`` columns to the output metric names."""
        return df.rename(
            columns={
                "COVID19FullyVaccPersons": "people_fully_vaccinated",
                "COVID19VaccDosesAdministered": "total_vaccinations",
                "COVID19AtLeastOneDosePersons": "people_vaccinated",
                "COVID19FirstBoosterPersons": "total_boosters",
                "COVID19SecondBoosterPersons": "total_boosters_2",
            }
        )

    def pipe_fix_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Repair inconsistent totals and merge first and second boosters.

        ``total_vaccinations`` is raised to ``people_vaccinated`` wherever it
        is lower, and ``total_boosters`` becomes the sum of both booster
        columns. The input frame is not modified.
        """
        # `mask` only replaces rows where the comparison is True, so rows with
        # a missing value on either side are left as they are.
        total_vaccinations = df.total_vaccinations.mask(
            df.total_vaccinations < df.people_vaccinated, df.people_vaccinated
        )
        # fill_value=0 keeps the available booster count when only one of the
        # two columns is missing; the sum is still NaN when both are missing.
        total_boosters = df.total_boosters.add(df.total_boosters_2, fill_value=0)
        return df.assign(total_vaccinations=total_vaccinations, total_boosters=total_boosters)

    def pipe_location(self, df: pd.DataFrame, location: str) -> pd.DataFrame:
        """Add a constant ``location`` column."""
        return df.assign(location=location)

    def pipe_source(self, df: pd.DataFrame, country_code: str) -> pd.DataFrame:
        """Add a ``source_url`` column pointing at the dataset page for ``country_code``."""
        return df.assign(
            source_url=f"{self.source_url}?detGeo={country_code}",
        )

    def pipeline(self, df: pd.DataFrame, location: str) -> pd.DataFrame:
        """Build the main vaccination table for ``location``.

        Requires ``self.vaccine_timeline``, which is set by
        ``save_vaccine_timeline``.

        Raises:
            ValueError: If ``location`` is not supported or duplicated rows are found.
        """
        geo_region = _get_geo_region(location)
        return (
            df.pipe(self.pipe_filter_country, geo_region)
            .pipe(self.pipe_unique_rows)
            .pipe(self.pipe_pivot)
            .pipe(self.pipe_rename_columns)
            .pipe(self.pipe_fix_metrics)
            .pipe(self.pipe_location, location)
            .pipe(self.pipe_source, geo_region)
            .pipe(build_vaccine_timeline, self.vaccine_timeline)[
                [
                    "location",
                    "date",
                    "vaccine",
                    "source_url",
                    "total_vaccinations",
                    "people_vaccinated",
                    "people_fully_vaccinated",
                    "total_boosters",
                ]
            ]
        )

    def pipeline_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the per-vaccine table: ``CH`` rows with a positive total only."""
        validate_vaccines(df, self.vaccine_mapping)
        # Single combined mask instead of indexing a renamed frame with a mask
        # computed on a different intermediate frame.
        keep = (df.sumTotal > 0) & (df.geoRegion == "CH")
        return (
            df[keep]
            .rename(columns={"sumTotal": "total_vaccinations"})
            .drop(columns="geoRegion")
            .assign(location="Switzerland")
            .replace(self.vaccine_mapping)
        )

    def pipe_age_filter_region(self, df, geo_region):
        """Keep rows of ``geo_region`` whose ``age_group_type`` is ``age_group_AKL10``."""
        return df[(df.geoRegion == geo_region) & (df.age_group_type == "age_group_AKL10")]

    def pipe_age_checks(self, df):
        """Return ``df`` unchanged after running sanity checks on the age data.

        Raises:
            ValueError: If an age group has several ``pop`` values, if an
                unexpected ``type_variant`` is present, or if a (date, age
                group) pair has several entries for the same ``type``.
        """
        # Columns are selected with [...] because `pop` is also the name of a
        # DataFrame method, which makes attribute access fragile.
        if not (df.groupby("altersklasse_covid19")["pop"].nunique() == 1).all():
            raise ValueError("Different `pop` values for same `alterklasse_covid19` value!")
        # Check type
        type_wrong = set(df["type_variant"]).difference(["altersklasse_covid19"])
        if type_wrong:
            raise ValueError(f"Invalid `type_variant` value: {type_wrong}")
        # Date+Age group uniqueness
        if not (df.groupby(["date", "altersklasse_covid19"])["type"].value_counts() == 1).all():
            raise ValueError("Some dates and age groups have multiple entries for same metric!")
        return df

    def pipe_age_pivot(self, df):
        """Reshape to one row per (date, age group) with one column per ``type``."""
        return df.pivot(
            index=["date", "altersklasse_covid19"], columns=["type"], values="per100PersonsTotal"
        ).reset_index()

    def pipe_age_date(self, df):
        """Convert the ``date`` column from ISO year+week to a calendar date.

        Each value is parsed as ``%G%V`` with weekday ``0`` appended, i.e. the
        Sunday of that ISO week, and then passed through ``clean_date_series``.
        """
        return df.assign(date=clean_date_series(df.date.apply(lambda x: datetime.strptime(str(x) + "+0", "%G%V+%w"))))

    def pipe_age_location(self, df, location):
        """Add a constant ``location`` column."""
        return df.assign(location=location)

    def pipe_age_rename_columns(self, df):
        """Rename the age-group and per-hundred columns to the output names."""
        return df.rename(
            columns={
                "altersklasse_covid19": "age_group",
                "COVID19AtLeastOneDosePersons": "people_vaccinated_per_hundred",
                "COVID19FullyVaccPersons": "people_fully_vaccinated_per_hundred",
            }
        )

    def pipe_age_groups(self, df):
        """Add ``age_group_min`` / ``age_group_max`` parsed from ``age_group``.

        The upper bound is missing when the label has no `` - <number>`` part.
        The input frame is not modified.
        """
        regex = r"(\d{1,2})+?(?: - (\d{1,2}))?"
        # With unnamed groups, `extract` returns columns 0 and 1.
        bounds = df.age_group.str.extract(regex)
        return df.assign(age_group_min=bounds[0], age_group_max=bounds[1])

    def pipe_age_select_cols(self, df):
        """Keep only the output columns of the age table, in order."""
        return df[
            [
                "location",
                "date",
                "age_group_min",
                "age_group_max",
                "people_vaccinated_per_hundred",
                "people_fully_vaccinated_per_hundred",
            ]
        ]

    def pipeline_age(self, df, location):
        """Build the per-age table for ``location`` from a copy of ``df``.

        Raises:
            ValueError: If ``location`` is not supported or an age check fails.
        """
        geo_region = _get_geo_region(location)
        df_ = df.copy()
        return (
            df_.pipe(self.pipe_age_filter_region, geo_region)
            .pipe(self.pipe_age_checks)
            .pipe(self.pipe_age_pivot)
            .pipe(self.pipe_age_date)
            .pipe(self.pipe_age_location, location)
            .pipe(self.pipe_age_rename_columns)
            .pipe(self.pipe_age_groups)
            .pipe(self.pipe_age_select_cols)
        )

    def export(self) -> None:
        """Read all inputs and export the main, manufacturer and age tables."""
        locations = ["Switzerland", "Liechtenstein"]
        df, df_manuf, df_age = self.read()

        # The timeline must be saved before `pipeline` runs, which reads it.
        self.save_vaccine_timeline(df_manuf)

        # Main data
        for location in locations:
            df_c = df.pipe(self.pipeline, location)
            self.export_datafile(df_c, filename=location)

        # Manufacturer
        df_manuf = df_manuf.pipe(self.pipeline_manufacturer)
        self.export_datafile(
            df_manufacturer=df_manuf,
            meta_manufacturer={"source_name": "Federal Office of Public Health", "source_url": self.source_url},
        )

        # Age
        for location in locations:
            df_age_ = df_age.pipe(self.pipeline_age, location)
            self.export_datafile(
                df_age=df_age_,
                meta_age={"source_name": "Federal Office of Public Health", "source_url": self.source_url},
                filename=location,
            )
def main():
    """Entry point: build a ``Switzerland`` pipeline and run its export."""
    pipeline = Switzerland()
    pipeline.export()
[docs]def _get_geo_region(location): if location == "Switzerland": return "CH" elif location == "Liechtenstein": return "FL" else: raise ValueError("Only Switzerland or Liechtenstein are accepted values for `location`.")