Source code for cowidev.vax.utils.checks

from datetime import datetime
from itertools import chain
from typing import Optional

import pandas as pd


# Names of the metric columns; these are the same four column names that
# CountryChecker.metrics_present looks for in a dataframe.
METRICS = ["total_vaccinations", "people_vaccinated", "people_fully_vaccinated", "total_boosters"]

# Vaccine names accepted by CountryChecker.check_vaccine: every name found in
# the `vaccine` column (after splitting each value on ", ") must appear in this
# list, otherwise a ValueError is raised.
VACCINES_ACCEPTED = [
    "Abdala",
    "CanSino",
    "Corbevax",
    "Covaxin",
    "COVIran Barekat",
    "KoviVac/Chumakov",
    "EpiVacCorona",
    "FAKHRAVAC",
    "IMBCAMS",
    "Johnson&Johnson",
    "KCONVAC",
    "Medicago",
    "Medigen",
    "Moderna",
    "Novavax",
    "Oxford/AstraZeneca",
    "Pfizer/BioNTech",
    "QazVac",
    "Razi Cov Pars",
    "Sanofi/GSK",
    "Sinopharm/Beijing",
    "Sinopharm/Wuhan",
    "Sinovac",
    "SKYCovione",
    "Soberana02",
    "Soberana Plus",
    "SpikoGen",
    "Sputnik Light",
    "Sputnik V",
    "Turkovac",
    "Valneva",
    "ZF2001",
    "ZyCoV-D",
]

# Subset of VACCINES_ACCEPTED; presumably the vaccines whose primary course is
# a single dose (inferred from the name only -- this list is not referenced in
# this part of the file, confirm against its callers).
VACCINES_ONE_DOSE = [
    "Johnson&Johnson",
    "CanSino",
    "Sputnik Light",
    "Soberana Plus",
]

# Subset of VACCINES_ACCEPTED; presumably the vaccines whose primary course
# takes three doses (inferred from the name only -- this list is not referenced
# in this part of the file, confirm against its callers).
VACCINES_THREE_DOSES = [
    "ZF2001",
    "Abdala",
    "Razi Cov Pars",  # 3rd dose (intranasal spray) not reported in Iran as a 3rd dose
    "ZyCoV-D",
]


def country_df_sanity_checks(
    df: pd.DataFrame,
    monotonic_check_skip: Optional[list] = None,
    anomalies: bool = True,
    anomaly_check_skip: Optional[list] = None,
) -> pd.DataFrame:
    """Run every ``CountryChecker`` check on the dataframe of one location.

    Args:
        df: Data to validate.
        monotonic_check_skip: Entries forwarded to ``CountryChecker`` as
            ``monotonic_check_skip``. ``None`` (the default) means no entries.
        anomalies: Forwarded to ``CountryChecker``; enables the anomaly check.
        anomaly_check_skip: Entries forwarded to ``CountryChecker`` as
            ``anomaly_check_skip``. ``None`` (the default) means no entries.

    Returns:
        The same ``df`` object that was passed in, once all checks have
        passed, so the function honours its annotated return type.

    Raises:
        ValueError: Propagated from ``CountryChecker`` when a check fails.
    """
    # None is normalised to a fresh list here so that no mutable default is
    # shared between calls and CountryChecker always receives a list.
    checker = CountryChecker(
        df,
        monotonic_check_skip=monotonic_check_skip or [],
        anomalies=anomalies,
        anomaly_check_skip=anomaly_check_skip or [],
    )
    checker.run()
    return df
class CountryChecker:
    """Sanity checks for the vaccination dataframe of a single location.

    The dataframe must hold exactly one distinct value in its ``location``
    column (verified on construction). Call :meth:`run` to execute every
    check; each check raises ``ValueError`` on the first problem it finds.

    Args:
        df: Data to validate.
        allow_extra_cols: If False, ``check_column_names`` rejects any column
            other than the required ones and the optional metric columns.
        monotonic_check_skip: Entries, each indexable by ``"date"`` (an object
            with ``strftime``) and ``"metrics"`` (a metric name or a list of
            names), identifying metric decreases that are allowed. ``None``
            means no entries.
        anomalies: Whether ``check_metrics`` also runs the anomaly check.
        anomaly_check_skip: Same format as ``monotonic_check_skip``, for
            allowed anomalies. ``None`` means no entries.

    Raises:
        ValueError: If ``df`` does not contain exactly one distinct location.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        allow_extra_cols: bool = True,
        monotonic_check_skip: Optional[list] = None,
        anomalies: bool = True,
        anomaly_check_skip: Optional[list] = None,
    ):
        self.location = self._get_location(df)
        self.df = df
        self.allow_extra_cols = allow_extra_cols
        self.skip_monocheck_ids = self._skip_check_ids(monotonic_check_skip)
        self.anomalies = anomalies
        self.skip_anomalcheck_ids = self._skip_check_ids(anomaly_check_skip)

    def _get_location(self, df):
        """Return the single value of the ``location`` column.

        Raises:
            ValueError: If the column does not hold exactly one distinct value.
        """
        locations = df.loc[:, "location"].unique()
        if len(locations) != 1:
            raise ValueError(f"More than one location found: {locations}")
        return locations[0]

    def _skip_check_ids(self, check_skip):
        """Flatten skip entries into ``"<YYYYmmdd><metric>"`` identifier strings.

        Each entry yields one identifier per metric it names. ``None`` or an
        empty sequence yields an empty list.
        """
        if not check_skip:
            return []

        def _ids(entry):
            day = entry["date"].strftime("%Y%m%d")
            metrics = entry["metrics"]
            if isinstance(metrics, list):
                return [day + metric for metric in metrics]
            return [day + metrics]

        return list(chain.from_iterable(_ids(entry) for entry in check_skip))

    @property
    def metrics_present(self):
        """Metric column names to check.

        ``total_vaccinations`` is always listed; the other three metrics are
        listed only when the dataframe has the corresponding column.
        """
        optional = ("people_vaccinated", "people_fully_vaccinated", "total_boosters")
        return ["total_vaccinations"] + [col for col in optional if col in self.df.columns]

    def check_column_names(self):
        """Ensure required columns exist and, optionally, that no others do.

        Raises:
            ValueError: If a required column is missing, or if
                ``allow_extra_cols`` is False and an unexpected column exists.
        """
        cols = ["total_vaccinations", "vaccine", "date", "location", "source_url"]
        cols_extra = cols + [
            "people_vaccinated",
            "people_fully_vaccinated",
            "total_boosters",
        ]
        cols_missing = [col for col in cols if col not in self.df.columns]
        if cols_missing:
            raise ValueError(f"{self.location} -- df missing column(s): {cols_missing}.")
        # Ensure validity of column names in df
        if not self.allow_extra_cols:
            cols_wrong = [col for col in self.df.columns if col not in cols_extra]
            if cols_wrong:
                raise ValueError(f"{self.location} -- df contains invalid column(s): {cols_wrong}.")

    def check_source_url(self):
        """Raise ``ValueError`` if any ``source_url`` value is null."""
        if self.df.source_url.isnull().any():
            raise ValueError(f"{self.location} -- Invalid source_url! NaN values found.")

    def check_vaccine(self):
        """Ensure ``vaccine`` has no nulls and only names from ``VACCINES_ACCEPTED``.

        Each cell is split on ``", "`` and every resulting name is checked.

        Raises:
            ValueError: On null values or on names not in ``VACCINES_ACCEPTED``.
        """
        if self.df.vaccine.isnull().any():
            raise ValueError(f"{self.location} -- Invalid vaccine! NaN values found.")
        vaccines_used = {name for cell in self.df.vaccine.tolist() for name in cell.split(", ")}
        vaccines_wrong = [vac for vac in vaccines_used if vac not in VACCINES_ACCEPTED]
        if vaccines_wrong:
            raise ValueError(f"{self.location} -- Invalid vaccine detected! Check {vaccines_wrong}.")

    def check_date(self):
        """Ensure dates are non-null, not before 2020-12-01, and unique.

        Raises:
            ValueError: If any of the three conditions is violated.
        """
        if self.df.date.isnull().any():
            raise ValueError(f"{self.location} -- Invalid dates! NaN values found.")
        if self.df.date.min() < datetime(2020, 12, 1):
            raise ValueError(f"{self.location} -- Invalid dates! Check {self.df.date.min()}")
        # keep=False marks every row of a duplicated date, not only the repeats
        msk = self.df.date.duplicated(keep=False)
        if msk.any():
            raise ValueError(f"{self.location} -- Check `date` field, there are duplicates: {self.df[msk]}")

    def check_location(self):
        """Ensure ``location`` has no nulls and exactly one distinct value.

        Raises:
            ValueError: If a null is found or the number of distinct values
                is not one.
        """
        if self.df.location.isnull().any():
            raise ValueError(f"{self.location} -- Invalid location! NaN values found. Check {self.df.location}.")
        if self.df.location.nunique() != 1:
            raise ValueError(
                f"{self.location} -- Invalid location! More than one location found. Check {self.df.location}."
            )

    def check_metrics(self):
        """Run the metric checks on the data sorted by date.

        Order: monotonicity, inequalities between metrics and, when
        ``self.anomalies`` is true, anomaly detection.
        """
        df = self.df.sort_values(by="date")
        # Monotonically
        self._check_metrics_monotonic(df)
        # Inequalities
        self._check_metrics_inequalities(df)
        # Anomalies
        if self.anomalies:
            self._check_metrics_anomalies(df)

    def _check_metrics_monotonic(self, df: pd.DataFrame):
        """Ensure each present metric never decreases along the rows of ``df``.

        Null values of a metric are dropped before comparing consecutive rows.
        A decrease is accepted only if every offending ``<YYYYmmdd><metric>``
        identifier is listed in ``self.skip_monocheck_ids``.

        Raises:
            ValueError: If a decrease is found that is not fully covered by
                the skip list.
        """
        for col in self.metrics_present:
            _x = df.dropna(subset=[col])
            # `Series.is_monotonic` was removed in pandas 2.0;
            # `is_monotonic_increasing` is the equivalent, supported attribute.
            if not _x[col].is_monotonic_increasing:
                idx_wrong = _x[col].diff() < 0
                wrong_rows = _x.loc[idx_wrong]
                wrong_ids = wrong_rows.date.dt.strftime("%Y%m%d") + col
                if not wrong_ids.isin(self.skip_monocheck_ids).all():
                    raise ValueError(
                        f"{self.location} -- Column {col} must be monotonically increasing! Check:\n{wrong_rows}"
                    )

    def _check_metrics_inequalities(self, df: pd.DataFrame):
        """Ensure the expected orderings between metric columns hold.

        For every (larger, smaller) pair whose columns both exist, rows where
        both values are non-null must satisfy ``larger >= smaller``.

        Raises:
            ValueError: On the first pair that is violated.
        """
        # (larger, smaller, show_rows): the offending rows are appended to the
        # message only for the pairs flagged True, which keeps every error
        # message exactly as it was before the checks were merged into a loop.
        pairs = [
            ("total_vaccinations", "people_vaccinated", True),
            ("total_vaccinations", "people_fully_vaccinated", False),
            ("total_vaccinations", "total_boosters", False),
            ("people_vaccinated", "people_fully_vaccinated", True),
        ]
        for larger, smaller, show_rows in pairs:
            if larger not in df.columns or smaller not in df.columns:
                continue
            df_ = df[[smaller, larger]].dropna()
            msk = df_[larger] < df_[smaller]
            if msk.any():
                msg = f"{self.location} -- {larger} can't be < {smaller}!"
                if show_rows:
                    msg += f"\n{df.loc[msk[msk].index]}"
                raise ValueError(msg)

    def _check_metrics_anomalies(self, df):
        """Run ``_check_anomalies`` for every present metric."""
        for metric in self.metrics_present:
            self._check_anomalies(df, metric)

    def _check_anomalies(self, df, metric, th=6):
        """Flag values that are more than ``th`` times their recent average.

        Only values above 10,000 are inspected. Each value is compared with
        the previous row's 7-day rolling mean (or with itself when that mean
        is unavailable, giving a ratio of about 1). Flagged rows are accepted
        only if every ``<YYYYmmdd><metric>`` identifier is listed in
        ``self.skip_anomalcheck_ids``.

        Args:
            df: Data with a ``date`` column; time-based rolling needs it to
                be sorted by date (``check_metrics`` sorts before calling).
            metric: Name of the metric column to inspect.
            th: Ratio above which a value is considered anomalous.

        Raises:
            ValueError: If anomalies are found that are not fully covered by
                the skip list.
        """
        # Get metric values above 10,000
        df_ = df.set_index("date")
        df_metric = df_.loc[(df_[metric] > 10000), metric]
        # Compute rolling average, 7 days. NaNs are filled with non-smoothed values.
        # The window uses the canonical "7D" alias (lowercase "d" is deprecated
        # in recent pandas); the column label keeps the historical "7d" suffix.
        window_label = "7d"
        m = df_metric.rolling("7D", min_periods=2).mean().shift(1)
        missing = m.isnull()
        m.loc[missing] = df_metric[missing]
        # Compute ratio between value and rolling average; the small epsilon
        # avoids a division by zero. Build the anomalies dataframe.
        ratio = df_metric / (m + 1e-9)
        flagged = ratio > th
        t = pd.DataFrame(
            {
                f"{metric}_{window_label}": m[flagged],
                f"{metric}_ratio": ratio[flagged],
            }
        )
        anomalies = df_.loc[t.index, [metric]].merge(t, on="date").reset_index()
        if not anomalies.empty:
            wrong_ids = anomalies.date.dt.strftime("%Y%m%d") + metric
            if not wrong_ids.isin(self.skip_anomalcheck_ids).all():
                raise ValueError(f"{self.location} -- Potential anomalies found ⚠️:\n{anomalies}")

    def run(self):
        """Execute all checks in order; the first failure raises ``ValueError``."""
        # Ensure required columns are present
        self.check_column_names()
        # Source url consistency
        self.check_source_url()
        # Vaccine consistency
        self.check_vaccine()
        # Date consistency
        self.check_date()
        # Location consistency
        self.check_location()
        # Metrics checks
        self.check_metrics()
def validate_vaccines(df, vaccines_accepted, vaccines_raw=None):
    """Ensure every vaccine name found is among the accepted ones.

    Args:
        df: Dataframe with a ``vaccine`` column; only read when
            ``vaccines_raw`` is None. Its cell values are compared as-is
            (they are not split on commas).
        vaccines_accepted: Iterable of accepted vaccine names.
        vaccines_raw: Optional iterable of vaccine names to validate instead
            of the ``vaccine`` column of ``df``.

    Raises:
        ValueError: If a name is found that is not in ``vaccines_accepted``.
    """
    # Identity test on purpose: `!= None` is evaluated element-wise for pandas
    # and NumPy containers and then fails when used as an `if` condition.
    if vaccines_raw is not None:
        vaccines_found = vaccines_raw
    else:
        vaccines_found = df["vaccine"].unique()
    vaccines_wrong = set(vaccines_found).difference(vaccines_accepted)
    if vaccines_wrong:
        raise ValueError(f"Missing vaccines: {vaccines_wrong}")