cowidev.cmd.vax.process#

cowidev.cmd.vax.process.process#

cowidev.cmd.vax.process.process._build_server_message(df_status, domain)[source]#
cowidev.cmd.vax.process.process._check_no_overlapping_manual_auto(dfs_manual, dfs_auto)[source]#
cowidev.cmd.vax.process.process.main_process_data(path_input_files: str, path_output_files: str, path_output_meta: str, column_location: str, process_location: callable, path_output_status: str, path_output_status_ts: str, log_header: str, logger, skip_complete: Optional[list] = None, skip_monotonic: dict = {}, skip_anomaly: dict = {})[source]#
cowidev.cmd.vax.process.process.read_csv(filepath)[source]#

cowidev.cmd.vax.process.utils#

class cowidev.cmd.vax.process.utils.VaccinationGSheet[source]#

Bases: object

__sheet = None#
__sheets = None#
_api = <cowidev.utils.gdrive.gsheets.GSheetApi object>#
_check_metadata(df: DataFrame)[source]#

Check metadata LOCATIONS tab has valid format.

_check_with_metadata(df_list: list, filepaths: list)[source]#

Checks if country tabs are aligned with LOCATIONS metadata tab content.

property automated_countries#

Get list of countries with an automated process.

df_list(include_all: bool = False, refresh: bool = False) List[DataFrame][source]#

Read non-automated files.

Parameters:
  • include_all (bool) – Set to True to only load non-automated countries.

  • refresh (bool) – Set to True to get updated data from sheets.

Returns:

List with dataframe per country.

Return type:

List[pd.DataFrame]

classmethod from_json(path: str)[source]#

Load sheet from config file.

get_metadata(refresh: bool = False) DataFrame[source]#

Get metadata from LOCATIONS tab.

property manual_countries#

Get list of countries that require manual process.

property metadata#
property sheet#
sheet_id = None#
property sheets#
cowidev.cmd.vax.process.utils.process_location(df: DataFrame, monotonic_check_skip: list = [], anomaly_check_skip: list = []) DataFrame[source]#
cowidev.cmd.vax.process.utils.read_csv_and_check(filepath)[source]#