cowidev.cmd.commons

cowidev.cmd.commons.get

class cowidev.cmd.commons.get.CountryDataGetter(logger, modules_skip: list = [], log_header: str = '')

Bases: object

_skip_module(module_name)
run(module_name: str, num_retries: int = 2)
cowidev.cmd.commons.get._build_df_execution(modules_execution_results)
cowidev.cmd.commons.get._build_df_status(modules_execution_results)
cowidev.cmd.commons.get._build_server_message(df_status, domain)
cowidev.cmd.commons.get._load_modules_order(modules_name, path_log)
cowidev.cmd.commons.get._print_timing(t0, t_sec_1, df_time)
cowidev.cmd.commons.get._retry_modules_failed(modules_execution_results, country_data_getter)
cowidev.cmd.commons.get.export_status(modules_execution_results, modules_valid, output_status, output_status_ts)
cowidev.cmd.commons.get.main_get_data(modules: list, modules_valid: list, logger, parallel: bool = False, n_jobs: int = -2, modules_skip: list = [], log_header: str = '', log_s3_path=None, output_status: str | None = None, output_status_ts: str | None = None, logging_mode: str = 'info')

Get data from sources and export to output folder.

Equivalent to the script run_python_scripts.py.

cowidev.cmd.commons.utils

class cowidev.cmd.commons.utils.Country2Module(modules_name: list, country_to_module: list, modules_name_incremental: list = None, modules_name_batch: list = None)

Bases: object

_check_countries(countries)
country_to_module: list
modules_name: list
modules_name_batch: list = None
modules_name_incremental: list = None
parse(countries)
class cowidev.cmd.commons.utils.OptionEatAll(*args, **kwargs)

Bases: Option

From https://stackoverflow.com/a/48394004/5056599

add_to_parser(parser, ctx)
class cowidev.cmd.commons.utils.OrderedGroup(name=None, commands=None, **attrs)

Bases: Group

From https://stackoverflow.com/a/58323807/5056599

commands: t.Dict[str, Command]

The registered subcommands by their exported names.

list_commands(ctx)

Returns a list of subcommand names in the order they should appear.
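As a rough illustration of what overriding list_commands changes: click's stock Group.list_commands returns the subcommand names sorted alphabetically, whereas an ordered group returns them in the order they were registered. The sketch below uses plain-Python stand-ins rather than click itself. SortedGroupSketch, OrderedGroupSketch, register_pipeline, and the four step names are hypothetical and are not taken from the cowidev source.

```python
class SortedGroupSketch:
    """Stand-in for a command group that lists subcommands alphabetically."""

    def __init__(self):
        # Plain dicts preserve insertion order (Python 3.7+).
        self.commands = {}

    def add_command(self, name, callback):
        self.commands[name] = callback

    def list_commands(self, ctx=None):
        # Alphabetical listing: registration order is lost.
        return sorted(self.commands)


class OrderedGroupSketch(SortedGroupSketch):
    """Stand-in for a group that lists subcommands in registration order."""

    def list_commands(self, ctx=None):
        # Iterating the dict yields the names in insertion order.
        return list(self.commands)


def register_pipeline(group):
    # Hypothetical step names, registered in the order they should run.
    for name in ("get", "process", "generate", "export"):
        group.add_command(name, lambda: None)
    return group.list_commands()


sorted_names = register_pipeline(SortedGroupSketch())
ordered_names = register_pipeline(OrderedGroupSketch())
```

Here sorted_names comes back alphabetically, while ordered_names keeps the registration order, which is the behaviour a help screen for sequential pipeline steps would want.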

class cowidev.cmd.commons.utils.PythonLiteralOption(param_decls: Sequence[str] | None = None, show_default: bool | str = False, prompt: bool | str = False, confirmation_prompt: bool | str = False, prompt_required: bool = True, hide_input: bool = False, is_flag: bool | None = None, flag_value: Any | None = None, multiple: bool = False, count: bool = False, allow_from_autoenv: bool = True, type: ParamType | Any | None = None, help: str | None = None, hidden: bool = False, show_choices: bool = True, show_envvar: bool = False, **attrs: Any)

Bases: Option

From https://stackoverflow.com/a/47730333/5056599

type_cast_value(ctx, value)

Convert and validate a value against the option’s type, multiple, and nargs.
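The linked Stack Overflow answer parses the raw option string with ast.literal_eval, so that a Python literal such as a list can be passed on the command line. Assuming this class follows that approach (its source is not shown here), the conversion step can be sketched without click as follows. parse_literal_option is a hypothetical helper name, and ValueError stands in for whatever error the real option raises on bad input.

```python
import ast


def parse_literal_option(value):
    """Turn a command-line string into the Python literal it spells out."""
    try:
        # literal_eval accepts only literals (strings, numbers, tuples,
        # lists, dicts, sets, booleans, None); it never runs arbitrary code.
        return ast.literal_eval(value)
    except (ValueError, SyntaxError) as err:
        # Assumption: report bad input as a single, uniform error type.
        raise ValueError(f"not a valid Python literal: {value!r}") from err


# Example inputs, as they might arrive from the shell.
countries = parse_literal_option("['Spain', 'France']")
limits = parse_literal_option("{'retries': 2}")
```

Using ast.literal_eval rather than eval means that input such as a function call is rejected instead of executed.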

class cowidev.cmd.commons.utils.StepReport(title: str, type: str, text: str = '', trace: str = '')

Bases: object

to_slack(channel='#corona-data-updates')
cowidev.cmd.commons.utils._comma_separated_to_list(x)
cowidev.cmd.commons.utils.feedback_log(func, server, domain, step=None, text_success='', hide_success=False, channel='#corona-data-updates', **function_kwargs)
cowidev.cmd.commons.utils.normalize_country_name(country_name: str)