Source code for cowidev.hosp.etl
import os
import time
import importlib
import json

from joblib import Parallel, delayed
import pandas as pd
from pandas.api.types import is_string_dtype

from cowidev import PATHS
from cowidev.utils.log import get_logger

logger = get_logger()

class HospETL:
    def extract(
        self,
        modules: list,
        parallel: bool = False,
        n_jobs: int = -2,
        modules_skip: list = [],
    ):
        """Get the data for all locations.

        - Build a preliminary dataframe with the data for all locations.
        - Build a metadata dataframe with the locations' metadata (source URL, source name, etc.).

        Returns a dictionary {"df": data, "meta": metadata}, or None if no module is left to run.
        """
        t0 = time.time()
        # Sources
        modules = [s for s in modules if s not in modules_skip]
        if not modules:
            logger.info("HOSP - No data to be collected (check skipped countries)...")
            return None
        # Get data
        modules_execution_results = self.extract_collect(parallel, n_jobs, modules=modules)
        self._execution_summary(t0, modules_execution_results)
        # Export data (checkpoint)
        self.extract_export_checkpoint(modules_execution_results)
        # Process output
        df, df_meta = self.extract_process()
        return {"df": df, "meta": df_meta}

    def extract_collect(self, parallel, n_jobs, modules):
        """Collect data for all countries."""
        logger.info("HOSP - Collecting data...")
        if parallel:
            modules_execution_results = Parallel(n_jobs=n_jobs, backend="threading")(
                delayed(self._extract_entity)(m) for m in modules
            )
        else:
            modules_execution_results = [self._extract_entity(m) for m in modules]
        return modules_execution_results

    def extract_export_checkpoint(self, modules_execution_results):
        """Export downloaded data and metadata."""
        logger.info("HOSP - Saving checkpoint data...")
        for m in modules_execution_results:
            if m is not None:
                df = m[0]
                metadata = m[1]
                # A module may return metadata for a single entity (dict) or several (list of dicts)
                if isinstance(metadata, list):
                    for metadata_ in metadata:
                        df_ = df[df.entity == metadata_["entity"]]
                        df_.to_csv(
                            os.path.join(PATHS.INTERNAL_OUTPUT_HOSP_MAIN_DIR, f"{metadata_['entity']}.csv"),
                            index=False,
                        )
                        with open(
                            os.path.join(PATHS.INTERNAL_OUTPUT_HOSP_META_DIR, f"{metadata_['entity']}.json"), "w"
                        ) as outfile:
                            json.dump(metadata_, outfile)
                else:
                    df.to_csv(
                        os.path.join(PATHS.INTERNAL_OUTPUT_HOSP_MAIN_DIR, f"{metadata['entity']}.csv"), index=False
                    )
                    with open(
                        os.path.join(PATHS.INTERNAL_OUTPUT_HOSP_META_DIR, f"{metadata['entity']}.json"), "w"
                    ) as outfile:
                        json.dump(metadata, outfile)
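
    # Checkpoint layout, as written by the method above: one "<entity>.csv" per
    # entity under PATHS.INTERNAL_OUTPUT_HOSP_MAIN_DIR and one "<entity>.json"
    # per entity under PATHS.INTERNAL_OUTPUT_HOSP_META_DIR (e.g. "Iceland.csv"
    # alongside "Iceland.json").
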
    def extract_process(self):
        """Load checkpointed data."""
        logger.info("HOSP - Loading checkpoint data...")
        # Load & build data
        data_paths = [
            os.path.join(PATHS.INTERNAL_OUTPUT_HOSP_MAIN_DIR, p)
            for p in os.listdir(PATHS.INTERNAL_OUTPUT_HOSP_MAIN_DIR)
            if p.endswith("csv")
        ]
        df = pd.concat([pd.read_csv(p) for p in data_paths])
        # Load & build metadata
        metadata_paths = [
            os.path.join(PATHS.INTERNAL_OUTPUT_HOSP_META_DIR, p)
            for p in os.listdir(PATHS.INTERNAL_OUTPUT_HOSP_META_DIR)
        ]
        metadata = []
        for p in metadata_paths:
            with open(p, "r") as infile:
                metadata.append(json.load(infile))
        df_meta = self._build_metadata(metadata)
        # Process output
        df = df.dropna(subset=["value"])
        # Remove duplicates
        df = df.drop_duplicates()
        # Remove a known spurious data point (Serbia, 2020-03-09)
        df = df[
            ~(
                (df.entity == "Serbia")
                & (df.date == "2020-03-09")
                & (df.indicator == "Daily hospital occupancy")
                & (df.value == 2)
            )
        ]
        # Check that no entity-date-indicator combination remains duplicated
        duplicates = df[df.duplicated(subset=["date", "entity", "indicator"])]
        if len(duplicates) > 0:
            print(duplicates)
            raise Exception("Some entity-date-indicator combinations are present more than once!")
        return df, df_meta

    def _build_metadata(self, metadata):
        """Build metadata dataframe (to be exported later to locations.csv)."""
        # Flatten list (entries may be dicts or lists of dicts)
        metadata = [[m] if not isinstance(m, list) else m for m in metadata]
        metadata = [mm for m in metadata for mm in m]
        # Build dataframe
        metadata = [
            {"location": m["entity"], "source_name": m["source_name"], "source_website": m["source_url_ref"]}
            for m in metadata
        ]
        df_meta = pd.DataFrame.from_records(metadata)
        return df_meta
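
    # Illustrative sketch (not executed; the values are invented) of what
    # _build_metadata consumes and yields:
    #
    #     metadata = [{"entity": "Iceland",
    #                  "source_name": "Directorate of Health",
    #                  "source_url_ref": "https://example.org/hosp"}]
    #     self._build_metadata(metadata)
    #     #   location            source_name            source_website
    #     # 0  Iceland  Directorate of Health  https://example.org/hosp
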
    def _extract_entity(self, module_name: str):
        """Execute the process to get the data for a certain location (country)."""
        t0 = time.time()
        module = importlib.import_module(module_name)
        logger.info(f"HOSP - {module_name}: started")
        try:
            df, metadata = module.main()
        except Exception as err:
            logger.error(f"HOSP - {module_name}: ❌ {err}", exc_info=True)
            # On failure we fall through and implicitly return None; callers skip None results
        else:
            self._check_fields_df(df)
            # Execution details
            t = round(time.time() - t0, 2)
            execution = {
                "module_name": module_name,
                "time": t,
            }
            logger.info(f"HOSP - {module_name}: SUCCESS ✅")
            return df, metadata, execution

    def _check_fields_df(self, df):
        """Check the format of the data collected for a certain location."""
        assert df.indicator.isin(
            {
                "Daily hospital occupancy",
                "Daily ICU occupancy",
                "Weekly new hospital admissions",
                "Weekly new ICU admissions",
            }
        ).all(), "One of the indicators for this country is not recognized!"
        assert is_string_dtype(df.date), "The date column is not a string!"
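
    # Illustrative sketch (not executed): a minimal frame a scraper module could
    # return that passes _check_fields_df. Entity, dates and values are invented.
    #
    #     df = pd.DataFrame(
    #         {
    #             "entity": ["Iceland", "Iceland"],
    #             "date": ["2021-01-01", "2021-01-01"],  # string dtype, as required
    #             "indicator": ["Daily hospital occupancy", "Daily ICU occupancy"],
    #             "value": [25, 4],
    #         }
    #     )
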
    def _build_time_df(self, execution):
        """Build execution-time dataframe."""
        df_time = (
            pd.DataFrame([{"module": m["module_name"], "execution_time (sec)": m["time"]} for m in execution])
            .set_index("module")
            .sort_values(by="execution_time (sec)", ascending=False)
        )
        return df_time

    def _execution_summary(self, t0, modules_execution_results):
        """Print a summary of the execution (timings)."""
        execution = [m[2] for m in modules_execution_results if m is not None]
        df_time = self._build_time_df(execution)
        t_sec = round(time.time() - t0, 2)
        t_min = round(t_sec / 60, 2)
        print("---")
        print("TIMING DETAILS")
        print(f"Took {t_sec} seconds (i.e. {t_min} minutes).")
        print("Most time-consuming scripts:")
        print(df_time.head(20))
        print("---")

    def pipe_metadata(self, df):
        """Add ISO code and population to the dataframe."""
        print("Adding ISO & population…")
        shape_og = df.shape
        population = pd.read_csv(PATHS.INTERNAL_INPUT_UN_POPULATION_FILE, usecols=["entity", "iso_code", "population"])
        df = df.merge(population, on="entity")
        if shape_og[0] != df.shape[0]:
            raise ValueError(f"Number of rows changed after merge: {shape_og[0]} --> {df.shape[0]}")
        return df

    def pipe_per_million(self, df):
        """Add per-million variants of all indicators."""
        print("Adding per-capita metrics…")
        per_million = df.copy()
        per_million.loc[:, "value"] = per_million["value"].div(per_million["population"]).mul(1_000_000).round(3)
        per_million.loc[:, "indicator"] = per_million["indicator"] + " per million"
        df = pd.concat([df, per_million], ignore_index=True).drop(columns="population")
        return df
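
    # Worked example (invented numbers): for a country with population 5,000,000
    # and a "Daily hospital occupancy" value of 250, pipe_per_million appends a
    # row with indicator "Daily hospital occupancy per million" and value
    # 250 / 5_000_000 * 1_000_000 = 50.0 (rounded to 3 decimals).
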
    def pipe_round_values(self, df):
        """Round absolute values to integers; per-million values keep their decimals."""
        df.loc[~df.indicator.str.contains("per million"), "value"] = df.value.round()
        return df

    def transform(self, df: pd.DataFrame):
        return (
            df.pipe(self.pipe_metadata)
            .pipe(self.pipe_per_million)
            .pipe(self.pipe_round_values)[["entity", "iso_code", "date", "indicator", "value"]]
            .sort_values(["entity", "date", "indicator"])
        )
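
    # After transform, the frame is in long format with columns
    # entity | iso_code | date | indicator | value: one row per
    # entity-date-indicator, covering both absolute and per-million variants.
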
    def transform_meta(self, df_meta: pd.DataFrame, df: pd.DataFrame, locations_path: str):
        # Get the most recent date of data update
        df_ = (
            df.groupby(["entity", "iso_code"], as_index=False)
            .date.max()
            .rename(columns={"date": "last_observation_date"})
        )
        # Add ISO code and last observation date to the dataframe
        df_meta = df_meta.merge(df_, left_on="location", right_on="entity", how="left")
        # Fill with locations' metadata of countries not updated in this batch (currently disabled)
        # df_meta_current = pd.read_csv(locations_path)
        # df_meta = (
        #     pd.concat([df_meta, df_meta_current])
        #     .sort_values("last_observation_date")
        #     .drop_duplicates(subset=["location"])
        # )
        # Order columns
        df_meta = df_meta[
            ["location", "iso_code", "last_observation_date", "source_name", "source_website"]
        ].sort_values("location")
        return df_meta

    def load(self, df: pd.DataFrame, output_path: str) -> None:
        # Export data
        df.to_csv(output_path, index=False)

    def run(self, parallel: bool, n_jobs: int, modules, modules_skip=[]):
        data = self.extract(parallel=parallel, n_jobs=n_jobs, modules=modules, modules_skip=modules_skip)
        if data is not None:
            df = self.transform(data["df"])
            df_meta = self.transform_meta(data["meta"], df, PATHS.DATA_HOSP_META_FILE)
            self.load(df, PATHS.DATA_HOSP_MAIN_FILE)
            self.load(df_meta, PATHS.DATA_HOSP_META_FILE)

def run_etl(parallel: bool, n_jobs: int, modules: list, modules_skip: list = []):
    etl = HospETL()
    etl.run(parallel, n_jobs, modules=modules, modules_skip=modules_skip)
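
# Usage sketch, assuming scraper modules importable by dotted path (the module
# names below are hypothetical placeholders, not the project's real ones):
#
#     run_etl(
#         parallel=True,
#         n_jobs=-2,
#         modules=["cowidev.hosp.sources.iceland", "cowidev.hosp.sources.canada"],
#         modules_skip=["cowidev.hosp.sources.canada"],
#     )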