Source code for cowidev.vax.incremental.curacao
import pandas as pd
from cowidev.utils.clean.dates import localdate
from cowidev.utils.web import request_json
from cowidev.vax.utils.incremental import enrich_data, increment
[docs]class Curacao:
location = "Curacao"
source_url = "https://bakuna-counter.ibis-management.com/init/"
source_url_ref = "https://bakuna.cw/"
vaccine = "Pfizer/BioNTech, Moderna"
[docs] def read(self) -> pd.Series:
data = self._parse_data()
return pd.Series(
{
"people_vaccinated": data["total.dosis1"],
"people_fully_vaccinated": data["total.dosis2"],
"total_boosters": data["total.booster"],
}
)
[docs] def _parse_data(self) -> dict:
data = request_json(self.source_url)
return {d["code"]: d["count"] for d in data["stats"]}
[docs] def pipe_total_vaccinations(self, ds: pd.Series) -> pd.Series:
total_vaccinations = ds.people_vaccinated + ds.people_fully_vaccinated + ds.total_boosters
return enrich_data(ds, "total_vaccinations", total_vaccinations)
[docs] def pipe_date(self, ds: pd.Series) -> pd.Series:
date_str = localdate("America/Curacao")
return enrich_data(ds, "date", date_str)
[docs] def pipeline(self, ds: pd.Series) -> pd.Series:
return ds.pipe(self.pipe_date).pipe(self.pipe_total_vaccinations)
[docs] def export(self):
data = self.read().pipe(self.pipeline)
increment(
location=self.location,
total_vaccinations=data["total_vaccinations"],
people_vaccinated=data["people_vaccinated"],
people_fully_vaccinated=data["people_fully_vaccinated"],
total_boosters=data["total_boosters"],
date=data["date"],
source_url=self.source_url_ref,
vaccine=self.vaccine,
)