import time
import importlib
from datetime import datetime
from joblib import Parallel, delayed
import pandas as pd
from cowidev.utils.utils import export_timestamp, get_traceback
from cowidev.utils.s3 import obj_from_s3
from cowidev.cmd.commons.utils import StepReport
# S3 paths
LOG_MACHINES = "s3://covid-19/log/machines.json"
LOG_GET_COUNTRIES = "s3://covid-19/log/{}-get-data-countries.csv"
LOG_GET_GLOBAL = "s3://covid-19/log/{}-get-data-global.csv"
[docs]class CountryDataGetter:
def __init__(self, logger, modules_skip: list = [], log_header: str = ""):
self.logger = logger
self.modules_skip = modules_skip
self.log_header = log_header
[docs] def _skip_module(self, module_name):
return module_name in self.modules_skip
[docs] def run(self, module_name: str, num_retries: int = 2):
t0 = time.time()
# Check country skipping
if self._skip_module(module_name):
self.logger.info(f"{self.log_header} - {module_name}: skipped! ⚠️")
return {
"module_name": module_name,
"success": None,
"skipped": True,
"time": None,
"timestamp": datetime.utcnow().replace(microsecond=0).isoformat(),
"error": "",
"error_short": "",
}
# Start country scraping
self.logger.info(f"{self.log_header} - {module_name}: started")
module = importlib.import_module(module_name)
for i in range(num_retries):
try:
module.main()
except Exception as err:
self.logger.info(f"{self.log_header} - {module_name}: Attempt #{i+1} failed")
success = False
error_msg = get_traceback(err)
error_msg_short = str(err)
else:
success = True
error_msg = error_msg_short = ""
break
if success:
self.logger.info(f"{self.log_header} - {module_name}: SUCCESS ✅")
else:
self.logger.warning(
f"{self.log_header} - {module_name}: ❌ FAILED after {i+1} tries: {error_msg}", exc_info=True
)
t = round(time.time() - t0, 2)
return {
"module_name": module_name,
"success": success,
"skipped": False,
"time": t,
"timestamp": datetime.utcnow().replace(microsecond=0).isoformat(),
"error": error_msg,
"error_short": error_msg_short,
}
[docs]def main_get_data(
modules: list,
modules_valid: list,
logger,
parallel: bool = False,
n_jobs: int = -2,
modules_skip: list = [],
log_header: str = "",
log_s3_path=None,
output_status: str = None,
output_status_ts: str = None,
logging_mode: str = "info",
):
"""Get data from sources and export to output folder.
Is equivalent to script `run_python_scripts.py`
"""
t0 = time.time()
logger.info("-- Getting data... --")
country_data_getter = CountryDataGetter(logger, modules_skip, log_header)
if log_s3_path:
modules = _load_modules_order(modules, log_s3_path)
if parallel:
modules_execution_results = Parallel(n_jobs=n_jobs, backend="threading")(
delayed(country_data_getter.run)(module_name) for module_name in modules
)
else:
modules_execution_results = []
for module_name in modules:
modules_execution_results.append(country_data_getter.run(module_name))
t_sec_1 = round(time.time() - t0, 2)
# Get timing dataframe
df_exec = _build_df_execution(modules_execution_results)
# Retry failed modules
error_log, modules_execution_results_retry = _retry_modules_failed(modules_execution_results, country_data_getter)
if error_log is not None:
logger.error(error_log)
# Status
modules_execution_results += modules_execution_results_retry
df_status = export_status(modules_execution_results, modules_valid, output_status, output_status_ts)
# Print timing details
t_sec_1, t_min_1, t_sec_2, t_min_2, timing_log = _print_timing(t0, t_sec_1, df_exec)
# summary_log = summary_log_1 + summary_log_2
logger.info(f"{timing_log}")
# Build report
if log_header == "VAX":
domain = "Vaccinations"
elif log_header == "TEST":
domain = "Testing"
report_msg = _build_server_message(df_status, domain)
return report_msg
[docs]def _build_server_message(df_status, domain):
if (df_status.success == False).any():
dix_failed = df_status.loc[df_status.success == False, "error_short"].to_dict()
module_error_log = ""
for module, error in dix_failed.items():
module_error_log += f"* {module}\n {error}\n"
module_error_log += "--------------------------------------------------------\n\n"
module_list = ", ".join(dix_failed.keys())
title = f"{domain} - [get] step failed"
text = f"Modules failed: {len(dix_failed)}\n{module_list}"
trace = module_error_log
type = "warning"
else:
title = f"{domain} - [get] step ran successfully"
text = "All modules executed successfully"
type = "success"
trace = None
return StepReport(
title=title,
text=text,
type=type,
trace=trace,
)
[docs]def export_status(modules_execution_results, modules_valid, output_status, output_status_ts):
# Get status of executed scripts
df_status = _build_df_status(modules_execution_results)
# Load current status
df_status_now = pd.read_csv(output_status)
msk = ~df_status_now.module.isin(df_status.module)
df_status_now = df_status_now[msk]
# Merge
df_status = pd.concat([df_status, df_status_now], ignore_index=True).sort_values("module")
# Filter only running modules & set index
df_status = df_status[df_status.module.isin(modules_valid)].set_index("module")
# Export
if output_status is not None:
df_status.to_csv(output_status)
export_timestamp(output_status_ts)
return df_status
[docs]def _build_df_execution(modules_execution_results):
df_exec = (
pd.DataFrame(
[
{"module": m["module_name"], "execution_time (sec)": m["time"], "success": m["success"]}
for m in modules_execution_results
]
)
.set_index("module")
.sort_values(by="execution_time (sec)", ascending=False)
)
return df_exec
[docs]def _build_df_status(modules_execution_results):
df_exec = (
pd.DataFrame(
[
{
"module": m["module_name"],
"execution_time (sec)": m["time"],
"success": m["success"],
"timestamp": m["timestamp"],
"error": m["error"],
"error_short": m["error_short"],
}
for m in modules_execution_results
]
)
.sort_values(by="timestamp", ascending=True)
.drop_duplicates(subset=["module"], keep="last")
.sort_values(by="execution_time (sec)", ascending=False)
)
return df_exec
[docs]def _retry_modules_failed(modules_execution_results, country_data_getter):
modules_failed = [m["module_name"] for m in modules_execution_results if m["success"] is False]
retried_str = "\n".join([f"* {m}" for m in modules_failed])
country_data_getter.logger.warning(
f"""\n\n--------------------------------------\nRETRIES ({len(modules_failed)})
The following modules will be re-executed:
{retried_str}
"""
)
modules_execution_results = []
for module_name in modules_failed:
modules_execution_results.append(country_data_getter.run(module_name))
modules_failed_retrial = [m["module_name"] for m in modules_execution_results if m["success"] is False]
if len(modules_failed_retrial) > 0:
failed_str = "\n".join([f"* {m}" for m in modules_failed_retrial])
error_log = f"""\n\n--------------------------------------
FAILED ({len(modules_failed_retrial)})
The following modules failed:
{failed_str}
--------------------------------------
"""
else:
error_log = None
return error_log, modules_execution_results
[docs]def _print_timing(t0, t_sec_1, df_time):
t_min_1 = round(t_sec_1 / 60, 2)
t_sec_2 = round(time.time() - t0, 2)
t_min_2 = round(t_sec_2 / 60, 2)
summary_log = f"""TIMING DETAILS
* Took {t_sec_1} seconds (i.e. {t_min_1} minutes).
* Top 20 most time consuming scripts:
{df_time[["execution_time (sec)"]].head(20)}
* Took {t_sec_2} seconds (i.e. {t_min_2} minutes) [AFTER RETRIALS].
--------------------------------------
"""
return t_sec_1, t_min_1, t_sec_2, t_min_2, summary_log
[docs]def _load_modules_order(modules_name, path_log):
if len(modules_name) < 10:
return modules_name
df = obj_from_s3(path_log)
# Filter by machine
# details = system_details()
# machine = details["id"]
# if machine in df.machine:
# df = df[df.machine == machine]
# df = pd.read_csv(os.path.join(PATHS.INTERNAL_OUTPUT_VAX_LOG_DIR, "get-data.csv"))
module_order_all = (
df.sort_values("date")
.drop_duplicates(subset=["module"], keep="last")
.sort_values("execution_time (sec)", ascending=False)
.module.tolist()
)
modules_name_order = [m for m in module_order_all if m in modules_name]
missing = [m for m in modules_name if m not in modules_name_order]
return modules_name_order + missing
# def _export_log_info(df_exec, t_sec_1, t_sec_2):
# # print(len(df_new), len(MODULES_NAME), len(df_new) == len(MODULES_NAME))
# if len(df_exec) == len(MODULES_NAME):
# print("EXPORTING LOG DETAILS")
# details = system_details()
# date_now = localdate(force_today=True)
# machine = details["id"]
# # Export timings per country
# df_exec = df_exec.reset_index().assign(date=date_now, machine=machine)
# df = obj_from_s3(LOG_GET_COUNTRIES)
# df = df[df.date + df.machine != date_now + machine]
# df = pd.concat([df, df_exec])
# obj_to_s3(df, LOG_GET_COUNTRIES)
# # Export machine info
# data = obj_from_s3(LOG_MACHINES)
# if machine not in data:
# data = {**details, machine: details["info"]}
# obj_to_s3(data, LOG_MACHINES)
# # Export overall timing
# report = {"machine": machine, "date": date_now, "t_sec": t_sec_1, "t_sec_retry": t_sec_2}
# df_new = pd.DataFrame([report])
# df = obj_from_s3(LOG_GET_GLOBAL)
# df = df[df.date + df.machine != date_now + machine]
# df = pd.concat([df, df_new])
# obj_to_s3(df, LOG_GET_GLOBAL)