decisionengine.framework.engine.tests package

Submodules

decisionengine.framework.engine.tests.conftest module

decisionengine.framework.engine.tests.conftest._redis_server_running() → bool
decisionengine.framework.engine.tests.conftest.pytest_runtest_call(item)

decisionengine.framework.engine.tests.fixtures module

pytest defaults

decisionengine.framework.engine.tests.fixtures.DEServer(conf_path=None, conf_override=None, channel_conf_path=None, channel_conf_override=None, host='127.0.0.1', port=None, make_conf_dirs_if_missing=False, block_until_startup_complete=True)

A DE Server using a private database

decisionengine.framework.engine.tests.fixtures.PG_DE_DB_WITHOUT_SCHEMA(request: FixtureRequest) → Iterator[Connection]

Fixture factory for PostgreSQL.

Parameters:

request – fixture request object

Returns:

postgresql client

decisionengine.framework.engine.tests.fixtures.PG_PROG(request: FixtureRequest, tmp_path_factory: TempPathFactory) → Iterator[PostgreSQLExecutor]

Process fixture for PostgreSQL.

Parameters:
  • request – fixture request object

  • tmp_path_factory – temporary path object (fixture)

Returns:

tcp executor

decisionengine.framework.engine.tests.fixtures.SQLALCHEMY_PG_WITH_SCHEMA(PG_DE_DB_WITHOUT_SCHEMA)

Get a blank database from pytest_postgresql, then set up the SQLAlchemy-style URL for that database. The SQLAlchemyDS will create the schema as needed.
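As a rough illustration of what a SQLAlchemy-style PostgreSQL URL looks like, here is a minimal sketch. The helper `postgres_url` and its parameters are hypothetical and are not part of the fixture; the real fixture presumably derives these values from the connection that pytest_postgresql provides.

```python
from urllib.parse import quote


def postgres_url(user: str, host: str, port: int, dbname: str, password: str = "") -> str:
    """Build a SQLAlchemy-style PostgreSQL URL (hypothetical helper)."""
    # Percent-encode credentials so characters such as '@' or ':' cannot break the URL.
    credentials = quote(user, safe="")
    if password:
        credentials += ":" + quote(password, safe="")
    return f"postgresql://{credentials}@{host}:{port}/{dbname}"


# Example values only; a test database would supply its own.
url = postgres_url("postgres", "127.0.0.1", 5432, "tests")
```

The resulting string follows the `dialect://user:password@host:port/database` form that SQLAlchemy accepts.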

decisionengine.framework.engine.tests.fixtures.SQLALCHEMY_TEMPFILE_SQLITE(tmp_path)

Set up an SQLite database with the pytest tmp_path fixture, then set up the SQLAlchemy-style URL for that database. The SQLAlchemyDS will create the schema as needed.
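The idea of pointing a SQLAlchemy-style URL at a throwaway SQLite file can be sketched with the standard library alone. The helper `sqlite_url` and the file name `test.db` are assumptions made for illustration; a temporary directory stands in for pytest's tmp_path.

```python
import sqlite3
import tempfile
from pathlib import Path


def sqlite_url(directory: Path, filename: str = "test.db") -> str:
    """Return a SQLAlchemy-style URL for an SQLite file in `directory` (hypothetical helper)."""
    db_path = (directory / filename).resolve()
    # Three slashes introduce the path; an absolute POSIX path adds a fourth.
    return f"sqlite:///{db_path}"


with tempfile.TemporaryDirectory() as tmp:
    url = sqlite_url(Path(tmp))
    # Strip the scheme to recover the plain file path for the sqlite3 module.
    db_file = url[len("sqlite:///"):]
    connection = sqlite3.connect(db_file)
    connection.execute("CREATE TABLE example (id INTEGER)")
    connection.commit()
    connection.close()
    created = Path(db_file).exists()
```

Because the database lives in a temporary directory, each test gets an isolated file that disappears when the directory is cleaned up.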

decisionengine.framework.engine.tests.test_ChannelWorkers module

class decisionengine.framework.engine.tests.test_ChannelWorkers.TaskManager

Bases: object

name = 'test_channel'
set_loglevel_value(value)
decisionengine.framework.engine.tests.test_ChannelWorkers.global_config(dataspace)
decisionengine.framework.engine.tests.test_ChannelWorkers.test_worker_logger_sized_rotation(global_config)
decisionengine.framework.engine.tests.test_ChannelWorkers.test_worker_logger_timed_rotation(global_config)
decisionengine.framework.engine.tests.test_ChannelWorkers.test_worker_name(global_config)

decisionengine.framework.engine.tests.test_SourceWorkers module

decisionengine.framework.engine.tests.test_SourceWorkers.global_config(dataspace)
decisionengine.framework.engine.tests.test_SourceWorkers.source_config(channel_name, source_name)
decisionengine.framework.engine.tests.test_SourceWorkers.source_worker_for(src_config, global_config)
decisionengine.framework.engine.tests.test_SourceWorkers.test_worker_logger_sized_rotation(global_config)
decisionengine.framework.engine.tests.test_SourceWorkers.test_worker_logger_timed_rotation(global_config)
decisionengine.framework.engine.tests.test_SourceWorkers.test_worker_logger_wrong_rotation_method(global_config)
decisionengine.framework.engine.tests.test_SourceWorkers.test_worker_name(global_config)

decisionengine.framework.engine.tests.test_client_only module

decisionengine.framework.engine.tests.test_client_only.test_client_err_returned_as_rc()

No DE server is running, so --status should return an error.

decisionengine.framework.engine.tests.test_client_only.test_client_err_returned_verbose_as_rc()

No DE server is running, so --status should return an error.
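The pattern these tests check, an unreachable server surfacing as a non-zero return code, can be sketched as follows. The functions `status_rc` and `unused_port` are hypothetical stand-ins and do not reflect how the DE client itself is implemented.

```python
import socket


def status_rc(host: str, port: int, timeout: float = 1.0) -> int:
    """Return 0 if something accepts TCP connections on host:port, else 1."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return 0
    except OSError:
        # Connection refused, timed out, or unreachable: report a failure code.
        return 1


def unused_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a free port, then release it so nothing is listening there."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


# No server is listening on the chosen port, so the status check should fail.
rc = status_rc("127.0.0.1", unused_port())
```

A test in this style would assert that `rc` is non-zero rather than inspect any error text.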

decisionengine.framework.engine.tests.test_client_only.test_client_help(capfd)
decisionengine.framework.engine.tests.test_client_only.test_client_with_no_command_says_use_help()
decisionengine.framework.engine.tests.test_client_only.test_client_with_no_server()
decisionengine.framework.engine.tests.test_client_only.test_client_with_no_server_verbose()
decisionengine.framework.engine.tests.test_client_only.test_exclusive_options()

decisionengine.framework.engine.tests.test_query_tool_only module

decisionengine.framework.engine.tests.test_query_tool_only.test_client_err_returned_as_rc()

No DE server is running, so --status should return an error.

decisionengine.framework.engine.tests.test_query_tool_only.test_client_err_returned_verbose_as_rc()

No DE server is running, so --status should return an error.

decisionengine.framework.engine.tests.test_query_tool_only.test_query_tool_help()
decisionengine.framework.engine.tests.test_query_tool_only.test_query_tool_with_no_server()
decisionengine.framework.engine.tests.test_query_tool_only.test_query_tool_with_no_server_verbose()

decisionengine.framework.engine.tests.test_startup module

decisionengine.framework.engine.tests.test_startup._check_override(arguments)
decisionengine.framework.engine.tests.test_startup.metrics_env_setup(tmp_path, monkeypatch)

Ensure PROMETHEUS_MULTIPROC_DIR points to a directory so that instantiating metrics yields multiprocess metrics.
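A minimal standard-library sketch of the same idea: point PROMETHEUS_MULTIPROC_DIR at a temporary directory for the duration of a block and restore the environment afterwards. The context manager `multiproc_dir_env` is a hypothetical name; the fixture itself takes pytest's tmp_path and monkeypatch, which play the roles that `tempfile` and `mock.patch.dict` play here.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from unittest import mock


@contextmanager
def multiproc_dir_env():
    """Temporarily set PROMETHEUS_MULTIPROC_DIR to a fresh directory (hypothetical helper)."""
    with tempfile.TemporaryDirectory() as tmp:
        # patch.dict restores os.environ to its previous state on exit.
        with mock.patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": tmp}):
            yield Path(tmp)


with multiproc_dir_env() as metrics_dir:
    # Code that instantiates metrics would run here and see the variable set.
    seen = Path(os.environ["PROMETHEUS_MULTIPROC_DIR"])
```

Setting the variable before any metric is created matters because the variable is read at that point, which is presumably why the fixture exists.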

decisionengine.framework.engine.tests.test_startup.test_change_port()
decisionengine.framework.engine.tests.test_startup.test_check_metrics_env_no_webserver()
decisionengine.framework.engine.tests.test_startup.test_check_metrics_env_var_set()
decisionengine.framework.engine.tests.test_startup.test_check_metrics_env_var_unset()
decisionengine.framework.engine.tests.test_startup.test_default_config()

decisionengine.framework.engine.tests.test_verify_redis_server module

decisionengine.framework.engine.tests.test_verify_redis_server.fake_redis_server(monkeypatch)
decisionengine.framework.engine.tests.test_verify_redis_server.test_verify_bad_broker()
decisionengine.framework.engine.tests.test_verify_redis_server.test_verify_bad_redis_server()
decisionengine.framework.engine.tests.test_verify_redis_server.test_verify_bad_url()
decisionengine.framework.engine.tests.test_verify_redis_server.test_verify_redis_server(fake_redis_server)
decisionengine.framework.engine.tests.test_verify_redis_server.test_verify_redis_server_int()
decisionengine.framework.engine.tests.test_verify_redis_server.test_verify_redis_url()

Module contents