Source code for cowidev.vax.incremental.zambia

from datetime import datetime

import pandas as pd

from cowidev.utils.clean import clean_date
from cowidev.utils.web import request_json
from cowidev.vax.utils.incremental import enrich_data, increment
from cowidev.vax.utils.base import CountryVaxBase
from cowidev.vax.utils.utils import add_latest_who_values


[docs]class Zambia(CountryVaxBase): def __init__(self) -> None: self.location = "Zambia" self.source_url = ( "https://services7.arcgis.com/OwPYxdqWv7612O7N/arcgis/rest/services/" "service_ef4ce56ba48a44ef82991dcf85f62f71/FeatureServer/0/query?f=json&&cacheHint=true&resultOffset=0&" "resultRecordCount=100&where=1=1&outFields=*&resultType=standard&returnGeometry=false&" "spatialRel=esriSpatialRelIntersects" ) self.source_url_ref = "https://rtc-planning.maps.arcgis.com/apps/dashboards/3b3a01c1d8444932ba075fb44b119b63"
[docs] def read(self): data = request_json(self.source_url)["features"][0]["attributes"] date = clean_date(datetime.fromtimestamp(data["EditDate"] / 1000)) return pd.Series( { "total_vaccinations": data["Vaccine_total"], "people_fully_vaccinated": data["Vaccine_total_last24"], "date": date, } )
[docs] def pipe_location(self, ds: pd.Series) -> pd.Series: return enrich_data(ds, "location", self.location)
[docs] def pipe_source(self, ds: pd.Series) -> pd.Series: return enrich_data(ds, "source_url", self.source_url_ref)
[docs] def pipe_vaccine(self, ds: pd.Series) -> pd.Series: return enrich_data(ds, "vaccine", "Johnson&Johnson, Oxford/AstraZeneca, Sinopharm/Beijing")
[docs] def pipeline(self, ds: pd.Series) -> pd.Series: return ds.pipe(self.pipe_location).pipe(self.pipe_source).pipe(self.pipe_vaccine)
[docs] def export(self): data = self.read().pipe(self.pipeline) df = pd.DataFrame(data).T df = add_latest_who_values(df, 'Zambia', ['people_vaccinated']) self.export_datafile(df=df, attach=True)
[docs]def main(): Zambia().export()