from http.client import USE_PROXY
import pandas as pd
from cowidev.utils.clean.dates import clean_date, localdate
from cowidev.utils.utils import check_known_columns
from cowidev.utils.web.download import read_csv_from_url
from cowidev.vax.utils.orgs import ECDC_VACCINES
from cowidev.vax.utils.base import CountryVaxBase
from cowidev import PATHS
from cowidev.vax.utils.utils import build_vaccine_timeline
AGE_GROUPS_KNOWN = {
"ALL",
"Age0_4",
"Age15_17",
"Age5_9",
"Age10_14",
"Age<18",
"Age18_24",
"Age25_49",
"Age50_59",
"Age60_69",
"Age70_79",
"1_Age<60",
"1_Age60+",
"Age80+",
"AgeUNK",
"HCW",
"LTCF",
}
AGE_GROUPS_MUST_HAVE = {
"Age18_24",
"Age25_49",
"Age50_59",
"Age60_69",
"Age70_79",
"Age80+",
}
AGE_GROUP_UNDERAGE_LEVELS = {
"lvl0": "Age<18",
"lvl1": {
"Age0_4",
"Age5_9",
"Age10_14",
"Age15_17",
},
}
AGE_GROUPS_UNDERAGE = {AGE_GROUP_UNDERAGE_LEVELS["lvl0"]} | AGE_GROUP_UNDERAGE_LEVELS["lvl1"]
AGE_GROUPS_RELEVANT = AGE_GROUPS_UNDERAGE | AGE_GROUPS_MUST_HAVE
LOCATIONS_MAIN_INCLUDED = [
# "Austria",
"Portugal",
"Netherlands",
]
LOCATIONS_AGE_EXCLUDED = [
"Switzerland",
"Germany",
]
LOCATIONS_MANUFACTURER_EXCLUDED = [
"Czechia",
"France",
"Germany",
"Italy",
"Latvia",
"Romania",
"Iceland",
"Switzerland",
]
VACCINES_ONE_DOSE = ["JANSS"]
COLUMNS = {
"Denominator",
"FirstDose",
"FirstDoseRefused",
"NumberDosesReceived",
"Population",
"Region",
"ReportingCountry",
"SecondDose",
"TargetGroup",
"UnknownDose",
"Vaccine",
"YearWeekISO",
"DoseAdditional1",
"DoseAdditional2",
"NumberDosesExported",
}
[docs]class ECDC(CountryVaxBase):
location = "ECDC"
source_url = "https://opendata.ecdc.europa.eu/covid19/vaccine_tracker/csv/data.csv"
source_url_ref = "https://www.ecdc.europa.eu/en/publications-data/data-covid-19-vaccination-eu-eea"
vaccine_mapping = {**ECDC_VACCINES, "UNK": "Unknown"}
@property
def country_mapping(self):
return self._load_country_mapping(PATHS.INTERNAL_INPUT_ISO_FULL_FILE)
[docs] def read(self):
df = read_csv_from_url(self.source_url, timeout=30, use_proxy=True)
check_known_columns(
df,
[
"YearWeekISO",
"ReportingCountry",
"Denominator",
"NumberDosesReceived",
"NumberDosesExported",
"FirstDose",
"FirstDoseRefused",
"SecondDose",
"DoseAdditional1",
"DoseAdditional2",
"UnknownDose",
"Region",
"TargetGroup",
"Vaccine",
"Population",
],
)
return df
[docs] def _load_country_mapping(self, iso_path: str):
country_mapping = pd.read_csv(iso_path)
return dict(zip(country_mapping["alpha-2"], country_mapping["location"]))
[docs] def _weekday_to_date(self, d):
new_date = clean_date(d + "+5", "%Y-W%W+%w")
if new_date > localdate("Europe/London"):
new_date = clean_date(d + "+2", "%Y-W%W+%w")
return new_date
[docs] def pipe_initial_check(self, df: pd.DataFrame) -> pd.DataFrame:
# Vaccines
vaccines_wrong = set(df.Vaccine).difference(self.vaccine_mapping)
if vaccines_wrong:
raise ValueError(f"Unknown vaccines found. Check {vaccines_wrong}")
check_known_columns(df, COLUMNS)
return df
[docs] def pipe_base(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.pipe(self.pipe_initial_check)
df = df.assign(
total_vaccinations=df[
["FirstDose", "SecondDose", "UnknownDose", "DoseAdditional1", "DoseAdditional2"]
].sum(axis=1),
people_vaccinated=df.FirstDose,
people_fully_vaccinated=df.SecondDose,
people_with_booster=df.DoseAdditional1,
total_boosters=df.DoseAdditional1 + df.DoseAdditional2,
date=df.YearWeekISO.apply(self._weekday_to_date),
location=df.ReportingCountry.replace(self.country_mapping),
)
# Update people_fully_vaccinated
mask = df.Vaccine.isin(VACCINES_ONE_DOSE)
df.loc[mask, "people_fully_vaccinated"] = df.loc[mask, "people_fully_vaccinated"] + df.loc[mask, "FirstDose"]
return df.loc[df.Region.isin(self.country_mapping.keys())]
[docs] def _vaccine_timeseries(self, df: pd.DataFrame):
"""Get Series with the vaccine timeseries for all countries.
Format:
location -> {vaccine_1: start_date_1, vaccine_2: start_date_2, ...}
"""
x = df[df.Vaccine.isin(ECDC_VACCINES)]
x = x.assign(Vaccine=x.transform({"Vaccine": lambda x: ECDC_VACCINES[x]}))
x = x[x["total_vaccinations"].fillna(0) > 1]
vaccine_timeseries = (
x.groupby(["location", "Vaccine"], as_index=False)
.date.min()
.groupby("location")
.apply(lambda x: x.set_index("Vaccine")["date"].to_dict())
)
return vaccine_timeseries
[docs] def pipe_group(self, df: pd.DataFrame, group_field: str = None, group_field_renamed: str = None) -> pd.DataFrame:
if group_field is None:
cols_group = ["date", "location"]
cols_rename = {}
else:
cols_group = ["date", "location", group_field]
cols_rename = {group_field: group_field_renamed}
return (
df.groupby(cols_group, as_index=False)[
[
"total_vaccinations",
"people_vaccinated",
"people_fully_vaccinated",
"people_with_booster",
"total_boosters",
"UnknownDose",
]
]
.sum()
.rename(columns=cols_rename)
)
[docs] def pipe_cumsum(self, df: pd.DataFrame, group_field_renamed: str = None) -> pd.DataFrame:
if group_field_renamed is None:
cols_group = ["location"]
else:
cols_group = ["location", group_field_renamed]
return df.assign(
total_vaccinations=df.groupby(cols_group)["total_vaccinations"].cumsum(),
people_vaccinated=df.groupby(cols_group)["people_vaccinated"].cumsum(),
people_fully_vaccinated=df.groupby(cols_group)["people_fully_vaccinated"].cumsum(),
people_with_booster=df.groupby(cols_group)["people_with_booster"].cumsum(),
total_boosters=df.groupby(cols_group)["total_boosters"].cumsum(),
UnknownDose=df.groupby(cols_group)["UnknownDose"].cumsum(),
)
[docs] def pipeline_common(
self, df: pd.DataFrame, group_field: str = None, group_field_renamed: str = None
) -> pd.DataFrame:
cols = [
"date",
"location",
"total_vaccinations",
"people_vaccinated",
"people_fully_vaccinated",
"people_with_booster",
"total_boosters",
"UnknownDose",
]
if group_field_renamed is not None:
cols = cols + [group_field_renamed]
return (
df.pipe(self.pipe_group, group_field, group_field_renamed)[cols]
.sort_values("date")
.pipe(self.pipe_cumsum, group_field_renamed)
)
[docs] def pipe_filter_locations(self, df: pd.DataFrame):
"""Filters countries to be excluded and those with a high number of"""
return df[df.location.isin(LOCATIONS_MAIN_INCLUDED)]
[docs] def pipe_vaccine(self, df: pd.DataFrame, vax_timeline):
dfs = []
locations = df.location.unique()
for location in locations:
df_c = df[df.location == location]
df_c = build_vaccine_timeline(df_c, vax_timeline[location])
dfs.append(df_c)
return pd.concat(dfs, ignore_index=True)
[docs] def pipe_filter_targetgroup(self, df: pd.DataFrame):
dfs = []
dfg = df.groupby("location")
for _, _df in dfg:
if "Age<18" in _df.TargetGroup.unique():
tagetgroups = ["ALL", "Age<18"]
else:
tagetgroups = ["ALL"] + list(AGE_GROUP_UNDERAGE_LEVELS["lvl1"])
_df = _df.loc[df.TargetGroup.isin(tagetgroups)]
dfs.append(_df)
df = pd.concat(dfs)
return df
[docs] def pipeline(self, df: pd.DataFrame):
vax_timeline = self._vaccine_timeseries(df)
df = (
df.pipe(self.pipe_filter_targetgroup)
.pipe(self.pipeline_common)
.pipe(self.pipe_filter_locations)
.pipe(self.pipe_vaccine, vax_timeline)
.assign(source_url=self.source_url_ref)
)
# Boosters (people -> doses)
df = df.assign(total_boosters=df.people_with_booster)
return df[
[
"location",
"date",
"vaccine",
"source_url",
"total_vaccinations",
"people_vaccinated",
"people_fully_vaccinated",
"total_boosters",
]
]
[docs] def pipe_rename_vaccines(self, df: pd.DataFrame) -> pd.DataFrame:
return df.assign(vaccine=df.vaccine.replace(self.vaccine_mapping))
[docs] def pipe_manufacturer_filter_locations(self, df: pd.DataFrame):
"""Filters countries to be excluded and those with a high number of unknown doses."""
def _get_perc_unk(x):
res = x.groupby("vaccine").total_vaccinations.sum()
res /= res.sum()
if not "Unknown" in res:
return 0
return res.loc["Unknown"]
threshold_unk_ratio = 0.05
mask = df.groupby("location").apply(_get_perc_unk) < threshold_unk_ratio
locations_valid = mask[mask].index.tolist()
locations_valid = [loc for loc in locations_valid if loc not in LOCATIONS_MANUFACTURER_EXCLUDED]
df = df[df.location.isin(locations_valid)]
return df
[docs] def pipe_manufacturer_filter_entries(self, df: pd.DataFrame):
return df[~df.vaccine.isin(["Unknown"])]
[docs] def pipeline_manufacturer(self, df: pd.DataFrame):
group_field_renamed = "vaccine"
return (
df.loc[df.TargetGroup.isin(["ALL", "Age<18"])]
.pipe(self.pipeline_common, "Vaccine", group_field_renamed)
.pipe(self.pipe_rename_vaccines)
.pipe(self.pipe_manufacturer_filter_locations)
.pipe(self.pipe_manufacturer_filter_entries)[["location", "date", "vaccine", "total_vaccinations"]]
.sort_values(["location", "date", "vaccine"])
)
[docs] def pipe_age_checks(self, df: pd.DataFrame) -> pd.DataFrame:
# Check all age groups are valid names
ages_groups_wrong = set(df.age_group).difference(AGE_GROUPS_KNOWN)
if ages_groups_wrong:
raise ValueError(f"Unknown age groups found. Check {ages_groups_wrong}")
return df
[docs] def pipe_age_filter_locations(self, df: pd.DataFrame) -> pd.DataFrame:
"""Filter locations and keep only valid ones.
Validity is defined as a country having all age groups defined by `AGE_GROUPS_MUST_HAVE`.
"""
locations = df.location.unique()
locations_valid = []
for location in locations:
df_c = df.loc[df.location == location]
if not AGE_GROUPS_MUST_HAVE.difference(df_c.age_group.unique()):
locations_valid.append(location)
locations_valid = [loc for loc in locations_valid if loc not in LOCATIONS_AGE_EXCLUDED]
df = df[df.location.isin(locations_valid)]
return df
[docs] def pipe_age_filter_entries(self, df: pd.DataFrame) -> pd.DataFrame:
"""More granular filter. Keep entries where data is deemed reliable.
1. Checks field ALL is equal to sum of all other ages (within 5% error). If not filters rows out.
2. If percentage of unknown doses is above 5% of total doses, filters row out.
"""
# Find valid dates + location
x = df.pivot(index=["date", "location"], columns="age_group", values="total_vaccinations").reset_index()
x = x.dropna(subset=AGE_GROUPS_MUST_HAVE, how="any")
# Create debug variable (= sum of all ages)
x = x.assign(
debug_u18=x[AGE_GROUP_UNDERAGE_LEVELS["lvl0"]].fillna(x[AGE_GROUP_UNDERAGE_LEVELS["lvl1"]].sum(axis=1))
)
x = x.assign(debug=x[AGE_GROUPS_MUST_HAVE].sum(axis=1) + x.debug_u18)
x = x.assign(
debug_diff=x.ALL - x.debug,
debug_diff_perc=(x.ALL - x.debug) / x.ALL,
)
threshold_missmatch_ratio = 0.05 # Keep only those days where missmatch between sum(ages) and total is <5%
x = x[x.debug_diff_perc <= threshold_missmatch_ratio]
valid_entries_ids = x[["date", "location"]]
if not valid_entries_ids.value_counts().max() == 1:
raise ValueError("Some entries appear to be duplicated")
df = df.merge(valid_entries_ids, on=["date", "location"])
# Filter entries with too many unknown doses (where more 5% of doses are unknown)
threshold_unknown_doses_ratio = 0.05
df = df[(df.UnknownDose / df.total_vaccinations) < threshold_unknown_doses_ratio]
return df
[docs] def pipe_age_groups(self, df: pd.DataFrame) -> pd.DataFrame:
"""Build age groups."""
# df = df[~df.age_group.isin(['LTCF', 'HCW', 'AgeUNK', 'ALL'])]
df_ = df[df.age_group.isin(AGE_GROUPS_RELEVANT)].copy()
df_ = df_.assign(age_group_modified=df_.age_group.replace({"Age<18": "Age0_17"}))
regex = r"(?:1_)?Age(\d{1,2})?(?:\+|<)?_?(\d{1,2})?"
df_[["age_group_min", "age_group_max"]] = df_.age_group_modified.str.extract(regex)
# df_ = df_.assign(age_group_min=df_.age_group_min.fillna(0))
# df.loc[df.age_group == "1_Age60+", ["age_group_min", "age_group_max"]] = [60, pd.NA]
# df.loc[df.age_group == "1_Age<60", ["age_group_min", "age_group_max"]] = [0, 60]
return df_
[docs] def pipe_age_relative_metrics(self, df: pd.DataFrame, df_og: pd.DataFrame) -> pd.DataFrame:
df_den = df_og.loc[df_og.TargetGroup.isin(AGE_GROUPS_RELEVANT)].dropna(subset=["Denominator"])
if df_den.Denominator.isnull().any():
raise ValueError(f"Denomintor found to be null: {df_den[df_den.Denominator.isnull()]}")
res = df_den.groupby(["date", "location", "TargetGroup"]).Denominator.nunique()
if (res != 1).any():
raise ValueError(
"Several Denomintor values found for same (date, location, TargetGroup):"
f" {res[res== 1].index.tolist()}"
)
df_den = df_den[["date", "location", "TargetGroup", "Denominator"]].drop_duplicates()
df = df.merge(
df_den,
left_on=["date", "age_group", "location"],
right_on=["date", "TargetGroup", "location"],
)
return df.assign(
people_vaccinated_per_hundred=(100 * df.people_vaccinated / df.Denominator).round(2),
people_fully_vaccinated_per_hundred=(100 * df.people_fully_vaccinated / df.Denominator).round(2),
people_with_booster_per_hundred=(100 * df.people_with_booster / df.Denominator).round(2),
)
[docs] def pipeline_age(self, df: pd.DataFrame):
group_field_renamed = "age_group"
return (
df
# .dropna(subset=["Denominator"])
.pipe(self.pipeline_common, "TargetGroup", group_field_renamed)
.pipe(self.pipe_age_checks)
.pipe(self.pipe_age_filter_locations)
.pipe(self.pipe_age_filter_entries)
.pipe(self.pipe_age_groups)
.pipe(self.pipe_age_relative_metrics, df)
.drop(columns=[group_field_renamed])
.sort_values(["location", "date", "age_group_min"])
)
[docs] def _filter_age_targetgroup(self, df_c: pd.DataFrame):
# Filter age groups
date_0 = df_c.loc[df_c.TargetGroup.isin({AGE_GROUP_UNDERAGE_LEVELS["lvl0"]}), "date"].unique()
date_1 = df_c.loc[df_c.TargetGroup.isin(AGE_GROUP_UNDERAGE_LEVELS["lvl1"]), "date"].unique()
if (len(date_0) == len(date_1)) | (len(date_0) == 0):
age_group_selection = AGE_GROUPS_MUST_HAVE | AGE_GROUP_UNDERAGE_LEVELS["lvl1"]
elif len(date_1) == 0:
age_group_selection = AGE_GROUPS_MUST_HAVE | {AGE_GROUP_UNDERAGE_LEVELS["lvl0"]}
else:
if (df_c.location == "Ireland").any():
age_group_selection = AGE_GROUPS_MUST_HAVE | AGE_GROUP_UNDERAGE_LEVELS["lvl1"]
else:
raise ValueError(
f"Can't choose between under age groups. Restriction might be too strict, consider relaxing it!"
)
df_c = df_c[df_c.TargetGroup.isin(age_group_selection)]
return df_c
[docs] def export_age(self, df: pd.DataFrame):
df_age = df.pipe(self.pipeline_age)
# Export
locations = df_age.location.unique()
for location in locations:
df_c = df_age[df_age.location == location].pipe(self._filter_age_targetgroup).copy()
self.export_datafile(
df_age=df_c,
filename=location,
meta_age={
"source_name": "European Centre for Disease Prevention and Control (ECDC)",
"source_url": self.source_url_ref,
},
)
[docs] def export_manufacturer(self, df: pd.DataFrame):
df_manufacturer = df.pipe(self.pipeline_manufacturer)
# Export
locations = df_manufacturer.location.unique()
for location in locations:
df_c = df_manufacturer[df_manufacturer.location == location].copy()
self.export_datafile(
df_manufacturer=df_c,
filename=location,
meta_manufacturer={
"source_name": "European Centre for Disease Prevention and Control (ECDC)",
"source_url": self.source_url_ref,
},
attach_manufacturer=True,
)
[docs] def export_main(self, df: pd.DataFrame):
df = df.pipe(self.pipeline)
# Export
locations = df.location.unique()
for location in locations:
df_c = df[df.location == location].copy()
msk = (
df_c[["total_vaccinations", "people_vaccinated", "people_fully_vaccinated", "total_vaccinations"]].sum(
axis=1
)
!= 0
)
df_c = df_c.loc[msk]
self.export_datafile(df_c, filename=location)
[docs] def export(self):
# Read data
df = self.read().pipe(self.pipe_base)
# Main
self.export_main(df)
# Age
self.export_age(df)
# Manufacturer
self.export_manufacturer(df)
[docs]def main():
ECDC().export()