from datetime import datetime
import os
import pytz
import requests
import pandas as pd
from uk_covid19 import Cov19API
from cowidev import PATHS
from cowidev.grapher.db.utils.db_imports import import_dataset
# Dataset name handed to import_dataset(); also used as the stem of the CSV file name.
DATASET_NAME = "uk_covid_data"
# Path of the CSV written by generate_dataset() and read back by update_db().
OUTPUT_CSV = os.path.join(PATHS.INTERNAL_GRAPHER_DIR, DATASET_NAME + ".csv")
# Reference day: get_day_diff() expresses dates as the number of days since this date,
# and update_db() passes it on as the "zeroDay" display setting.
ZERO_DAY = "2020-01-01"
def get_uk() -> pd.DataFrame:
    """Download the ``areaType=overview`` series: absolute metrics joined with rates.

    Two separate Cov19API queries are issued (absolute figures, then rate
    metrics) and their results are merged into a single frame.

    Returns:
        DataFrame with the identifier columns ``Year`` (the API ``date`` field),
        ``Country`` (``areaName``) and ``areaCode``, plus one column per
        requested metric.
    """
    area_filter = "areaType=overview"
    # Identifier fields requested by both queries; they double as the merge keys.
    id_fields = {
        "Year": "date",
        "Country": "areaName",
        "areaCode": "areaCode",
    }

    # Absolute
    counts = Cov19API(
        filters=[area_filter],
        structure={
            **id_fields,
            "weekly_cases_rolling": "newCasesByPublishDateRollingSum",
            "cumulative_cases": "cumCasesByPublishDate",
            "weekly_deaths_rolling": "newDeaths28DaysByPublishDateRollingSum",
            "cumulative_deaths": "cumDeaths28DaysByPublishDate",
            "daily_deaths": "newDeaths28DaysByPublishDate",
            "daily_cases": "newCasesByPublishDate",
            "test_positivity_rate": "uniqueCasePositivityBySpecimenDateRollingSum",
            "weekly_hospital_admissions": "newAdmissionsRollingSum",
            "people_in_hospital": "hospitalCases",
            "people_ventilated": "covidOccupiedMVBeds",
        },
    ).get_dataframe()

    # Rate
    rates = Cov19API(
        filters=[area_filter],
        structure={
            **id_fields,
            "cumulative_cases_rate": "cumCasesByPublishDateRate",
            "cumulative_deaths_rate": "cumDeaths28DaysByPublishDateRate",
            "weekly_cases_rate": "newCasesBySpecimenDateRollingRate",
            "weekly_deaths_rate": "newDeaths28DaysByDeathDateRollingRate",
        },
    ).get_dataframe()

    # Merge: with no explicit keys, pandas joins on the columns both frames share.
    return pd.merge(counts, rates)
def find_metric_peak(df: pd.DataFrame, metric: str, period_start="2020-12-09", period_end="2021-02-23") -> tuple:
    """Locate the highest value of ``metric`` inside a date window.

    Args:
        df: Frame with a ``Year`` column and a ``metric`` column. ``Year`` is
            compared directly against the bounds, so it must be comparable to
            them (ISO date strings compare correctly as text).
        metric: Name of the column to search.
        period_start: Lower bound of the window (inclusive).
        period_end: Upper bound of the window (inclusive).

    Returns:
        Tuple ``(peak_date, peak_value)`` taken from the row with the largest
        ``metric`` value in the window. Missing values are ranked last, so the
        value is only NaN when every value in the window is missing.

    Raises:
        IndexError: If no row of ``df`` falls inside the window.
    """
    in_window = (df.Year >= period_start) & (df.Year <= period_end)
    ranked = df[in_window][["Year", metric]].sort_values(metric, ascending=False, na_position="last")
    # After the descending sort the first row holds the peak.
    return ranked.Year.values[0], ranked[metric].values[0]
def add_decoupling_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Add ``*_normalized`` columns expressing each metric as a percentage of its peak.

    Peaks are located with ``find_metric_peak`` using its default window.
    Hospital, ventilation and death series are additionally shifted by the
    number of days between their own peak and the case peak before being
    normalised, so that the peaks line up with the case peak.

    The frame is modified in place and also returned. When ``people_ventilated``
    or ``weekly_cases_rolling`` holds no data at all, the four columns are
    created but filled with ``pd.NA``.

    Args:
        df: Data for a single area, with columns ``Year``,
            ``weekly_cases_rolling``, ``people_in_hospital``,
            ``people_ventilated`` and ``weekly_deaths_rolling``.

    Returns:
        The same frame with the four normalised columns added.
    """
    # Source metric -> output column; the order is the order in which columns are created.
    targets = {
        "weekly_cases_rolling": "weekly_cases_rolling_normalized",
        "people_in_hospital": "people_in_hospital_normalized",
        "people_ventilated": "people_ventilated_normalized",
        "weekly_deaths_rolling": "weekly_deaths_rolling_normalized",
    }

    if df.people_ventilated.isnull().all() or df.weekly_cases_rolling.isnull().all():
        df[list(targets.values())] = pd.NA
        return df

    # Find every peak first so that nothing is written if a lookup fails.
    peaks = {metric: find_metric_peak(df, metric) for metric in targets}
    reference_date = peaks["weekly_cases_rolling"][0]

    # Lag (in days) of each lagging metric's peak behind the case peak.
    lags = {
        metric: (pd.to_datetime(peaks[metric][0]) - pd.to_datetime(reference_date)).days
        for metric in ("people_in_hospital", "people_ventilated", "weekly_deaths_rolling")
    }

    for metric, column in targets.items():
        series = df[metric]
        if metric in lags:
            # shift() moves values by rows, not by calendar days.
            # NOTE(review): this assumes one row per day and a row order for which a
            # positive shift pulls later dates back onto earlier ones - confirm upstream.
            series = series.shift(lags[metric])
        df[column] = (series / peaks[metric][1]).mul(100).round(2)
    return df
def get_nation() -> pd.DataFrame:
    """Download the ``areaType=nation`` series: absolute metrics joined with rates.

    Two separate Cov19API queries are issued (absolute figures, then rate
    metrics) and their results are merged into a single frame.

    Returns:
        DataFrame with the identifier columns ``Year`` (the API ``date`` field),
        ``Country`` (``areaName``) and ``areaCode``, plus one column per
        requested metric.
    """
    area_filter = "areaType=nation"
    # Identifier fields requested by both queries; they double as the merge keys.
    id_fields = {
        "Year": "date",
        "Country": "areaName",
        "areaCode": "areaCode",
    }

    # Absolute
    counts = Cov19API(
        filters=[area_filter],
        structure={
            **id_fields,
            "cumulative_cases": "cumCasesByPublishDate",
            "cumulative_deaths": "cumDeaths28DaysByPublishDate",
            "weekly_cases_rolling": "newCasesByPublishDateRollingSum",
            "weekly_deaths_rolling": "newDeaths28DaysByPublishDateRollingSum",
            "daily_deaths": "newDeaths28DaysByPublishDate",
            "daily_cases": "newCasesByPublishDate",
            "test_positivity_rate": "uniqueCasePositivityBySpecimenDateRollingSum",
            "weekly_hospital_admissions": "newAdmissionsRollingSum",
            "people_in_hospital": "hospitalCases",
            "people_ventilated": "covidOccupiedMVBeds",
        },
    ).get_dataframe()

    # Rate
    rates = Cov19API(
        filters=[area_filter],
        structure={
            **id_fields,
            "cumulative_cases_rate": "cumCasesByPublishDateRate",
            "cumulative_deaths_rate": "cumDeaths28DaysByPublishDateRate",
            "weekly_cases_rate": "newCasesBySpecimenDateRollingRate",
            "weekly_deaths_rate": "newDeaths28DaysByDeathDateRollingRate",
        },
    ).get_dataframe()

    # Merge: with no explicit keys, pandas joins on the columns both frames share.
    return pd.merge(counts, rates)
def get_local() -> pd.DataFrame:
    """Download the ``areaType=utla`` series: absolute metrics joined with rates.

    Absolute figures are fetched through Cov19API. Rate metrics are fetched
    with a direct request to the v2 data endpoint and renamed to match the
    column names used by the other loaders.

    Returns:
        DataFrame with ``Year``, ``Country`` and ``areaCode`` plus one column
        per requested metric.

    Raises:
        requests.HTTPError: If the rate endpoint answers with an error status.
        requests.Timeout: If the rate endpoint does not respond in time.
    """
    # Absolute
    filters = ["areaType=utla"]
    metrics = {
        "Year": "date",
        "Country": "areaName",
        "areaCode": "areaCode",
        "cumulative_cases": "cumCasesByPublishDate",
        "cumulative_deaths": "cumDeaths28DaysByPublishDate",
        "weekly_cases_rolling": "newCasesByPublishDateRollingSum",
        "weekly_deaths_rolling": "newDeaths28DaysByPublishDateRollingSum",
        "daily_deaths": "newDeaths28DaysByPublishDate",
        "daily_cases": "newCasesByPublishDate",
        "test_positivity_rate": "uniqueCasePositivityBySpecimenDateRollingSum",
    }
    api = Cov19API(filters=filters, structure=metrics)
    local = api.get_dataframe().sort_values("Year")

    # Rate
    url_local_rate = (
        "https://api.coronavirus.data.gov.uk/v2/data?areaType=utla&metric=cumCasesByPublishDateRate&"
        "metric=cumDeaths28DaysByPublishDateRate&metric=newCasesBySpecimenDateRollingRate&"
        "metric=newDeaths28DaysByDeathDateRollingRate"
    )
    # requests never times out by default; bound the wait so a stalled server
    # cannot hang the pipeline forever.
    response = requests.get(url_local_rate, timeout=60)
    # Fail with a clear HTTP error instead of an opaque JSON/KeyError further down.
    response.raise_for_status()
    local_rate = pd.DataFrame.from_records(response.json()["body"], exclude=["areaType"])
    local_rate = local_rate.rename(
        columns={
            "areaName": "Country",
            "date": "Year",
            "cumCasesByPublishDateRate": "cumulative_cases_rate",
            "cumDeaths28DaysByPublishDateRate": "cumulative_deaths_rate",
            "newCasesBySpecimenDateRollingRate": "weekly_cases_rate",
            "newDeaths28DaysByDeathDateRollingRate": "weekly_deaths_rate",
        }
    )

    # Merge: with no explicit keys, pandas joins on the columns both frames share.
    return pd.merge(local, local_rate)
def get_nhs_region() -> pd.DataFrame:
    """Download the hospital metrics available for ``areaType=nhsRegion``.

    Returns:
        DataFrame with ``Year`` (the API ``date`` field), ``Country``
        (``areaName``), ``areaCode``, ``weekly_hospital_admissions`` and
        ``people_in_hospital``.
    """
    client = Cov19API(
        filters=["areaType=nhsRegion"],
        structure={
            "Year": "date",
            "Country": "areaName",
            "areaCode": "areaCode",
            "weekly_hospital_admissions": "newAdmissionsRollingSum",
            "people_in_hospital": "hospitalCases",
        },
    )
    return client.get_dataframe()
def get_day_diff(dt):
    """Return the number of whole days between ``ZERO_DAY`` and ``dt``.

    Args:
        dt: Date string in ``YYYY-MM-DD`` format.

    Returns:
        Signed day count; negative when ``dt`` precedes ``ZERO_DAY``.

    Raises:
        ValueError: If ``dt`` does not match the ``YYYY-MM-DD`` format.
    """
    date_format = "%Y-%m-%d"
    target = datetime.strptime(dt, date_format)
    origin = datetime.strptime(ZERO_DAY, date_format)
    elapsed = target - origin
    return elapsed.days
def generate_dataset():
    """Build the combined UK dataset and write it to ``OUTPUT_CSV``.

    Steps:
      1. Download and stack the overview, nation, local-authority and
         NHS-region frames; when an area name/date pair occurs more than once,
         the first occurrence wins.
      2. Add the peak-normalised columns area by area.
      3. Derive daily averages by dividing the 7-day rolling figures by 7.
      4. Convert ``Year`` to the number of days since ``ZERO_DAY``.
      5. Drop rows without ``weekly_cases_rolling``, drop ``areaCode``, sort by
         area and day, and export to CSV.
    """
    combined = pd.concat([get_uk(), get_nation(), get_local(), get_nhs_region()])
    combined = combined.drop_duplicates(subset=["Country", "Year"], keep="first")

    # Process each area explicitly instead of using groupby(...).apply(...):
    # since pandas 2.0 apply() prepends the group key as an index level by default,
    # which makes the later sort on "Country" ambiguous (index level vs. column),
    # and newer releases deprecate handing the grouping column to apply() at all.
    # The copy keeps add_decoupling_metrics from writing into a slice of `combined`.
    combined = pd.concat(
        [add_decoupling_metrics(area_df.copy()) for _, area_df in combined.groupby("Country")]
    )

    combined["daily_cases_rolling_average"] = combined["weekly_cases_rolling"] / 7
    combined["daily_deaths_rolling_average"] = combined["weekly_deaths_rolling"] / 7
    combined["daily_cases_rate_rolling_average"] = combined["weekly_cases_rate"] / 7
    combined["daily_deaths_rate_rolling_average"] = combined["weekly_deaths_rate"] / 7
    combined["new_hospital_admissions"] = combined["weekly_hospital_admissions"] / 7

    combined["Year"] = combined["Year"].apply(get_day_diff)

    # Put "Country" first and keep every other column in its existing order.
    combined = combined[["Country"] + [col for col in combined.columns if col != "Country"]]
    # NOTE(review): get_nhs_region() requests no weekly_cases_rolling, so its rows
    # appear to be removed by this dropna - confirm that is intended.
    combined = (
        combined.dropna(how="any", subset=["weekly_cases_rolling"])
        .drop(columns="areaCode")
        .sort_values(["Country", "Year"])
    )

    # Export
    combined.to_csv(OUTPUT_CSV, index=False)
def update_db():
    """Import the CSV at ``OUTPUT_CSV`` into the database via ``import_dataset``.

    The source name is stamped with the current date in the Europe/London
    timezone, formatted like ``5 March 2021``.
    """
    now_london = datetime.now(tz=pytz.timezone("Europe/London"))
    # "%-d" (day without zero padding) is a glibc extension and is not available on
    # every platform, so the day number is taken from the datetime directly.
    time_str = f"{now_london.day} {now_london:%B %Y}"
    source_name = f"UK Government COVID-19 Dashboard – Last updated {time_str}"
    import_dataset(
        dataset_name=DATASET_NAME,
        namespace="owid",
        csv_path=OUTPUT_CSV,
        default_variable_display={"yearIsDay": True, "zeroDay": ZERO_DAY},
        source_name=source_name,
        slack_notifications=False,
    )
# Running this module as a script only regenerates the CSV; update_db() is not called here.
if __name__ == "__main__":
    generate_dataset()