cowidev.cmd.vax.process

cowidev.cmd.vax.process.process

cowidev.cmd.vax.process.process._build_server_message(df_status, domain)[source]
cowidev.cmd.vax.process.process._check_no_overlapping_manual_auto(dfs_manual, dfs_auto)[source]
cowidev.cmd.vax.process.process.main_process_data(path_input_files: str, path_output_files: str, path_output_meta: str, column_location: str, process_location: callable, path_output_status: str, path_output_status_ts: str, log_header: str, logger, skip_complete: list | None = None, skip_monotonic: dict = {}, skip_anomaly: dict = {})[source]
cowidev.cmd.vax.process.process.read_csv(filepath)[source]

cowidev.cmd.vax.process.utils

class cowidev.cmd.vax.process.utils.VaccinationGSheet[source]

Bases: object

__sheet = None
__sheets = None
_api = <cowidev.utils.gdrive.gsheets.GSheetApi object>
_check_metadata(df: DataFrame)[source]

Check metadata LOCATIONS tab has valid format.

_check_with_metadata(df_list: list, filepaths: list)[source]

Checks if country tabs are aligned with LOCATIONS metadata tab content.

property automated_countries

Get list of countries with an automated process.

df_list(include_all: bool = False, refresh: bool = False) List[DataFrame][source]

Read non-automated files.

Parameters:
  • include_all (bool) – Set to True to only load non-automated countries.

  • refresh (bool) – Set to True to get updated data from sheets.

Returns:

List with dataframe per country.

Return type:

List[pd.DataFrame]

classmethod from_json(path: str)[source]

Load sheet from config file.

get_metadata(refresh: bool = False) DataFrame[source]

Get metadata from LOCATIONS tab.

property manual_countries

Get list of countries that require manual process.

property metadata
property sheet
sheet_id = None
property sheets
cowidev.cmd.vax.process.utils.process_location(df: DataFrame, monotonic_check_skip: list = [], anomaly_check_skip: list = []) DataFrame[source]
cowidev.cmd.vax.process.utils.read_csv_and_check(filepath)[source]