Source code for cowidev.grapher.db.utils.db_utils

# Copied from https://github.com/owid/importers/blob/master/db_utils.py
# At some point in the future we should turn it into a package.

import json
from unidecode import unidecode

UNMODIFIED = 0
INSERT = 1
UPDATE = 2
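# Note: these constants mirror MySQL's documented affected-row counts for
# "INSERT ... ON DUPLICATE KEY UPDATE": 0 when the row was left unchanged,
# 1 when a new row was inserted, and 2 when an existing row was updated.
# upsert_one() below maps rowcounts onto these values.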


def normalize_entity_name(entity_name):
    return unidecode(entity_name.lower().strip())


class NotOne(ValueError):
    pass


class DBUtils:
    # TODO create bulk inserts for every create? what type should they return?

    def __init__(self, cursor):
        self.cursor = cursor
        self.counts = {
            "tags_inserted": 0,
            "tags_updated": 0,
            "entities_inserted": 0,
            "datasets_inserted": 0,
            "datasets_updated": 0,
            "variables_inserted": 0,
            "variables_updated": 0,
            "sources_inserted": 0,
            "sources_updated": 0,
        }
        self.entity_id_by_normalised_name = {}

    def get_counts(self):
        return self.counts

    def get_entity_cache(self):
        return self.entity_id_by_normalised_name

    def execute(self, *args, **kwargs):
        return self.cursor.execute(*args, **kwargs)

    def fetch_one_or_none(self, *args, **kwargs):
        self.cursor.execute(*args, **kwargs)
        rows = self.cursor.fetchall()
        if len(rows) > 1:
            raise NotOne("Expected 1 or 0 rows but received %d" % len(rows))
        elif len(rows) == 1:
            return rows[0]
        else:
            return None

    def fetch_one(self, *args, **kwargs):
        result = self.fetch_one_or_none(*args, **kwargs)
        if result is None:
            raise NotOne("Expected 1 row but received 0")
        return result

    def fetch_many(self, *args, **kwargs):
        self.cursor.execute(*args, **kwargs)
        return self.cursor.fetchall()

    def insert_one(self, *args, **kwargs):
        self.cursor.execute(*args, **kwargs)
        return self.cursor.lastrowid

    def upsert_one(self, *args, **kwargs):
        self.cursor.execute(*args, **kwargs)
        if self.cursor.rowcount == 0:
            return UNMODIFIED
        if self.cursor.rowcount == 1:
            return INSERT
        if self.cursor.rowcount == 2:
            return UPDATE
        return None

    def upsert_many(self, query, tuples):
        self.cursor.executemany(query, tuples)

    def execute_until_empty(self, *args, **kwargs):
        # Re-run the statement until it affects no more rows, which is useful
        # for batched statements such as "DELETE ... LIMIT 10000".
        first = True
        while first or self.cursor.rowcount > 0:
            first = False
            self.cursor.execute(*args, **kwargs)

    def __fetch_parent_tag(self, name):
        (tag_id,) = self.fetch_one(
            """
            SELECT id FROM tags
            WHERE name = %s
              AND isBulkImport = TRUE
              AND parentId IS NULL
            LIMIT 1
            """,
            [name],
        )
        return tag_id

    def upsert_parent_tag(self, name):
        try:
            return self.__fetch_parent_tag(name)
        except NotOne:
            self.upsert_one(
                """
                INSERT INTO tags (name, createdAt, updatedAt, isBulkImport)
                VALUES (%s, NOW(), NOW(), TRUE)
                """,
                [name],
            )
            self.counts["tags_inserted"] += 1
            return self.__fetch_parent_tag(name)

    def upsert_tag(self, name, parent_id):
        operation = self.upsert_one(
            """
            INSERT INTO tags (name, parentId, createdAt, updatedAt, isBulkImport)
            VALUES (%s, %s, NOW(), NOW(), TRUE)
            ON DUPLICATE KEY UPDATE
                updatedAt = VALUES(updatedAt),
                isBulkImport = VALUES(isBulkImport)
            """,
            [name, parent_id],
        )
        if operation == INSERT:
            self.counts["tags_inserted"] += 1
        elif operation == UPDATE:
            self.counts["tags_updated"] += 1
        (tag_id,) = self.fetch_one(
            """
            SELECT id FROM tags
            WHERE name = %s AND parentId = %s
            """,
            [name, parent_id],
        )
        return tag_id

    def associate_dataset_tag(self, dataset_id, tag_id):
        self.upsert_one(
            """
            INSERT INTO dataset_tags (datasetId, tagId)
            VALUES (%s, %s)
            ON DUPLICATE KEY UPDATE tagId = VALUES(tagId)
            """,
            # ON DUPLICATE here only avoids an error; it intentionally updates nothing
            [dataset_id, tag_id],
        )

    def upsert_dataset(
        self,
        name,
        namespace,
        user_id,
        tag_id=None,
        description="This is a dataset imported by the automated fetcher",
    ):
        operation = self.upsert_one(
            """
            INSERT INTO datasets
                (name, description, namespace, createdAt, createdByUserId,
                 updatedAt, metadataEditedAt, metadataEditedByUserId,
                 dataEditedAt, dataEditedByUserId)
            VALUES (%s, %s, %s, NOW(), %s, NOW(), NOW(), %s, NOW(), %s)
            ON DUPLICATE KEY UPDATE
                name = VALUES(name),
                description = VALUES(description),
                namespace = VALUES(namespace),
                updatedAt = VALUES(updatedAt),
                metadataEditedAt = VALUES(metadataEditedAt),
                metadataEditedByUserId = VALUES(metadataEditedByUserId),
                dataEditedAt = VALUES(dataEditedAt),
                dataEditedByUserId = VALUES(dataEditedByUserId)
            """,
            [name, description, namespace, user_id, user_id, user_id],
        )
        (dataset_id,) = self.fetch_one(
            """
            SELECT id FROM datasets
            WHERE name = %s AND namespace = %s
            """,
            [name, namespace],
        )
        if operation == INSERT:
            self.counts["datasets_inserted"] += 1
        if operation == UPDATE:
            self.counts["datasets_updated"] += 1
        if tag_id is not None:
            self.associate_dataset_tag(dataset_id, tag_id)
        return dataset_id

    def upsert_source(self, name, description, dataset_id):
        # There is no UNIQUE key constraint we can rely on to prevent duplicates,
        # so we have to do a SELECT before INSERT...
        row = self.fetch_one_or_none(
            """
            SELECT id FROM sources
            WHERE name = %s AND datasetId = %s
            LIMIT 1
            """,
            [name, dataset_id],
        )
        if row is None:
            self.upsert_one(
                """
                INSERT INTO sources (name, description, datasetId, createdAt, updatedAt)
                VALUES (%s, %s, %s, NOW(), NOW())
                """,
                [name, description, dataset_id],
            )
            self.counts["sources_inserted"] += 1
            row = self.fetch_one(
                """
                SELECT id FROM sources
                WHERE name = %s AND datasetId = %s
                """,
                [name, dataset_id],
            )
        else:
            self.cursor.execute(
                """
                UPDATE sources
                SET updatedAt = NOW(), description = %(description)s
                WHERE id = %(id)s
                """,
                {"description": description, "id": row[0]},
            )
            self.counts["sources_updated"] += 1
        return row[0]

    def upsert_variable(
        self,
        name,
        code,
        unit,
        short_unit,
        source_id,
        dataset_id,
        description=None,
        timespan="",
        coverage="",
        display=None,
    ):
        # Use None instead of a mutable default argument for `display`.
        if display is None:
            display = {}
        operation = self.upsert_one(
            """
            INSERT INTO variables
                (name, code, description, unit, shortUnit, timespan, coverage,
                 display, sourceId, datasetId, createdAt, updatedAt)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            ON DUPLICATE KEY UPDATE
                name = VALUES(name),
                code = VALUES(code),
                timespan = VALUES(timespan),
                datasetId = VALUES(datasetId),
                sourceId = VALUES(sourceId),
                updatedAt = VALUES(updatedAt)
            """,
            [
                name,
                code,
                description,
                unit,
                short_unit,
                timespan,
                coverage,
                json.dumps(display),
                source_id,
                dataset_id,
            ],
        )
        if operation == INSERT:
            self.counts["variables_inserted"] += 1
        elif operation == UPDATE:
            self.counts["variables_updated"] += 1
        (var_id,) = self.fetch_one(
            """
            SELECT id FROM variables
            WHERE (name = %s OR code = %s)
              AND datasetId = %s
              AND sourceId = %s
            """,
            [name, code, dataset_id, source_id],
        )
        return var_id

    def touch_variable(self, var_id):
        self.cursor.execute(
            """
            UPDATE variables
            SET updatedAt = NOW()
            WHERE id = %s
            """,
            [var_id],
        )
        self.counts["variables_updated"] += self.cursor.rowcount

    def note_import(self, import_type, import_notes, import_state):
        self.upsert_one(
            """
            INSERT INTO importer_importhistory
                (import_type, import_time, import_notes, import_state)
            VALUES (%s, NOW(), %s, %s)
            """,
            [import_type, import_notes, import_state],
        )

    def __get_cached_entity_id(self, name):
        normalised_name = normalize_entity_name(name)
        if normalised_name in self.entity_id_by_normalised_name:
            return self.entity_id_by_normalised_name[normalised_name]
        else:
            return None

    def get_or_create_entity(self, name):
        # Serve from cache if available
        entity_id = self.__get_cached_entity_id(name)
        if entity_id is not None:
            return entity_id

        # Populate cache from database
        self.prefill_entity_cache([name])
        entity_id = self.__get_cached_entity_id(name)
        if entity_id is not None:
            return entity_id

        # If still not in cache, it's a new entity and we have to insert it
        self.upsert_one(
            """
            INSERT INTO entities (name, displayName, validated, createdAt, updatedAt)
            VALUES (%s, '', FALSE, NOW(), NOW())
            """,
            [name],
        )
        self.counts["entities_inserted"] += 1
        (entity_id,) = self.fetch_one(
            """
            SELECT id FROM entities
            WHERE name = %s
            """,
            [name],
        )
        # Cache the newly created entity
        self.entity_id_by_normalised_name[normalize_entity_name(name)] = entity_id
        return entity_id

    def prefill_entity_cache(self, names):
        rows = self.fetch_many(
            """
            SELECT
                LOWER(country_name),
                LOWER(entities.name),
                entities.id AS id
            FROM entities
            LEFT JOIN country_name_tool_countrydata
                ON country_name_tool_countrydata.owid_name = entities.name
            LEFT JOIN country_name_tool_countryname
                ON country_name_tool_countryname.owid_country = country_name_tool_countrydata.id
            WHERE LOWER(country_name) IN %(country_names)s
               OR LOWER(entities.name) IN %(country_names)s
            ORDER BY entities.id ASC
            """,
            {"country_names": [normalize_entity_name(x) for x in names]},
        )
        # Merge the two dicts
        self.entity_id_by_normalised_name.update(
            {
                # entityName → entityId
                **{row[1]: row[2] for row in rows if row[1]},
                # country_tool_name → entityId
                # the country tool name should take precedence
                **{row[0]: row[2] for row in rows if row[0]},
            }
        )
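

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the upstream module).
# It assumes a MySQL connection opened with pymysql; the connection details,
# user id, and tag/dataset names below are placeholders, not values used by
# the real importer.
if __name__ == "__main__":
    import pymysql

    connection = pymysql.connect(
        host="localhost",  # placeholder credentials
        user="grapher",
        password="grapher",
        database="grapher",
    )
    cursor = connection.cursor()
    db = DBUtils(cursor)

    # Upsert a dataset, tagged under a bulk-import parent tag
    parent_tag_id = db.upsert_parent_tag("COVID-19")
    tag_id = db.upsert_tag("Vaccinations", parent_tag_id)
    dataset_id = db.upsert_dataset(
        name="COVID-19 vaccinations",
        namespace="covid",
        user_id=1,  # placeholder user id
        tag_id=tag_id,
    )

    # Attach a source and a variable to the dataset
    source_id = db.upsert_source(
        name="Official data collated by Our World in Data",
        description="",
        dataset_id=dataset_id,
    )
    variable_id = db.upsert_variable(
        name="people_vaccinated",
        code=None,
        unit="people",
        short_unit="people",
        source_id=source_id,
        dataset_id=dataset_id,
    )

    # Entity lookups are cached and normalised (lowercased, accents stripped)
    entity_id = db.get_or_create_entity("United States")

    connection.commit()
    cursor.close()
    connection.close()
    print(db.get_counts())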