import pandas as pd
from typing import List, Tuple
from cowidev.utils.utils import check_known_columns
from cowidev.vax.utils.base import CountryVaxBase
class Italy(CountryVaxBase):
    """Vaccination data pipeline for Italy.

    Reads the administrations CSV at ``source_url``, derives per-row totals,
    aggregates them into cumulative daily figures and a per-manufacturer
    breakdown, and hands both to ``export_datafile``.
    """

    source_url: str = "https://raw.githubusercontent.com/italia/covid19-opendata-vaccini/master/dati/somministrazioni-vaccini-latest.csv"
    location: str = "Italy"
    # Columns kept from the raw CSV. The dose columns (d1, d2, dpi, db1, db2)
    # are summed further down; "dbi" is currently left out on purpose.
    columns: list = [
        "data",
        "forn",
        "eta",
        "d1",
        "d2",
        "dpi",
        "db1",
        # "dbi",
        "db2",
    ]
    # Raw column name -> name used by the rest of the pipeline.
    columns_rename: dict = {
        "data": "date",
        "forn": "vaccine",
        "eta": "age_group",
    }
    # Vaccine label in the source -> standardized vaccine name.
    # Rows mapped to "unknown" are dropped in `translate_vaccine_columns`.
    vaccine_mapping: dict = {
        "Pfizer/BioNTech": "Pfizer/BioNTech",
        "Pfizer Pediatrico": "Pfizer/BioNTech",
        "Moderna": "Moderna",
        "Vaxzevria (AstraZeneca)": "Oxford/AstraZeneca",
        "Janssen": "Johnson&Johnson",
        "Novavax": "Novavax",
        "ND": "unknown",
    }
    # Vaccines for which a single dose counts as fully vaccinated.
    one_dose_vaccines: list = ["Johnson&Johnson"]
    # Filled in by `export` (from `vaccine_start_dates`) before `pipeline`
    # runs; `enrich_vaccine` iterates over it.
    vax_date_mapping = None

    def read(self) -> pd.DataFrame:
        """Download the source CSV and return only the columns in ``columns``.

        ``check_known_columns`` is called first with every column name this
        pipeline knows about, so that schema changes upstream get noticed.
        """
        df = pd.read_csv(self.source_url)
        check_known_columns(
            df,
            self.columns + ["m", "f", "N1", "N2", "ISTAT", "reg", "area"],
        )
        return df[self.columns]

    def _check_vaccines(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` unchanged, after validating the raw vaccine labels.

        Raises:
            ValueError: if the ``forn`` column holds a label that is not a key
                of ``vaccine_mapping``.
        """
        vax_wrong = set(df["forn"]).difference(self.vaccine_mapping.keys())
        if vax_wrong:
            raise ValueError(f"Unknown vaccine(s) {vax_wrong}")
        return df

    def rename_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename raw columns according to ``columns_rename``."""
        return df.rename(columns=self.columns_rename)

    def translate_vaccine_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Standardize vaccine names and drop rows whose vaccine is "unknown"."""
        df = df.replace({"vaccine": self.vaccine_mapping})
        return df[df.vaccine != "unknown"]

    def get_total_vaccinations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add per-row ``total_vaccinations`` and ``total_boosters`` columns.

        ``total_vaccinations`` is the sum of all dose columns and
        ``total_boosters`` the sum of the booster columns (db1, db2).
        The "dbi" column is currently excluded from both sums.
        """
        return df.assign(
            total_vaccinations=df.d1 + df.d2 + df.dpi + df.db1 + df.db2,
            total_boosters=df.db1 + df.db2,
        )

    def pipeline_base(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate, rename, translate and total the raw frame.

        The vaccine check runs first because it reads the raw ``forn`` column,
        which no longer exists after renaming.
        """
        return (
            df.pipe(self._check_vaccines)
            .pipe(self.rename_columns)
            .pipe(self.translate_vaccine_columns)
            .pipe(self.get_total_vaccinations)
        )

    def get_people_vaccinated(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``people_vaccinated`` as the per-row sum of ``d1`` and ``dpi``."""
        return df.assign(people_vaccinated=df["d1"] + df["dpi"])

    def get_people_fully_vaccinated(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the per-row ``people_fully_vaccinated`` column.

        For vaccines in ``one_dose_vaccines`` the value is ``d1 + dpi``; for
        every other vaccine it is ``d2``.
        """
        # Vectorized selection instead of a row-wise ``apply``: same values,
        # but no Python-level loop over the raw data, and it also behaves on
        # an empty frame (where ``apply(axis=1)`` does not yield a Series).
        is_single_dose = df["vaccine"].isin(self.one_dose_vaccines)
        fully_vaccinated = (df["d1"] + df["dpi"]).where(is_single_dose, df["d2"])
        return df.assign(people_fully_vaccinated=fully_vaccinated)

    def get_final_numbers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Aggregate the metrics by date and turn them into running totals.

        Returns a frame with one row per date (ascending) holding the
        cumulative sum of each metric.
        """
        metrics = ["total_vaccinations", "people_vaccinated", "people_fully_vaccinated", "total_boosters"]
        return df.groupby("date")[metrics].sum().sort_index().cumsum().reset_index()

    def enrich_location(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a constant ``location`` column."""
        return df.assign(location=self.location)

    def enrich_source(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a constant ``source_url`` column."""
        return df.assign(source_url=self.source_url)

    def vaccine_start_dates(self, df: pd.DataFrame) -> List[Tuple[str, str]]:
        """Build the (start date, vaccines in use) lookup for ``enrich_vaccine``.

        Returns:
            A list of ``(date, "vaccine A, vaccine B, ...")`` tuples ordered
            from the latest start date to the earliest. Each entry lists, in
            alphabetical order, every vaccine whose first appearance in ``df``
            is on or before that date.
        """
        # (first date seen, vaccine), latest first.
        date2vax = sorted(
            ((df.loc[df["vaccine"] == vaccine, "date"].min(), vaccine) for vaccine in df.vaccine.unique()),
            key=lambda x: x[0],
            reverse=True,
        )
        # Everything from position i onwards started no later than entry i.
        return [
            (start_date, ", ".join(sorted({vaccine for _, vaccine in date2vax[i:]})))
            for i, (start_date, _) in enumerate(date2vax)
        ]

    def enrich_vaccine(self, df: pd.DataFrame) -> pd.DataFrame:
        """Set ``vaccine`` to the list of vaccines in use on each row's date.

        Relies on ``vax_date_mapping`` having been set (see ``export``).

        Raises:
            ValueError: if a date precedes every start date in the mapping.
        """

        def _enrich_vaccine(date: str) -> str:
            # The mapping is ordered latest-first, so the first start date
            # not after `date` is the most recent applicable one.
            for dt, vaccines in self.vax_date_mapping:
                if date >= dt:
                    return vaccines
            raise ValueError(f"Invalid date {date} in DataFrame!")

        return df.assign(vaccine=df["date"].apply(_enrich_vaccine))

    def pipeline(self, df: pd.DataFrame) -> pd.DataFrame:
        """Turn the base frame into the main cumulative per-date output."""
        return (
            df.pipe(self.get_people_vaccinated)
            .pipe(self.get_people_fully_vaccinated)
            .pipe(self.get_final_numbers)
            .pipe(self.enrich_location)
            .pipe(self.enrich_source)
            .pipe(self.enrich_vaccine)
        )

    def get_total_vaccinations_by_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return cumulative ``total_vaccinations`` per date and vaccine."""
        return (
            df.groupby(["date", "vaccine"])["total_vaccinations"]
            .sum()
            .sort_index()
            .reset_index()
            # Running total computed separately within each vaccine.
            .assign(total_vaccinations=lambda x: x.groupby("vaccine")["total_vaccinations"].cumsum())
        )

    def pipeline_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame:
        """Turn the base frame into the per-manufacturer output."""
        return df.pipe(self.get_total_vaccinations_by_manufacturer).pipe(self.enrich_location)

    def export(self) -> None:
        """Read the source, run both pipelines and export the results."""
        df_base = self.read().pipe(self.pipeline_base)
        # Must be set before `pipeline`, whose last step (`enrich_vaccine`)
        # reads it.
        self.vax_date_mapping = self.vaccine_start_dates(df_base)
        # Main
        df = df_base.pipe(self.pipeline)
        # Manufacturer
        df_man = df_base.pipe(self.pipeline_manufacturer)
        # Export
        self.export_datafile(
            df,
            df_manufacturer=df_man,
            meta_manufacturer={
                "source_name": "Extraordinary commissioner for the Covid-19 emergency",
                "source_url": self.source_url,
            },
        )
def main():
    """Entry point: build the Italy pipeline object and run its export."""
    country = Italy()
    country.export()