import copy
import logging
from collections import defaultdict

import pandas as pd

from cowidev.utils.web import request_json
from cowidev.vax.utils.orgs import SPC_COUNTRIES
from cowidev.vax.utils.files import load_data
from cowidev.vax.utils.utils import make_monotonic
from cowidev.vax.utils.base import CountryVaxBase
from cowidev.vax.incremental.fiji import check_booster as fiji_booster
# Maps the indicator ids found in the API response to the metric column names used in the output.
# Indicators that are not listed here are ignored when parsing.
metrics_mapping = dict(
    COVIDVACAD1="people_vaccinated",
    COVIDVACAD2="people_fully_vaccinated",
    COVIDVACBST="total_boosters",
    COVIDVACADT="total_vaccinations",
)
# Vaccines in use per country, as a list of [vaccine name(s), start date] records.
#   - The "default" entry applies to every country that has no entry of its own.
#   - A start date of None is replaced by the earliest date present in that country's data.
#   - Records are kept as lists (not tuples) because they are deep-copied and edited by index downstream.
vaccines_startdates = {
    "New Caledonia": [
        ["Pfizer/BioNTech", None],
    ],
    "French Polynesia": [
        ["Johnson&Johnson, Pfizer/BioNTech", None],
    ],
    "Tokelau": [
        ["Pfizer/BioNTech", None],
    ],
    "Cook Islands": [
        ["Pfizer/BioNTech", None],
    ],
    "Wallis and Futuna": [
        ["Moderna", None],
    ],
    "Fiji": [
        ["Oxford/AstraZeneca", None],
        ["Pfizer/BioNTech", "2021-11-15"],
        ["Moderna", "2021-07-20"],
    ],
    "default": [
        ["Oxford/AstraZeneca", None],
    ],
}
# "+"-separated list of the SPC_COUNTRIES keys, interpolated into the API query URL.
country_codes_url = "+".join(SPC_COUNTRIES)
class SPC(CountryVaxBase):
    """Vaccination data for the countries listed in ``SPC_COUNTRIES``.

    The data is requested as JSON from the pacificdata.org REST endpoint in ``source_url``, split into one
    DataFrame per country, post-processed and exported as one data file per country.
    """

    location = "SPC"
    source_url = (
        f"https://stats-nsi-stable.pacificdata.org/rest/data/SPC,DF_COVID_VACCINATION,1.0/D.{country_codes_url}.?"
        "startPeriod=2021-02-02&format=jsondata"
    )

    def read(self):
        """Request the JSON payload from ``source_url`` and parse it.

        Returns:
            dict: Country name -> processed DataFrame (see ``parse_data``).
        """
        data = request_json(self.source_url)
        return self.parse_data(data)

    def parse_data(self, data: dict):
        """Turn the raw JSON payload into one processed DataFrame per country.

        Series keys have the form ``"<x>:<country index>:<metric index>"``. Only series whose metric index
        maps to a known metric (see ``metrics_mapping``) are kept.

        Args:
            data: Decoded JSON response of the API.

        Returns:
            dict: Country name -> processed DataFrame.
        """
        series = data["data"]["dataSets"][0]["series"]
        country_info = self._parse_country_info(data)
        metrics_info = self._parse_metrics_info(data)
        date_info = self._parse_date_info(data)
        # country name -> metric name -> {date: value}
        vaccination_data = defaultdict(dict)
        for key, values in series.items():
            _, country_idx, metric_idx = key.split(":")
            if metric_idx in metrics_info:
                vaccination_data[country_info[country_idx]][metrics_info[metric_idx]] = self._build_data_array(
                    values["observations"], date_info
                )
        return self._build_df_list(vaccination_data)

    def _parse_country_info(self, data: dict):
        """Map each country's positional index (as a string) to its name in ``SPC_COUNTRIES``.

        Raises:
            AttributeError: If the second series dimension is no longer ``GEO_PICT``.
        """
        country_info = data["data"]["structures"][0]["dimensions"]["series"][1]
        if country_info["id"] != "GEO_PICT":
            raise AttributeError("JSON data has changed")
        return {str(i): SPC_COUNTRIES[c["id"]] for i, c in enumerate(country_info["values"])}

    def _parse_metrics_info(self, data: dict):
        """Map each known indicator's positional index (as a string) to its metric name.

        Indicators that are not in ``metrics_mapping`` are left out of the result.

        Raises:
            AttributeError: If the third series dimension is no longer ``INDICATOR``.
        """
        metrics_info = data["data"]["structures"][0]["dimensions"]["series"][2]
        if metrics_info["id"] != "INDICATOR":
            raise AttributeError("JSON data has changed")
        return {
            str(i): metrics_mapping[m["id"]]
            for i, m in enumerate(metrics_info["values"])
            if m["id"] in metrics_mapping
        }

    def _parse_date_info(self, data: dict):
        """Map each observation's positional index (as a string) to its ``name`` field (used as the date)."""
        date_info = data["data"]["structures"][0]["dimensions"]["observation"][0]["values"]
        return {str(i): d["name"] for i, d in enumerate(date_info)}

    def _build_data_array(self, observations: dict, date_info: dict):
        """Build a ``{date: value}`` dict for one series.

        An observation contributes its single element; observations that do not hold exactly one element are
        stored as ``None``.
        """
        return {date_info[k]: v[0] if len(v) == 1 else None for k, v in observations.items()}

    def _build_df_list(self, data: dict):
        """Build the processed DataFrame of every country.

        Args:
            data: Country name -> metric name -> ``{date: value}``.

        Returns:
            dict: Country name -> processed DataFrame.
        """
        # Build a new dict rather than rewriting the argument while iterating over it.
        return {country: self._build_df(metrics, country) for country, metrics in data.items()}

    def _build_df(self, dix: dict, country: str):
        """Build and post-process the DataFrame of a single country.

        Steps: build the frame from the metric dicts, merge legacy data (Fiji, Nauru, Vanuatu), drop duplicated
        metric rows, blank out inconsistent people counts, make the series monotonic, add the vaccine column and,
        for Fiji only, merge booster figures.

        Args:
            dix: Metric name -> ``{date: value}``.
            country: Country name.

        Returns:
            pd.DataFrame: Processed data for ``country``.
        """
        df = (
            pd.DataFrame(dix)
            .dropna(how="all")
            .replace("", None)
            .astype("Int64")
            .drop_duplicates(keep="first")
            .reset_index()
            .rename(columns={"index": "date"})
            .sort_values(by="date")
            .assign(
                location=country,
                source_url=(
                    "https://stats.pacificdata.org/vis?tm=covid&pg=0&df[ds]=SPC2&df[id]=DF_COVID_VACCINATION&df[ag]=SPC&df"
                    "[vs]=1.0"
                ),
            )
        )
        # Merge with legacy (spreadsheet)
        if country in ["Fiji", "Nauru", "Vanuatu"]:
            df = df.pipe(self.pipe_merge_legacy, country)
        # Drop duplicates
        df = df.pipe(self.pipe_drop_duplicates)
        # Enforce data consistency: fully vaccinated people cannot outnumber vaccinated people
        df.loc[
            df.people_vaccinated < df.people_fully_vaccinated,
            ["people_vaccinated", "people_fully_vaccinated"],
        ] = pd.NA
        # Make monotonic
        df = df.pipe(make_monotonic)
        # Add vaccine info
        df = df.pipe(self.pipe_vacine, country)
        # Add boosters (Fiji only)
        if country == "Fiji":
            try:
                fiji_booster()
            except Exception as err:
                # Best effort: a failing booster check must not break the export, but it should not pass
                # unnoticed either. KeyboardInterrupt/SystemExit are deliberately not caught.
                logging.getLogger(__name__).warning(
                    "Fiji booster check failed, skipping booster merge: %s", err
                )
            else:
                df = df.pipe(self.pipe_merge_boosters, country)
        return df

    def pipe_merge_legacy(self, df: pd.DataFrame, country: str) -> pd.DataFrame:
        """Append the legacy rows of ``country`` whose dates are not already present in ``df``.

        The legacy data is loaded with ``load_data`` under the name ``"<country>-legacy"``, where the country
        name is lower-cased and spaces are replaced by dashes.
        """
        country = country.lower().replace(" ", "-")
        df_legacy = load_data(f"{country}-legacy")
        df_legacy = df_legacy[~df_legacy.date.isin(df.date)]
        return pd.concat([df, df_legacy]).sort_values("date")

    def pipe_drop_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows that repeat the same combination of the three main metrics.

        Before de-duplicating, ``people_fully_vaccinated`` is blanked on rows where ``people_vaccinated`` is 0.
        Note that this assignment is written to the frame that was passed in.
        """
        column_metrics = [
            "people_vaccinated",
            "total_vaccinations",
            "people_fully_vaccinated",
        ]
        msk = df.people_vaccinated == 0
        df.loc[msk, "people_fully_vaccinated"] = pd.NA
        df = df.drop_duplicates(subset=column_metrics)
        return df

    def pipe_vacine(self, df: pd.DataFrame, country: str) -> pd.DataFrame:
        """Add the ``vaccine`` column, listing the vaccines in use at each row's date.

        Raises:
            ValueError: If a date precedes every start date known for the country.
        """
        date_min = df.date.min()
        vax_date_mapping = self._pretty_vaxdates(country, date_min)

        def _enrich_vaccine(date: str) -> str:
            # Walk the start dates from latest to earliest and take the first one already reached.
            for dt, vaccines in reversed(vax_date_mapping):
                if date >= dt:
                    return vaccines
            raise ValueError(f"Invalid date {date} in DataFrame!")

        return df.assign(vaccine=df.date.apply(_enrich_vaccine))

    def _pretty_vaxdates(self, country, date_min):
        """Build the ``(start date, vaccines in use from that date)`` list of a country.

        Args:
            country: Country name; countries without an entry in ``vaccines_startdates`` use ``"default"``.
            date_min: Date that replaces ``None`` start dates.

        Returns:
            list: Tuples ``(date, vaccines)`` sorted by date, where ``vaccines`` is the alphabetically sorted,
                comma-separated list of all vaccines started up to and including that date.
        """
        if country not in vaccines_startdates:
            country = "default"
        # Work on a copy so the module-level table is never modified.
        records = copy.deepcopy(vaccines_startdates[country])
        # Substitute None by minimum date
        for record in records:
            if record[1] is None:
                record[1] = date_min
        records = sorted(records, key=lambda x: x[1])
        # Build the cumulative mapping
        return [
            (dt, ", ".join(sorted(r[0] for r in records[: i + 1]))) for i, (_, dt) in enumerate(records)
        ]

    def pipe_merge_boosters(self, df: pd.DataFrame, country: str) -> pd.DataFrame:
        """Add the booster figures stored in the country's current output CSV.

        For dates present in both ``df`` and the CSV, ``total_boosters`` is copied from the CSV (first match)
        and added to ``total_vaccinations``. CSV rows whose date is not in ``df`` are appended unchanged.

        Args:
            df: Data built from the API for ``country``.
            country: Country name; spaces are replaced by dashes to locate the output file.

        Returns:
            pd.DataFrame: ``df`` with boosters merged in, sorted by date.
        """
        # Read the csv
        country = country.replace(" ", "-")
        filepath = self.get_output_path(country)
        df_current = pd.read_csv(filepath)
        # Pick only the relevant dates
        df_mod = df_current[df_current.date.isin(df.date)]
        # Build the date -> boosters lookup once (first row wins) instead of rescanning the CSV for every row
        boosters_by_date = df_mod.drop_duplicates(subset="date").set_index("date")["total_boosters"].to_dict()
        # Add the booster column (None where the CSV has no row for that date)
        df = df.assign(total_boosters=df.date.apply(lambda date: boosters_by_date.get(date)))
        # Add boosters to total_vaccinations. A row-wise sum skips missing values, so on its own it would turn
        # a missing total into 0 (or into the booster count alone); keep such totals missing instead.
        has_total = df["total_vaccinations"].notna().to_numpy()
        summed = df[["total_vaccinations", "total_boosters"]].sum(axis=1)
        df["total_vaccinations"] = summed.where(has_total)
        # Add the standalone booster rows
        df_current = df_current[~df_current.date.isin(df.date)]
        return pd.concat([df, df_current]).sort_values("date")

    def export(self):
        """Read the data and export one data file per country, named after the country."""
        data = self.read()
        for country, df in data.items():
            self.export_datafile(df, filename=country)
def main():
    """Entry point: instantiate ``SPC`` and run its export."""
    SPC().export()