import pandas as pd
from cowidev.utils.clean import clean_count
from cowidev.utils.clean.dates import localdate
from cowidev.utils.web import request_json
from cowidev.vax.utils.incremental import enrich_data, increment
[docs]class FaeroeIslands:
location: str = "Faeroe Islands"
source_url: str = "https://corona.fo/json/stats"
source_url_ref: str = "https://corona.fo/api"
[docs] def read(self) -> pd.Series:
data = request_json(self.source_url)["stats"]
return pd.DataFrame.from_records(data).iloc[0]
[docs] def pipe_metrics(self, ds: pd.Series) -> pd.Series:
ds = enrich_data(ds, "people_vaccinated", clean_count(ds["first_vaccine_total"]))
ds = enrich_data(ds, "people_fully_vaccinated", clean_count(ds["second_vaccine_total"]))
ds = enrich_data(ds, "total_boosters", clean_count(ds["third_vaccine_toal"]))
total_vaccinations = ds["people_vaccinated"] + ds["people_fully_vaccinated"] + ds["total_boosters"]
return enrich_data(ds, "total_vaccinations", total_vaccinations)
[docs] def pipe_location(self, ds: pd.Series) -> pd.Series:
return enrich_data(ds, "location", self.location)
[docs] def pipe_vaccine(self, ds: pd.Series) -> pd.Series:
return enrich_data(ds, "vaccine", "Moderna, Pfizer/BioNTech")
[docs] def pipe_source(self, ds: pd.Series) -> pd.Series:
return enrich_data(ds, "source_url", self.source_url_ref)
[docs] def pipeline(self, ds: pd.Series) -> pd.Series:
return (
ds.pipe(self.pipe_metrics)
.pipe(self.pipe_format_date)
.pipe(self.pipe_location)
.pipe(self.pipe_vaccine)
.pipe(self.pipe_source)
)
[docs] def export(self):
data = self.read().pipe(self.pipeline)
increment(
location=str(data["location"]),
total_vaccinations=int(data["total_vaccinations"]),
people_vaccinated=int(data["people_vaccinated"]),
people_fully_vaccinated=int(data["people_fully_vaccinated"]),
total_boosters=int(data["total_boosters"]),
date=str(data["date"]),
source_url=str(data["source_url"]),
vaccine=str(data["vaccine"]),
)
[docs]def main():
FaeroeIslands().export()