Source code for cowidev.vax.incremental.faeroe_islands

import pandas as pd

from cowidev.utils.clean import clean_count
from cowidev.utils.clean.dates import localdate
from cowidev.utils.web import request_json
from cowidev.vax.utils.incremental import enrich_data, increment


[docs]class FaeroeIslands: location: str = "Faeroe Islands" source_url: str = "https://corona.fo/json/stats" source_url_ref: str = "https://corona.fo/api"
[docs] def read(self) -> pd.Series: data = request_json(self.source_url)["stats"] return pd.DataFrame.from_records(data).iloc[0]
[docs] def pipe_metrics(self, ds: pd.Series) -> pd.Series: ds = enrich_data(ds, "people_vaccinated", clean_count(ds["first_vaccine_total"])) ds = enrich_data(ds, "people_fully_vaccinated", clean_count(ds["second_vaccine_total"])) ds = enrich_data(ds, "total_boosters", clean_count(ds["third_vaccine_toal"])) total_vaccinations = ds["people_vaccinated"] + ds["people_fully_vaccinated"] + ds["total_boosters"] return enrich_data(ds, "total_vaccinations", total_vaccinations)
[docs] def pipe_format_date(self, ds: pd.Series) -> pd.Series: date = localdate("Atlantic/Faeroe") return enrich_data(ds, "date", date)
[docs] def pipe_location(self, ds: pd.Series) -> pd.Series: return enrich_data(ds, "location", self.location)
[docs] def pipe_vaccine(self, ds: pd.Series) -> pd.Series: return enrich_data(ds, "vaccine", "Moderna, Pfizer/BioNTech")
[docs] def pipe_source(self, ds: pd.Series) -> pd.Series: return enrich_data(ds, "source_url", self.source_url_ref)
[docs] def pipeline(self, ds: pd.Series) -> pd.Series: return ( ds.pipe(self.pipe_metrics) .pipe(self.pipe_format_date) .pipe(self.pipe_location) .pipe(self.pipe_vaccine) .pipe(self.pipe_source) )
[docs] def export(self): data = self.read().pipe(self.pipeline) increment( location=str(data["location"]), total_vaccinations=int(data["total_vaccinations"]), people_vaccinated=int(data["people_vaccinated"]), people_fully_vaccinated=int(data["people_fully_vaccinated"]), total_boosters=int(data["total_boosters"]), date=str(data["date"]), source_url=str(data["source_url"]), vaccine=str(data["vaccine"]), )
[docs]def main(): FaeroeIslands().export()