import pandas as pd
from cowidev.utils.utils import check_known_columns
from cowidev.utils.web import request_json
from cowidev.vax.utils.checks import VACCINES_ONE_DOSE
from cowidev.vax.utils.base import CountryVaxBase
from cowidev.vax.utils.utils import make_monotonic, build_vaccine_timeline, add_latest_who_values
class Romania(CountryVaxBase):
    """Vaccination data pipeline for Romania.

    Reads the JSON feed at ``source_url``, unnests the per-vaccine records
    found under its ``historicalData`` key and produces two outputs: the main
    (all vaccines) time series and a per-manufacturer time series.
    """

    # JSON endpoint the data is downloaded from.
    source_url: str = "https://d35p9e4fm9h3wo.cloudfront.net/latestData.json"
    # Page written to the ``source_url`` column of the main output.
    source_url_ref: str = "https://datelazi.ro/"
    location: str = "Romania"
    # Source metric name -> output column name.
    columns_rename: dict = {
        "total_administered": "total_vaccinations",
        "immunized": "people_fully_vaccinated",
    }
    # Source vaccine key -> output vaccine name. Several source keys may map to
    # the same output name; ``pipe_sum_vaccines`` merges them afterwards.
    vaccine_mapping: dict = {
        "pfizer": "Pfizer/BioNTech",
        "pfizer_pediatric": "Pfizer/BioNTech",
        "moderna": "Moderna",
        "astra_zeneca": "Oxford/AstraZeneca",
        "johnson_and_johnson": "Johnson&Johnson",
    }

    def read(self) -> pd.DataFrame:
        """Download the feed and return its vaccination columns.

        Returns:
            A frame with columns ``index`` (the keys of ``historicalData``,
            later used as the date), ``vaccines`` and
            ``numberTotalDosesAdministered``, without rows containing missing
            values and sorted by ``index``.
        """
        data = request_json(self.source_url)
        df = pd.DataFrame.from_dict(data["historicalData"], orient="index")
        # Guard against schema changes in the feed before selecting columns.
        check_known_columns(
            df,
            [
                "parsedOn",
                "parsedOnString",
                "fileName",
                "complete",
                "averageAge",
                "numberInfected",
                "numberCured",
                "numberDeceased",
                "percentageOfWomen",
                "percentageOfMen",
                "percentageOfChildren",
                "numberTotalDosesAdministered",
                "distributionByAge",
                "countyInfectionsNumbers",
                "incidence",
                "large_cities_incidence",
                "small_cities_incidence",
                "vaccines",
            ],
        )
        return df[["vaccines", "numberTotalDosesAdministered"]].reset_index().dropna().sort_values(by="index")

    def pipe_unnest_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Expand the nested ``vaccines`` records into one row per day and vaccine.

        Args:
            df: Frame as returned by ``read``; the first column is used as the
                date and the second as the nested vaccine record.

        Returns:
            A frame with a ``vaccine`` column (mapped through
            ``vaccine_mapping``), a ``date`` column and one column per metric
            present in the nested records.

        Raises:
            ValueError: If the feed contains a vaccine key that is missing
                from ``vaccine_mapping``.
        """

        def _data_by_day(record):
            # record[0] is the date, record[1] the nested vaccine record
            # (presumably {vaccine: {metric: value}} -- confirm against the feed).
            return (
                pd.DataFrame.from_records(record[1])
                .transpose()
                .reset_index()
                .rename(columns={"index": "vaccine"})
                .assign(date=record[0])
            )

        df = pd.concat(map(_data_by_day, df.values.tolist()))
        # Check vaccine names - Any new ones?
        vaccines_unknown = set(df.vaccine).difference(self.vaccine_mapping)
        if vaccines_unknown:
            raise ValueError(f"Unrecognized vaccine {vaccines_unknown}")
        df["vaccine"] = df.vaccine.replace(self.vaccine_mapping)
        return df

    def pipe_sum_vaccines(self, df: pd.DataFrame) -> pd.DataFrame:
        """Sum the rows that share the same date and (mapped) vaccine name."""
        # Some vaccines are renamed to the same vaccine in our data e.g. 'pfizer' and 'pfizer_pediatric'
        return df.groupby(["date", "vaccine"], as_index=False).sum()

    def pipe_rename_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename the source metric columns according to ``columns_rename``."""
        return df.rename(columns=self.columns_rename)

    def pipe_store_timeline(self, df: pd.DataFrame) -> pd.DataFrame:
        """Record, per vaccine, the first date with a positive ``total_vaccinations``.

        The mapping is stored in ``self.vaccine_timeline`` (read later by
        ``pipe_vaccines``); the frame itself is returned unchanged.
        """
        self.vaccine_timeline = df[df.total_vaccinations > 0].groupby("vaccine").date.min().to_dict()
        return df

    def pipe_location(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a constant ``location`` column."""
        return df.assign(location=self.location)

    def pipeline_base(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run the steps shared by the main and the manufacturer pipelines."""
        return (
            df.pipe(self.pipe_unnest_data)
            .pipe(self.pipe_sum_vaccines)
            .pipe(self.pipe_rename_columns)
            .pipe(self.pipe_store_timeline)
            .pipe(self.pipe_location)
        )

    def pipe_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Derive ``people_vaccinated`` and turn per-day values into cumulative totals.

        The input frame is not modified.

        Args:
            df: Per-day, per-vaccine frame with ``date``, ``location``,
                ``vaccine``, ``total_vaccinations`` and
                ``people_fully_vaccinated`` columns.

        Returns:
            One row per date and location with cumulative integer totals.
            ``people_vaccinated`` is set to ``pd.NA`` from 2021-09-28 onwards.
        """
        # Calculate people_vaccinated: for single-dose vaccines it equals
        # people_fully_vaccinated; otherwise it is the doses that did not
        # complete a vaccination (total - fully vaccinated).
        is_one_dose = df.vaccine.isin(VACCINES_ONE_DOSE)
        people_vaccinated = (df.total_vaccinations - df.people_fully_vaccinated).where(
            ~is_one_dose, df.people_fully_vaccinated
        )
        df = df.assign(people_vaccinated=people_vaccinated)
        # Sum by day, then sum over time
        df = df.drop(columns="vaccine").groupby(["date", "location"], as_index=False).sum().sort_values("date")
        metrics = ["total_vaccinations", "people_fully_vaccinated", "people_vaccinated"]
        df[metrics] = df[metrics].cumsum().astype(int)
        # Starting on 2021-09-28 (start of the booster rollout) we can no longer use
        # people_vaccinated = total_vaccinations - people_fully_vaccinated
        df.loc[df.date >= "2021-09-28", "people_vaccinated"] = pd.NA
        return df

    def pipe_source(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a constant ``source_url`` column pointing at ``source_url_ref``."""
        return df.assign(source_url=self.source_url_ref)

    def pipe_vaccines(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply ``build_vaccine_timeline`` with the timeline stored by ``pipe_store_timeline``."""
        return build_vaccine_timeline(df, self.vaccine_timeline)

    def pipeline(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run the main (all vaccines) pipeline on the output of ``pipeline_base``."""
        return (
            df.pipe(self.pipe_metrics)
            .pipe(self.pipe_source)
            .pipe(self.pipe_vaccines)
            # Use the class attribute rather than repeating the country name.
            .pipe(add_latest_who_values, self.location, ["people_vaccinated"])
            .pipe(make_monotonic)
        )

    def pipe_filter_rows_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows with zero ``total_vaccinations`` and the ``people_fully_vaccinated`` column."""
        return df[df.total_vaccinations != 0].drop(columns="people_fully_vaccinated")

    def pipe_manufacturer_cumsum(self, df: pd.DataFrame) -> pd.DataFrame:
        """Replace per-day ``total_vaccinations`` with the running total per vaccine."""
        # Sort first so that the cumulative sum runs in chronological order.
        df = df.sort_values("date")
        df["total_vaccinations"] = df[["vaccine", "total_vaccinations"]].groupby("vaccine").cumsum()
        return df

    def pipeline_manufacturer(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run the per-manufacturer pipeline on the output of ``pipeline_base``."""
        return df.pipe(self.pipe_filter_rows_columns).pipe(self.pipe_manufacturer_cumsum)

    def export(self):
        """Read the feed, build both outputs and hand them to ``export_datafile``."""
        df_base = self.read().pipe(self.pipeline_base)
        # Main vaccination data
        df = df_base.copy().pipe(self.pipeline)
        # Manufacturer data
        df_man = df_base.copy().pipe(self.pipeline_manufacturer)
        # Export
        # NOTE(review): the manufacturer metadata points at the raw JSON endpoint
        # (source_url) while the main output uses source_url_ref -- confirm this
        # difference is intended.
        self.export_datafile(
            df,
            df_manufacturer=df_man,
            meta_manufacturer={
                "source_name": "Government of Romania via datelazi.ro",
                "source_url": self.source_url,
            },
        )
def main():
    """Run the Romania vaccination export."""
    Romania().export()