"""Update gapher database.
Update vaccination by age data.
"""
#!/usr/bin/env python
import sys
import os
import pandas as pd
import json
from dotenv import load_dotenv
from datetime import datetime, timezone
load_dotenv()
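# Load .env first, presumably so the cowidev imports below already see the
# environment variables at import time.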
from cowidev.grapher.db.utils.db import connection
from cowidev.grapher.db.utils.db_utils import DBUtils
from cowidev.grapher.db.utils.slack_client import send_success
# ID of user who imports the data
USER_ID = 46
# Dataset namespace
NAMESPACE = "owid"
DEPLOY_QUEUE_PATH = os.getenv("DEPLOY_QUEUE_PATH")
def print_err(*args, **kwargs):
return print(*args, file=sys.stderr, **kwargs)
def chunk_df(df, n):
"""Yield successive n-sized chunks from data frame."""
for i in range(0, df.shape[0], n):
yield df[i : i + n]
tz_utc = tz_db = timezone.utc
tz_local = datetime.now(tz_utc).astimezone().tzinfo
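# Timestamps stored in the database are naive and assumed to be UTC, while
# os.stat() mtimes are in local time; import_dataset() attaches the matching
# tzinfo to each side so the staleness comparison is between aware datetimes.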
def import_dataset(
dataset_name,
namespace,
csv_path,
default_variable_display,
source_name,
slack_notifications=True,
unit="",
unit_short=None,
):
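    """Import an updated CSV into the Grapher MySQL database.

    Sketch of the flow below: skip the import if the CSV on disk is not newer
    than the dataset's dataEditedAt timestamp; otherwise replace the dataset's
    variables and data_values, bump the version of every chart built on the
    dataset, and optionally enqueue a deploy and send a Slack notification.
    """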
print(dataset_name.upper())
with connection() as c:
db = DBUtils(c)
# Check whether the database is up to date, by checking the
# - last modified date of the Grapher file
# - last modified date of the database row
#
# This is not bulletproof, but it allows for flexibility – authors could manually update
# the repo, and that would trigger a database update too.
(db_dataset_id, db_dataset_modified_time) = db.fetch_one(
"""
SELECT id, dataEditedAt
FROM datasets
WHERE name = %s
AND namespace = %s
""",
[dataset_name, namespace],
)
db_dataset_modified_time = db_dataset_modified_time.replace(tzinfo=tz_db)
file_modified_time = datetime.fromtimestamp(os.stat(csv_path).st_mtime).replace(
tzinfo=tz_local
)
if file_modified_time <= db_dataset_modified_time:
print(f"Dataset is up to date: {dataset_name}")
return None
print("Updating database...")
# Load dataset data frame
df = pd.read_csv(csv_path)
# Check whether all entities exist in the database.
# If some are missing, report & quit.
entity_names = list(df["Country"].unique())
db_entities_query = db.fetch_many(
"""
SELECT id, name
FROM entities
WHERE name IN %s
""",
[entity_names],
)
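        # Passing the name list as a single parameter relies on the DB driver
        # expanding it into a parenthesized value list for the IN clause.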
db_entity_id_by_name = {name: id for id, name in db_entities_query}
# Terminate if some entities are missing from the database
missing_entity_names = set(entity_names) - set(db_entity_id_by_name.keys())
if len(missing_entity_names) > 0:
print_err(
f"Entity names missing from database: {str(missing_entity_names)}"
)
sys.exit(1)
# Fetch the source
(db_source_id,) = db.fetch_one(
"""
SELECT id
FROM sources
WHERE datasetId = %s
""",
            [db_dataset_id],
)
# Check whether all variables match database variables.
id_names = ["Country", "Year"]
variable_names = list(set(df.columns) - set(id_names))
db_variables_query = db.fetch_many(
"""
SELECT id, name
FROM variables
WHERE datasetId = %s
""",
[db_dataset_id],
)
db_variable_id_by_name = {name: id for id, name in db_variables_query}
        # Remove any variables no longer in the dataset. This is safe because any variables used
        # in charts won't be deleted, thanks to database constraint checks.
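        # The data_values rows are deleted before their variables rows so that
        # no reference to a variable remains when the second DELETE runs.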
variable_names_to_remove = list(
set(db_variable_id_by_name.keys()) - set(variable_names)
)
if len(variable_names_to_remove):
print(f"Removing variables: {str(variable_names_to_remove)}")
variable_ids_to_remove = [
db_variable_id_by_name[n] for n in variable_names_to_remove
]
db.execute(
"""
DELETE FROM data_values
WHERE variableId IN %(ids)s;
DELETE FROM variables
WHERE id IN %(ids)s;
""",
{"ids": variable_ids_to_remove},
)
# Add variables that didn't exist before. Make sure to set yearIsDay.
variable_names_to_insert = list(
set(variable_names) - set(db_variable_id_by_name.keys())
)
if len(variable_names_to_insert):
print(f"Inserting variables: {str(variable_names_to_insert)}")
for name in variable_names_to_insert:
db_variable_id_by_name[name] = db.upsert_variable(
name=name,
code=None,
unit=unit,
short_unit=unit_short,
source_id=db_source_id,
dataset_id=db_dataset_id,
display=default_variable_display,
)
# Delete all data_values in dataset
print("Deleting all data_values...")
db.execute(
"""
DELETE FROM data_values
WHERE variableId IN %s
""",
[tuple(db_variable_id_by_name.values())],
)
# Insert new data_values
print("Inserting new data_values...")
df_data_values = df.melt(
id_vars=id_names,
value_vars=variable_names,
var_name="variable",
value_name="value",
).dropna(how="any")
for df_chunk in chunk_df(df_data_values, 50000):
data_values = [
(
row["value"],
int(row["Year"]),
db_entity_id_by_name[row["Country"]],
db_variable_id_by_name[row["variable"]],
)
for _, row in df_chunk.iterrows()
]
db.upsert_many(
"""
INSERT INTO
data_values (value, year, entityId, variableId)
VALUES
(%s, %s, %s, %s)
""",
data_values,
)
# Update dataset dataUpdatedAt time & dataUpdatedBy
db.execute(
"""
UPDATE datasets
SET
dataEditedAt = NOW(),
dataEditedByUserId = %s
WHERE id = %s
""",
[USER_ID, db_dataset_id],
)
# Update source name ("last updated at")
db.execute(
"""
UPDATE sources
SET name = %s
WHERE id = %s
""",
[source_name, db_source_id],
)
# Update chart versions to trigger rebake
db.execute(
"""
UPDATE charts
SET config = JSON_SET(config, "$.version", config->"$.version" + 1)
WHERE id IN (
SELECT DISTINCT chart_dimensions.chartId
FROM chart_dimensions
JOIN variables ON variables.id = chart_dimensions.variableId
WHERE variables.datasetId = %s
)
""",
[db_dataset_id],
)
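        # Incrementing $.version marks every chart that uses one of this
        # dataset's variables as changed, so the next deploy re-bakes it.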
# Enqueue deploy
if DEPLOY_QUEUE_PATH:
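            # One JSON object per line (JSON Lines), so each append is an
            # independent queue entry.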
with open(DEPLOY_QUEUE_PATH, "a") as f:
f.write(
json.dumps(
{
"message": f"Automated dataset update: {dataset_name}",
"timeISOString": datetime.now().isoformat(),
}
)
+ "\n"
)
print("Database update successful.")
if slack_notifications:
send_success(
channel="corona-data-updates" if not os.getenv("IS_DEV") else "bot-testing",
title=f"Updated Grapher dataset: {dataset_name}",
)
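
# Illustrative call (hypothetical values; the real call sites live in the
# dataset-specific update scripts):
#
#     import_dataset(
#         dataset_name="COVID-19 - Vaccinations by age group",
#         namespace=NAMESPACE,
#         csv_path="path/to/vaccinations_by_age_group.csv",
#         default_variable_display={"yearIsDay": True},
#         source_name="Official data collated by Our World in Data",
#     )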