cowidev.cmd.commons#

cowidev.cmd.commons.get#

class cowidev.cmd.commons.get.CountryDataGetter(logger, modules_skip: list = [], log_header: str = '')[source]#

Bases: object

_skip_module(module_name)[source]#
run(module_name: str, num_retries: int = 2)[source]#
cowidev.cmd.commons.get._build_df_execution(modules_execution_results)[source]#
cowidev.cmd.commons.get._build_df_status(modules_execution_results)[source]#
cowidev.cmd.commons.get._build_server_message(df_status, domain)[source]#
cowidev.cmd.commons.get._load_modules_order(modules_name, path_log)[source]#
cowidev.cmd.commons.get._print_timing(t0, t_sec_1, df_time)[source]#
cowidev.cmd.commons.get._retry_modules_failed(modules_execution_results, country_data_getter)[source]#
cowidev.cmd.commons.get.export_status(modules_execution_results, modules_valid, output_status, output_status_ts)[source]#
cowidev.cmd.commons.get.main_get_data(modules: list, modules_valid: list, logger, parallel: bool = False, n_jobs: int = -2, modules_skip: list = [], log_header: str = '', log_s3_path=None, output_status: Optional[str] = None, output_status_ts: Optional[str] = None, logging_mode: str = 'info')[source]#

Get data from sources and export to output folder.

Is equivalent to script run_python_scripts.py

cowidev.cmd.commons.utils#

class cowidev.cmd.commons.utils.Country2Module(modules_name: list, country_to_module: list, modules_name_incremental: list = None, modules_name_batch: list = None)[source]#

Bases: object

_check_countries(countries)[source]#
country_to_module: list#
modules_name: list#
modules_name_batch: list = None#
modules_name_incremental: list = None#
parse(countries)[source]#
class cowidev.cmd.commons.utils.OptionEatAll(*args, **kwargs)[source]#

Bases: Option

From https://stackoverflow.com/a/48394004/5056599

add_to_parser(parser, ctx)[source]#
class cowidev.cmd.commons.utils.OrderedGroup(name=None, commands=None, **attrs)[source]#

Bases: Group

From https://stackoverflow.com/a/58323807/5056599

commands: t.Dict[str, Command]#

the registered subcommands by their exported names.

list_commands(ctx)[source]#

Returns a list of subcommand names in the order they should appear.

class cowidev.cmd.commons.utils.PythonLiteralOption(param_decls: Optional[Sequence[str]] = None, show_default: Union[bool, str] = False, prompt: Union[bool, str] = False, confirmation_prompt: Union[bool, str] = False, prompt_required: bool = True, hide_input: bool = False, is_flag: Optional[bool] = None, flag_value: Optional[Any] = None, multiple: bool = False, count: bool = False, allow_from_autoenv: bool = True, type: Optional[Union[ParamType, Any]] = None, help: Optional[str] = None, hidden: bool = False, show_choices: bool = True, show_envvar: bool = False, **attrs: Any)[source]#

Bases: Option

From https://stackoverflow.com/a/47730333/5056599

type_cast_value(ctx, value)[source]#

Convert and validate a value against the option’s type, multiple, and nargs.

class cowidev.cmd.commons.utils.StepReport(title: str, type: str, text: str = '', trace: str = '')[source]#

Bases: object

to_slack(channel='#corona-data-updates')[source]#
cowidev.cmd.commons.utils._comma_separated_to_list(x)[source]#
cowidev.cmd.commons.utils.feedback_log(func, server, domain, step=None, text_success='', hide_success=False, channel='#corona-data-updates', **function_kwargs)[source]#
cowidev.cmd.commons.utils.normalize_country_name(country_name: str)[source]#