import logging
import re
from datetime import datetime, timedelta
from urllib.error import HTTPError
import pandas as pd
from cowidev.utils import clean_count, clean_date_series, clean_date
from cowidev.vax.utils.base import CountryVaxBase
[docs]class Spain(CountryVaxBase):
location = "Spain"
vaccine_mapping = {
"Pfizer": "Pfizer/BioNTech",
"Moderna": "Moderna",
"AstraZeneca": "Oxford/AstraZeneca",
"Janssen": "Johnson&Johnson",
}
_date_field_raw = "Fecha de la última vacuna registrada (2)"
_max_days_back = 20
[docs] def read(self, last_update: str) -> pd.Series:
return self._parse_data(last_update)
[docs] def _parse_data(self, last_update: str):
"""Goes back _max_days_back days to retrieve data.
Does not exceed `last_update` date.
"""
records = []
for days in range(self._max_days_back):
date_it = clean_date(datetime.now() - timedelta(days=days))
# print(date_it)
# print(f"{date_it} > {last_update}?")
if date_it > last_update:
source = self._get_source_url(date_it.replace("-", ""))
try:
df_ = pd.read_excel(source, index_col=0, parse_dates=[self._date_field_raw])
except HTTPError:
# print(f"Date {date_it} not available!")
pass
# logging.info(f"Date {date_it} not available!")
else:
# print("Adding!")
self._check_vaccine_names(df_)
ds = self._parse_data_day(df_, source)
records.append(ds)
else:
# print("End!")
break
if len(records) > 0:
return pd.DataFrame(records)
print("No data being added to Spain")
return None
[docs] def _parse_data_day(self, df: pd.DataFrame, source: str) -> pd.Series:
"""Parse data for a single day"""
df.loc[~df.index.isin(["Sanidad Exterior"]), self._date_field_raw].dropna().max()
data = {
"total_vaccinations": clean_count(round(df.loc["Totales", "Dosis administradas (2)"])),
"people_vaccinated": clean_count(df.loc["Totales", "Nº Personas con al menos 1 dosis"]),
"people_fully_vaccinated": clean_count(df.loc["Totales", "Nº Personas vacunadas(pauta completada)"]),
"date": clean_date(
df.loc[
~df.index.isin(["Sanidad Exterior"]),
"Fecha de la última vacuna registrada (2)",
]
.dropna()
.max()
),
"source_url": source,
"vaccine": ", ".join(self._get_vaccine_names(df, translate=True)),
}
if (col_boosters := "Nº Personas con dosis adicional") in df.columns:
# print("EEE")
data["total_boosters"] = clean_count(df.loc["Totales", col_boosters])
return pd.Series(data=data)
[docs] def _get_source_url(self, dt_str):
return (
"https://www.mscbs.gob.es/profesionales/saludPublica/ccayes/alertasActual/nCov/documentos/"
f"Informe_Comunicacion_{dt_str}.ods"
)
[docs] def _get_vaccine_names(self, df: pd.DataFrame, translate: bool = False):
regex_vaccines = r"Dosis entregadas ([a-zA-Z]*) \(1\)"
if translate:
return sorted(
[
self.vaccine_mapping[re.search(regex_vaccines, col).group(1)]
for col in df.columns
if re.match(regex_vaccines, col)
]
)
else:
return sorted(
[re.search(regex_vaccines, col).group(1) for col in df.columns if re.match(regex_vaccines, col)]
)
[docs] def _check_vaccine_names(self, df: pd.DataFrame):
vaccines = self._get_vaccine_names(df)
unknown_vaccines = set(vaccines).difference(self.vaccine_mapping.keys())
if unknown_vaccines:
raise ValueError("Found unknown vaccines: {}".format(unknown_vaccines))
[docs] def pipe_date(self, df: pd.DataFrame) -> pd.DataFrame:
return df.assign(date=clean_date_series(df[self._date_field_raw]))
[docs] def pipe_location(self, df: pd.DataFrame) -> pd.DataFrame:
return df.assign(location=self.location)
[docs] def pipeline(self, df: pd.DataFrame) -> pd.DataFrame:
return df.pipe(self.pipe_location)
[docs] def export(self):
last_update = self.load_datafile().date.astype(str).max()
df = self.read(last_update)
if df is not None:
df = df.pipe(self.pipeline)
self.export_datafile(df, attach=True)
[docs]def main():
Spain().export()