"""Source code for cowidev.cmd.vax.process.process."""

import os
from datetime import datetime

import pandas as pd
from pandas.core.base import DataError
from pandas.errors import ParserError
import click

from cowidev import PATHS
from cowidev.utils.log import get_logger
from cowidev.utils.params import CONFIG
from cowidev.utils.utils import export_timestamp, get_traceback
from cowidev.cmd.vax.process.utils import process_location, VaccinationGSheet
from cowidev.cmd.commons.utils import StepReport


@click.command(name="process", short_help="Step 2: Process scraped vaccination data from primary sources.")
@click.pass_context
def click_vax_process(ctx):
    """Process data in folder scripts/output/vaccinations/.

    By default, the default values for OPTIONS are those specified in the configuration file. The configuration file is
    a YAML file with the pipeline settings. Note that the environment variable `OWID_COVID_CONFIG` must be pointing to
    this file. We provide a default config file in the project folder scripts/config.yaml.

    OPTIONS passed via command line will overwrite those from configuration file.

    Example:
    Process all country data.

        cowid vax process
    """
    # Pipeline-step settings come from the YAML-backed CONFIG object.
    process_cfg = CONFIG.pipeline.vaccinations.process
    report = main_process_data(
        path_input_files=PATHS.INTERNAL_OUTPUT_VAX_MAIN_DIR,
        path_output_files=PATHS.DATA_VAX_COUNTRY_DIR,
        path_output_meta=PATHS.INTERNAL_OUTPUT_VAX_META_FILE,
        column_location="location",
        process_location=process_location,
        path_output_status=PATHS.INTERNAL_OUTPUT_VAX_STATUS_PROCESS,
        path_output_status_ts=PATHS.INTERNAL_OUTPUT_VAX_STATUS_PROCESS_TS,
        log_header="VAX",
        skip_complete=process_cfg.skip_complete,
        skip_monotonic=process_cfg.skip_monotonic_check,
        skip_anomaly=process_cfg.skip_anomaly_check,
        logger=ctx.obj["logger"],
    )
    # When running on the server, push the step report to Slack and echo it.
    if ctx.obj["server"]:
        report.to_slack()
        print(report)

def main_process_data(
    path_input_files: str,
    path_output_files: str,
    path_output_meta: str,
    column_location: str,
    process_location: callable,
    path_output_status: str,
    path_output_status_ts: str,
    log_header: str,
    logger,
    skip_complete: list = None,
    skip_monotonic: dict = None,
    skip_anomaly: dict = None,
):
    """Process scraped vaccination data and export per-country files plus a status report.

    Reads manual data from the vaccinations Google Spreadsheet and automated data from
    per-country CSVs in ``path_input_files``, runs ``process_location`` on each country
    dataframe, exports successful results to ``path_output_files``, and writes metadata,
    a per-location status CSV and a timestamp file.

    Args:
        path_input_files: Folder with automated per-country CSVs (``<country>.csv``).
        path_output_files: Folder where processed per-country CSVs are written.
        path_output_meta: Path for the exported GSheet metadata CSV.
        column_location: Name of the column holding the location/country name.
        process_location: Callable ``(df, monotonic_skip, anomaly_skip) -> df`` that
            validates/cleans one location's dataframe.
        path_output_status: Path for the per-location status CSV.
        path_output_status_ts: Path for the status timestamp file.
        log_header: Prefix used in log lines (e.g. ``"VAX"``).
        logger: Logger instance used for progress/error reporting.
        skip_complete: Lowercased location names to skip entirely. Defaults to none.
        skip_monotonic: Mapping location -> columns to skip in the monotonic check.
        skip_anomaly: Mapping location -> entries to skip in the anomaly check.

    Returns:
        StepReport summarizing successes/failures, suitable for Slack notification.
    """
    # TODO: Generalize
    # Normalize defaults in-body: the old signature used `None`/shared mutable `{}`
    # defaults, and `skip_complete=None` crashed on the `in` test below.
    skip_complete = [] if skip_complete is None else skip_complete
    skip_monotonic = {} if skip_monotonic is None else skip_monotonic
    skip_anomaly = {} if skip_anomaly is None else skip_anomaly

    logger.info("-- Processing data... --")
    # Get data from sheets (i.e. manual data)
    logger.info("Getting data from Google Spreadsheet...")
    gsheet = VaccinationGSheet()
    dfs_manual = gsheet.df_list()
    # Get automated-country data
    logger.info("Getting data from internal output...")
    automated = gsheet.automated_countries
    filepaths_auto = [os.path.join(path_input_files, f"{country}.csv") for country in automated]
    dfs_auto = [read_csv(filepath) for filepath in filepaths_auto]
    # Concatenate list of dataframes
    dfs = dfs_manual + dfs_auto
    # Check that no location is present in both manual and automated data
    _check_no_overlapping_manual_auto(dfs_manual, dfs_auto)

    def _process_location_and_move_file(df):
        """Process one location's dataframe and return its status record (dict)."""
        if column_location not in df:
            raise ValueError(f"Column `{column_location}` missing.\ndf: {df.tail(5)}")
        country = df.loc[0, column_location]
        if country.lower() in skip_complete:
            logger.info(f"{country}: SKIPPED 🚧")
            return {
                "location": country,
                "success": None,
                "skipped": True,
                "error": "",
                "timestamp": datetime.utcnow().replace(microsecond=0).isoformat(),
            }
        # Per-location exemptions for the sanity checks.
        monotonic_check_skip = skip_monotonic.get(country, [])
        anomaly_check_skip = skip_anomaly.get(country, [])
        try:
            df = process_location(df, monotonic_check_skip, anomaly_check_skip)
        except Exception as err:
            # NOTE: the previous version used `finally: return`, which both swallowed
            # BaseException (e.g. KeyboardInterrupt) and could hit an unbound
            # `error_short_msg` via a bare `except:` branch. Return explicitly instead.
            logger.error(f"{log_header} - {country}: FAILED ❌ {err}")
            return {
                "location": country,
                "success": False,
                "skipped": False,
                "error": get_traceback(err),
                "error_short": str(err),
                "timestamp": datetime.utcnow().replace(microsecond=0).isoformat(),
            }
        # Export processed data for this country.
        df.to_csv(os.path.join(path_output_files, f"{country}.csv"), index=False)
        logger.info(f"{log_header} - {country}: SUCCESS ✅")
        return {
            "location": country,
            "success": True,
            "skipped": False,
            "error": "",
            "error_short": "",
            "timestamp": datetime.utcnow().replace(microsecond=0).isoformat(),
        }

    logger.info("Processing and exporting data...")
    # Process all countries
    df_status = pd.DataFrame([_process_location_and_move_file(df) for df in dfs]).set_index("location")
    # Export metadata
    gsheet.metadata.to_csv(path_output_meta, index=False)
    logger.info("Exported ✅")
    # Export status
    df_status.to_csv(path_output_status)
    export_timestamp(path_output_status_ts)
    report_msg = _build_server_message(df_status, domain="Vaccinations")
    return report_msg
def _build_server_message(df_status, domain):
    """Build a StepReport (warning on any failed module, success otherwise)."""
    failed_mask = df_status.success == False  # noqa: E712 — `success` may be None (skipped), keep explicit comparison
    if failed_mask.any():
        # location -> short error message, for every failed module
        failed_errors = df_status.loc[failed_mask, "error_short"].to_dict()
        chunks = []
        for module_name, error in failed_errors.items():
            chunks.append(f"* {module_name}\n {error}\n")
            chunks.append("--------------------------------------------------------\n\n")
        module_error_log = "".join(chunks)
        module_list = ", ".join(failed_errors.keys())
        report_kwargs = {
            "title": f"{domain} - [process] step failed",
            "text": f"Modules failed: {len(failed_errors)}\n{module_list}\n\n```{module_error_log}```",
            "type": "warning",
        }
    else:
        report_kwargs = {
            "title": f"{domain} - [process] step run successfully",
            "text": "All modules executed successfully",
            "type": "success",
        }
    return StepReport(**report_kwargs)
def read_csv(filepath):
    """Read a CSV file into a DataFrame, normalizing any failure to ParserError.

    Args:
        filepath: Path of the CSV file to read.

    Returns:
        pandas.DataFrame with the file contents.

    Raises:
        ParserError: If the file cannot be read or parsed; chained to the
            original exception so the root cause is preserved.
    """
    try:
        return pd.read_csv(filepath)
    except Exception as err:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit propagate,
        # and chained with `from err` so the underlying I/O/parse error is kept.
        raise ParserError(f"Error tokenizing data from file {filepath}") from err
[docs]def _check_no_overlapping_manual_auto(dfs_manual, dfs_auto): locations_manual = set([df.location[0] for df in dfs_manual]) locations_auto = set([df.location[0] for df in dfs_auto]) locations_common = locations_auto.intersection(locations_manual) if len(locations_common) > 0: raise DataError(f"The following locations have data in both output/main_data and GSheet: {locations_common}")