import os
import json
import datetime
import requests
from tqdm import tqdm
import numpy as np
import pandas as pd
from cowidev import PATHS
from cowidev.utils.clean.dates import DATE_FORMAT
DEBUG = False
DATASET_NAME = "YouGov-Imperial COVID-19 Behavior Tracker"
# MIN_RESPONSES: country-date-question observations with fewer than this
# many valid responses will be dropped. If None, no observations will be
# dropped.
MIN_RESPONSES = 100
# FREQ: temporal level at which to aggregate the individual survey
# responses, passed as the `freq` argument to
# pandas.Series.dt.to_period. Must be a valid pandas offset alias
# (e.g. "M" = month, "W" = week).
FREQ = "M"
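# Example (illustrative): with FREQ = "M", responses from any day in the same
# month map to the same period and are aggregated together:
#   >>> pd.Series(pd.to_datetime(["2021-03-05", "2021-03-28"])).dt.to_period("M")
#   0    2021-03
#   1    2021-03
#   dtype: period[M]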
# ZERO_DAY: reference date for internal yearIsDay Grapher usage.
ZERO_DAY = "2020-01-21"
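# Example (illustrative): Grapher's yearIsDay convention encodes dates as
# integer day offsets from ZERO_DAY, e.g.:
#   >>> (datetime.date(2020, 2, 20) - datetime.date(2020, 1, 21)).days
#   30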
# File paths
PROJECT_DIR = PATHS.PROJECT_DIR
INPUT_PATH = os.path.join(PROJECT_DIR, "scripts", "input", "yougov")
OUTPUT_PATH = os.path.join(PROJECT_DIR, "scripts", "grapher")
MAPPING_PATH = os.path.join(INPUT_PATH, "mapping.csv")
MAPPING_VALUES_PATH = os.path.join(INPUT_PATH, "mapped_values.json")
MAPPING = pd.read_csv(MAPPING_PATH, na_values=None)
MAPPING["label"] = MAPPING["label"].str.lower()
with open(MAPPING_VALUES_PATH, "r") as f:
MAPPED_VALUES = json.load(f)
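# A minimal illustrative shape for mapped_values.json (hypothetical key and
# values; the real file defines the actual response -> number mappings used
# by `_preprocess_cols` below):
#   {"yes_no": {"Yes": 100, "No": 0}}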
class YouGov:
def __init__(self, output_path: str, debug: bool = False):
self.source_url = "https://github.com/YouGov-Data/covid-19-tracker/raw/master"
self.debug = debug
self.output_path = output_path
self.dataset_name = DATASET_NAME
@property
def output_csv_path(self):
return os.path.join(self.output_path, f"{self.dataset_name}.csv")
@property
def output_csv_path_composite(self):
return os.path.join(self.output_path, f"{self.dataset_name}, composite variables.csv")
    def _get_source_url_country(self, country, extension):
return f"{self.source_url}/data/{country}.{extension}"
@property
def source_url_master(self):
return f"{self.source_url}/countries.csv"
@property
def list_countries(self):
"""Get list of countries to download."""
        # The master file lists one country slug per line, with no header row
        countries = list(pd.read_csv(self.source_url_master, header=None)[0])
if self.debug:
return countries[:3]
return countries
    def read(self):
"""Read data. Reads multiple countries and concatenates them into one file."""
# Load countries
all_data = []
for country in tqdm(self.list_countries):
tqdm.write(country)
df = self.read_country(country)
all_data.append(df)
# Build DataFrame
df = pd.concat(all_data, axis=0)
if df.columns.nunique() != df.columns.shape[0]:
raise ValueError("There are one or more duplicate columns, which may cause unexpected errors.")
return df
    def read_country(self, country):
"""Read individual country data."""
# Load df from web
extensions = ["csv", "zip"]
df = None
        # Try the plain CSV first, then fall back to a zipped CSV
        for ext in extensions:
            url = self._get_source_url_country(country, ext)
            if requests.get(url).ok:
                df = self._read_country_from_web(url, ext)
                break
if df is None:
raise ValueError(f"No file found for {country}")
        # Add the country name and normalize column names to lower case
        df = df.assign(country=country)
        df.columns = df.columns.str.lower()
return df
    def _read_country_from_web(self, source_url_country, extension):
"""Given URL, reads individual country data."""
if extension == "csv":
extension = None
elif extension != "zip":
raise ValueError("Invalid extension. Accepted are 'csv' and 'zip'.")
return pd.read_csv(
source_url_country,
low_memory=False,
na_values=[
"",
"Not sure",
" ",
"Prefer not to say",
"Don't know",
98,
"Don't Know",
"Not applicable - I have already contracted Coronavirus (COVID-19)",
"Not applicable - I have already contracted Coronavirus",
],
compression=extension,
)
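    # Note: `na_values` may mix strings and numbers; pd.read_csv treats any
    # cell matching one of these entries (including the numeric code 98) as
    # NaN on read.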
    def pipeline_csv(self, df: pd.DataFrame):
df = (
df.pipe(_format_date)
.pipe(_subset_and_rename_columns)
.pipe(_preprocess_cols)
.pipe(_derive_cols)
.pipe(_standardize_entities)
.pipe(_aggregate)
)
df_comp = _create_composite_cols(df)
if df_comp is not None:
df_comp = df_comp.pipe(_rename_columns).pipe(_reorder_columns)
df = df.pipe(_round).pipe(_rename_columns).pipe(_reorder_columns)
return df, df_comp
    def export(self):
df = self.read()
df, df_comp = df.pipe(self.pipeline_csv)
# Export
if df_comp is not None:
df_comp.to_csv(self.output_csv_path_composite, index=False)
df.to_csv(self.output_csv_path, index=False)
def _subset_and_rename_columns(df):
"""keeps only the survey questions with keep=True in mapping.csv and
renames columns.
Note: we do not use `df.rename(columns={...})` because for some columns we
derive multiple variables.
"""
assert MAPPING.keep.isin([True, False]).all(), 'All values in "keep" column of `MAPPING` must be True or False.'
assert (
MAPPING["code_name"].duplicated().sum() == 0
), "All rows in the `code_name` field of mapping.csv must be unique."
index_cols = ["country", "date"]
    df2 = df[index_cols].copy()  # copy so new columns can be added without a SettingWithCopyWarning
for row in MAPPING[MAPPING.keep & ~MAPPING.derived].itertuples():
df2.loc[:, row.code_name] = df[row.label]
df2 = df2.dropna(subset=["date"])
return df2
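# Illustrative mapping.csv row (hypothetical label text and preprocess key;
# the real file defines the actual survey-question labels and flags):
#   label,code_name,keep,derived,preprocess
#   "vaccinated against covid-19?",covid_vaccine_received_one_or_two_doses,True,False,yes_no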
def _preprocess_cols(df):
for row in MAPPING[MAPPING.preprocess.notnull()].itertuples():
if row.code_name in df.columns:
df.loc[:, row.code_name] = df[row.code_name].replace(MAPPED_VALUES[row.preprocess])
uniq_values = set(MAPPED_VALUES[row.preprocess].values())
assert (
df.loc[:, row.code_name].drop_duplicates().dropna().isin(uniq_values).all()
), f"One or more non-NaN values in {row.code_name} are not in {uniq_values}"
return df
def _derive_cols(df):
derived_variables_to_keep = MAPPING[MAPPING["derived"] & MAPPING["keep"]].code_name.unique().tolist()
if "covid_vaccinated_or_willing" in derived_variables_to_keep:
        # constructs the covid_vaccinated_or_willing variable. To inspect the
        # joint distribution of the two source columns:
        # pd.crosstab(df["covid_vaccine_received_one_or_two_doses"].fillna(-1),
        #             df["willingness_covid_vaccinate_this_week"].fillna(-1))
vac_min_val = min(
MAPPED_VALUES[
MAPPING.loc[
MAPPING["code_name"] == "covid_vaccine_received_one_or_two_doses",
"preprocess",
].squeeze()
].values()
)
vac_max_val = max(
MAPPED_VALUES[
MAPPING.loc[
MAPPING["code_name"] == "covid_vaccine_received_one_or_two_doses",
"preprocess",
].squeeze()
].values()
)
vac_1_max_val = max(
MAPPED_VALUES[
MAPPING.loc[
MAPPING["code_name"] == "willingness_covid_vaccinate_this_week",
"preprocess",
].squeeze()
].values()
)
assert not (
(df["covid_vaccine_received_one_or_two_doses"] == vac_max_val)
& df["willingness_covid_vaccinate_this_week"].notnull()
).any(), (
"Expected all vaccinated respondents to NOT be asked whether they would "
"get vaccinated, but found at least one vaccinated respondent who was "
"asked the latter question."
)
assert not (
(df["covid_vaccine_received_one_or_two_doses"] == vac_min_val)
& df["willingness_covid_vaccinate_this_week"].isnull()
).any(), (
"Expected all unvaccinated respondents to be asked whether they would "
"get vaccinated, but found at least one unvaccinated respondent who was "
"not asked the latter question."
)
df.loc[:, "covid_vaccinated_or_willing"] = (
(df["covid_vaccine_received_one_or_two_doses"] == vac_max_val)
| (df["willingness_covid_vaccinate_this_week"] == vac_1_max_val)
).astype(int) * vac_max_val
df.loc[
df["covid_vaccine_received_one_or_two_doses"].isnull()
& df["willingness_covid_vaccinate_this_week"].isnull(),
"covid_vaccinated_or_willing",
] = np.nan
return df
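# Truth table for the derivation above (illustrative), writing `vac` for
# covid_vaccine_received_one_or_two_doses and `will` for
# willingness_covid_vaccinate_this_week:
#   vac == vac_max_val                        -> vac_max_val  (vaccinated)
#   vac == vac_min_val, will == vac_1_max_val -> vac_max_val  (unvaccinated but willing)
#   vac == vac_min_val, will != vac_1_max_val -> 0            (unvaccinated, not willing)
#   vac is NaN and will is NaN                -> NaN          (questions not asked)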
def _standardize_entities(df):
df.loc[:, "entity"] = df.country.apply(lambda x: x.replace("-", " ").title())
df = df.drop(columns=["country"])
return df
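# Example (illustrative): country slugs in the source repository are
# lower-case and hyphenated, so e.g.
#   >>> "united-states".replace("-", " ").title()
#   'United States'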
def _aggregate(df):
s_period = df["date"].dt.to_period(FREQ)
if FREQ == "M":
df.loc[:, "date_mid"] = s_period.dt.start_time.dt.date + datetime.timedelta(days=14)
else:
df.loc[:, "date_mid"] = (s_period.dt.start_time + (s_period.dt.end_time - s_period.dt.start_time) / 2).dt.date
today = datetime.datetime.utcnow().date()
if df["date_mid"].max() > today:
df.loc[:, "date_mid"] = df["date_mid"].replace({df["date_mid"].max(): today})
questions = [q for q in MAPPING.code_name.tolist() if q in df.columns]
# computes the mean for each country-date-question observation
# (returned in long format)
df_means = (
df.groupby(["entity", "date_mid"])[questions]
.mean()
.rename_axis("question", axis=1)
.stack()
.rename("mean")
.to_frame()
)
# counts the number of non-NaN responses for each country-date-question
# observation (returned in long format)
df_counts = (
df.groupby(["entity", "date_mid"])[questions]
.apply(lambda gp: gp.notnull().sum())
.rename_axis("question", axis=1)
.stack()
.rename("num_responses")
.to_frame()
)
df_agg = pd.merge(
df_means,
df_counts,
left_index=True,
right_index=True,
how="outer",
validate="1:1",
)
if MIN_RESPONSES:
df_agg = df_agg[df_agg["num_responses"] >= MIN_RESPONSES]
# converts dataframe back to wide format.
df_agg = df_agg.unstack().reset_index()
new_columns = []
for lvl0, lvl1 in df_agg.columns:
if lvl1:
if lvl0 == "num_responses":
col = f"{lvl1}__{lvl0}"
else:
col = lvl1
else:
col = lvl0
new_columns.append(col)
df_agg.columns = new_columns
df_agg.rename(columns={"date_mid": "date"}, inplace=True)
# constructs date variable for internal Grapher usage.
df_agg.loc[:, "date_internal_use"] = (
df_agg["date"] - datetime.datetime.strptime(ZERO_DAY, DATE_FORMAT).date()
).dt.days
df_agg.drop("date", axis=1, inplace=True)
return df_agg
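# Illustrative output shape of `_aggregate` (hypothetical question name and
# values): each question yields a mean column plus a count column suffixed
# with "__num_responses", e.g.
#   entity   date_internal_use  mask_wearing  mask_wearing__num_responses
#   Denmark  420                83.1          1024.0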
def _create_composite_cols(df):
    ffill_limit = 7  # forward-fill vaccination data for up to 7 days
    vac_var_id = 145610  # Grapher variable ID of the vaccination coverage series (percent of population)
try:
res = requests.get(f"https://ourworldindata.org/grapher/data/variables/{vac_var_id}.json")
assert res.ok
vac_data = json.loads(res.content)
var_name = vac_data["variables"][f"{vac_var_id}"]["name"]
assert (
ZERO_DAY == vac_data["variables"][f"{vac_var_id}"]["display"]["zeroDay"]
), "Zero days do not match. Data merge will not be correct."
df_vac = pd.DataFrame(
{
"date": vac_data["variables"][f"{vac_var_id}"]["years"],
"entity": [
vac_data["entityKey"][f"{ent}"]["name"]
for ent in vac_data["variables"][f"{vac_var_id}"]["entities"]
],
var_name: vac_data["variables"][f"{vac_var_id}"]["values"],
}
).sort_values(["entity", "date"], ascending=True)
date_range = list(range(df_vac["date"].min(), df_vac["date"].max() + 1))
df_vac[df_vac["entity"] == "United States"].set_index("date").reindex(date_range)
df_vac = (
df_vac.groupby("entity")
.apply(lambda gp: gp.set_index("date").reindex(date_range))
.drop("entity", axis=1)
.reset_index()
.sort_values(["entity", "date"])
)
        df_vac[var_name] = df_vac.groupby("entity")[var_name].ffill(limit=ffill_limit)
df_vac.dropna(subset=[var_name], inplace=True)
vac_entities = df_vac["entity"].unique()
yougov_entities_not_found = [ent for ent in df["entity"].drop_duplicates() if ent not in vac_entities]
assert len(yougov_entities_not_found) < (df["entity"].drop_duplicates().shape[0] * 0.1), (
"Expected nearly all YouGov entities to be in vaccination data, but "
"failed to find >10% of YouGov entities in the vaccination data. "
f"Entities not found: {yougov_entities_not_found}"
)
will_cols = [
"willingness_covid_vaccinate_this_week",
"unwillingness_covid_vaccinate_this_week",
"uncertain_covid_vaccinate_this_week",
]
df_temp = pd.merge(
df[
[
"entity",
"date_internal_use",
]
+ will_cols
],
df_vac[["entity", "date", var_name]],
left_on=["entity", "date_internal_use"],
right_on=["entity", "date"],
how="inner",
validate="1:1",
)
df_temp.dropna(subset=will_cols, inplace=True)
df_temp[var_name] = df_temp[var_name].round(2)
# converts willingness to get vaccinated variables to a percentage of the
# overall population, instead of percentage of the unvaccinated population.
df_temp["willingness_covid_vaccinate_this_week_pct_pop"] = (
(100 - df_temp[var_name]) * (df_temp["willingness_covid_vaccinate_this_week"] / 100)
).round(2)
df_temp["unwillingness_covid_vaccinate_this_week_pct_pop"] = (
(100 - df_temp[var_name]) * (df_temp["unwillingness_covid_vaccinate_this_week"] / 100)
).round(2)
df_temp["uncertain_covid_vaccinate_this_week_pct_pop"] = (
(100 - df_temp[var_name]) * (df_temp["uncertain_covid_vaccinate_this_week"] / 100)
).round(2)
cols = [
var_name,
"willingness_covid_vaccinate_this_week_pct_pop",
"unwillingness_covid_vaccinate_this_week_pct_pop",
"uncertain_covid_vaccinate_this_week_pct_pop",
]
assert all(
df_temp[cols].sum(axis=1, min_count=len(cols)).dropna().round(1) == 100
), f"Expected {cols} to sum to *nearly* 100 for every entity-date observation, prior to rounding adjustment."
# adjusts one variable to ensure sum of all cols equals exactly
# 100. Otherwise, rounding errors may lead the sum to be
# slightly off (e.g. 99.99).
df_temp[f"{cols[-1]}_adjusted"] = (100 - df_temp[cols[:-1]].sum(axis=1)).round(2)
assert all((df_temp[cols[-1]] - df_temp[f"{cols[-1]}_adjusted"]).abs() < 0.1), (
f"Expected rounding adjustment of {cols[-1]} to be minor (< 0.1), "
"but adjustment was larger than this for one or more entity-date "
"observations."
)
assert all(
df_temp[cols[:-1] + [f"{cols[-1]}_adjusted"]].sum(axis=1, min_count=len(cols)).dropna().round(2) == 100
), f"Expected {cols} to sum to exactly 100 for every entity-date observation, after rounding adjustment."
df_temp[cols[-1]] = df_temp[f"{cols[-1]}_adjusted"]
df_temp = df_temp[
[
"entity",
"date_internal_use",
var_name,
"willingness_covid_vaccinate_this_week_pct_pop",
"unwillingness_covid_vaccinate_this_week_pct_pop",
"uncertain_covid_vaccinate_this_week_pct_pop",
]
]
except Exception as e:
df_temp = None
print(f"Failed to construct composite variables. Error: {e}")
return df_temp
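# Worked example (illustrative numbers) for the percentage-of-population
# conversion in `_create_composite_cols`: if 40% of a country's population is
# vaccinated and 50% of *unvaccinated* respondents are willing to get
# vaccinated this week, then
#   willingness_covid_vaccinate_this_week_pct_pop = (100 - 40) * (50 / 100) = 30.0
# i.e. 30% of the overall population.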
def _round(df):
index_cols = ["entity", "date_internal_use"]
df = df.set_index(index_cols).round(1).reset_index()
return df
def _rename_columns(df):
# renames index columns for use in `update_db`.
df.rename(columns={"entity": "Country", "date_internal_use": "Year"}, inplace=True)
return df
def _reorder_columns(df):
index_cols = ["Country", "Year"]
data_cols = sorted([col for col in df.columns if col not in index_cols])
df = df[index_cols + data_cols]
return df
def main():
YouGov(output_path=OUTPUT_PATH, debug=DEBUG).export()
if __name__ == "__main__":
main()