decisionengine.framework.tests package

Submodules

decisionengine.framework.tests.ABTransform module

class decisionengine.framework.tests.ABTransform.ABTransform(module_parameters, *args, **kwargs)[source]

Bases: Transform

_consumes = {'B': None}
_produces = {'A': None}

decisionengine.framework.tests.BATransform module

class decisionengine.framework.tests.BATransform.BATransform(module_parameters, *args, **kwargs)[source]

Bases: Transform

_consumes = {'A': None}
_produces = {'B': None}

decisionengine.framework.tests.DynamicPublisher module

class decisionengine.framework.tests.DynamicPublisher.DynamicPublisher(config)[source]

Bases: Publisher

_supported_config = {'consumes': (<class 'list'>, None, None), 'expects': (<class 'int'>, None, None)}
publisher(data_block)[source]

decisionengine.framework.tests.DynamicSource module

class decisionengine.framework.tests.DynamicSource.DynamicSource(config)[source]

Bases: Source

_supported_config = {'data_product_name': (<class 'str'>, None, None)}
acquire()[source]

decisionengine.framework.tests.DynamicTransform module

class decisionengine.framework.tests.DynamicTransform.DynamicTransform(config)[source]

Bases: Transform

_supported_config = {'consumes': (<class 'list'>, None, None), 'data_product_name': (<class 'str'>, None, None)}
transform(data_block)[source]

decisionengine.framework.tests.ErringPublisher module

class decisionengine.framework.tests.ErringPublisher.ErringPublisher(module_parameters, *args, **kwargs)[source]

Bases: Publisher

_consumes = {'bar': None}
publish(data_block)[source]

decisionengine.framework.tests.ErrorOnAcquire module

class decisionengine.framework.tests.ErrorOnAcquire.ErrorOnAcquire(config)[source]

Bases: Source

_produces = {'_placeholder': None}
acquire()[source]

decisionengine.framework.tests.FailingPublisher module

class decisionengine.framework.tests.FailingPublisher.FailingPublisher(module_parameters, *args, **kwargs)[source]

Bases: Publisher

_consumes = {'bar': None, 'publisher_status': <class 'decisionengine.framework.taskmanager.PublisherStatus.PublisherStatus'>}
publish(data_block)[source]

decisionengine.framework.tests.IntSource module

class decisionengine.framework.tests.IntSource.IntSource(config)[source]

Bases: Source

_produces = {'int_value': <class 'int'>}
_supported_config = {'int_value': (<class 'int'>, None, None)}
acquire()[source]

decisionengine.framework.tests.ModuleProgramOptions module

class decisionengine.framework.tests.ModuleProgramOptions.AcquireWithConfig(name)[source]

Bases: object

test(byte_str, expected_stderr='')[source]
class decisionengine.framework.tests.ModuleProgramOptions.AcquireWithSampleConfig(name)[source]

Bases: object

test()[source]
class decisionengine.framework.tests.ModuleProgramOptions.ConfigTemplate(name)[source]

Bases: object

test(has_comments=False)[source]
class decisionengine.framework.tests.ModuleProgramOptions.Describe(name)[source]

Bases: object

test(consumes=None, produces=None)[source]
class decisionengine.framework.tests.ModuleProgramOptions.DescribeAlias(alias, original)[source]

Bases: object

test()[source]
class decisionengine.framework.tests.ModuleProgramOptions.Help(name)[source]

Bases: object

test(has_sample_config=False)[source]
decisionengine.framework.tests.ModuleProgramOptions._expected_acquire_result(name, config_file=None, multiplier=1, channel_name='test1')[source]
decisionengine.framework.tests.ModuleProgramOptions._expected_config_template(name)[source]
decisionengine.framework.tests.ModuleProgramOptions._expected_config_template_with_comments(name)[source]
decisionengine.framework.tests.ModuleProgramOptions._expected_help(name)[source]
decisionengine.framework.tests.ModuleProgramOptions._expected_source_help(name, has_sample_config=False)[source]
decisionengine.framework.tests.ModuleProgramOptions._normalize(string)[source]
decisionengine.framework.tests.ModuleProgramOptions._run_as_main(name, *program_options)[source]

decisionengine.framework.tests.PublisherNOP module

class decisionengine.framework.tests.PublisherNOP.PublisherNOP(module_parameters, *args, **kwargs)[source]

Bases: Publisher

_consumes = {'bar': <class 'pandas.core.frame.DataFrame'>}
publish(data_block)[source]

decisionengine.framework.tests.PublisherWithMissingConsumes module

class decisionengine.framework.tests.PublisherWithMissingConsumes.PublisherWithMissingConsumes(set_of_parameters)[source]

Bases: Publisher

decisionengine.framework.tests.SourceAlias module

decisionengine.framework.tests.SourceNOP module

class decisionengine.framework.tests.SourceNOP.SourceNOP(config)[source]

Bases: Source

_produces = {'foo': <class 'pandas.core.frame.DataFrame'>}
acquire()[source]

decisionengine.framework.tests.SourceWithMissingProduces module

class decisionengine.framework.tests.SourceWithMissingProduces.SourceWithMissingProduces(set_of_parameters)[source]

Bases: Source

decisionengine.framework.tests.SourceWithSampleConfigNOP module

class decisionengine.framework.tests.SourceWithSampleConfigNOP.SourceWithSampleConfigNOP(config)[source]

Bases: Source

_produces = {'foo': <class 'pandas.core.frame.DataFrame'>}
_supported_config = {'channel_name': (<class 'str'>, None, None), 'multiplier': (<class 'int'>, None, None)}
acquire()[source]

decisionengine.framework.tests.SupportsConfigPublisher module

class decisionengine.framework.tests.SupportsConfigPublisher.SupportsConfig(set_of_parameters)[source]

Bases: Publisher

_supported_config = {'comment': (<class 'str'>, None, 'Single-line comment'), 'comment_with_nl': (<class 'str'>, None, 'Comment with newline\n'), 'convert_to': (<class 'int'>, 3, None), 'default_only': (<class 'float'>, 2.5, None), 'no_type': (None, None, None), 'only_type': (<class 'int'>, None, None)}

decisionengine.framework.tests.TransformNOP module

class decisionengine.framework.tests.TransformNOP.TransformNOP(module_parameters, *args, **kwargs)[source]

Bases: Transform

_consumes = {'foo': <class 'pandas.core.frame.DataFrame'>}
_produces = {'bar': <class 'pandas.core.frame.DataFrame'>}
transform(data_block)[source]

decisionengine.framework.tests.TransformWithMissingProducesConsumes module

class decisionengine.framework.tests.TransformWithMissingProducesConsumes.TransformWithMissingProducesConsumes(config)[source]

Bases: Transform

transform(data_block)[source]

decisionengine.framework.tests.WriteToDisk module

Special publisher used to register publish calls with an external file.

It is difficult to interact with individual publishers while testing a workflow. The WriteToDisk publisher therefore writes to an external file that can be read by a test. Ideally, we would implement a system so that the test and any instance of the WriteToDisk class are passed the same file-name string. Unfortunately, this is non-trivial to achieve without adjusting the behavior of the decision-engine server itself. We therefore choose the following abstruse logic:

  • WriteToDisk creates a temporary file and broadcasts its name to STDOUT. Note that this temporary file must be uniquely named as multiple tests can use WriteToDisk in parallel.

  • Capture STDOUT in the DETestWorker class.

  • Pass STDOUT to the ‘wait_for_n_writes’ which will wait until the number ‘n’ appears in the file.

class decisionengine.framework.tests.WriteToDisk.WriteToDisk(config)[source]

Bases: Publisher

_supported_config = {'consumes': (<class 'list'>, None, None), 'filename': (<class 'str'>, None, None)}
publish(data_block)[source]
decisionengine.framework.tests.WriteToDisk.wait_for_n_writes(stdout, n)[source]

decisionengine.framework.tests.fixtures module

defaults for pytest

decisionengine.framework.tests.fixtures.DEServer(conf_path=None, conf_override=None, channel_conf_path=None, channel_conf_override=None, host='127.0.0.1', port=None, make_conf_dirs_if_missing=False, block_until_startup_complete=True)[source]

A DE Server using a private database

decisionengine.framework.tests.fixtures.PG_DE_DB_WITHOUT_SCHEMA(request: FixtureRequest) Iterator[Connection]

Fixture factory for PostgreSQL.

Parameters:

request – fixture request object

Returns:

postgresql client

decisionengine.framework.tests.fixtures.PG_PROG(request: FixtureRequest, tmp_path_factory: TempPathFactory) Iterator[PostgreSQLExecutor]

Process fixture for PostgreSQL.

Parameters:
  • request – fixture request object

  • tmp_path_factory – temporary path object (fixture)

Returns:

tcp executor

decisionengine.framework.tests.fixtures.SQLALCHEMY_PG_WITH_SCHEMA(PG_DE_DB_WITHOUT_SCHEMA)[source]

Get a blank database from pytest_postgresql. Then setup the SQLAlchemy style URL with that DB. The SQLAlchemyDS will create the schema as needed.

decisionengine.framework.tests.fixtures.SQLALCHEMY_TEMPFILE_SQLITE(tmp_path)[source]

Setup an SQLite database with the pytest tmp_path fixture. Then setup the SQLAlchemy style URL with that DB. The SQLAlchemyDS will create the schema as needed.

decisionengine.framework.tests.test_client_errors module

Fixture based DE Server tests of the sample config

decisionengine.framework.tests.test_client_errors.test_client_cannot_wait_on_bad_state(deserver)[source]

Verify wait is for a valid state

decisionengine.framework.tests.test_client_server module

Fixture based DE Server for the de-client tests

decisionengine.framework.tests.test_client_server.test_client_get_loglevel(deserver)[source]
decisionengine.framework.tests.test_client_server.test_client_print_product(deserver)[source]
decisionengine.framework.tests.test_client_server.test_client_set_loglevel(deserver)[source]
decisionengine.framework.tests.test_client_server.test_client_status_msg_to_logger(deserver, caplog)[source]

Make sure the actual client console call goes to a logging destination

decisionengine.framework.tests.test_combined_channels module

decisionengine.framework.tests.test_combined_channels.test_combined_channels(deserver)[source]
decisionengine.framework.tests.test_combined_channels.test_combined_channels_3g(deserver_combined)[source]

decisionengine.framework.tests.test_defaults module

Fixture based DE Server tests of the sample config

decisionengine.framework.tests.test_defaults.test_defaults(deserver)[source]

decisionengine.framework.tests.test_dynamic_test_modules module

decisionengine.framework.tests.test_dynamic_test_modules.test_dynamic_publisher()[source]
decisionengine.framework.tests.test_dynamic_test_modules.test_dynamic_source()[source]
decisionengine.framework.tests.test_dynamic_test_modules.test_dynamic_transform()[source]

decisionengine.framework.tests.test_empty_config module

Fixture based DE Server tests of adding a channel later on

decisionengine.framework.tests.test_empty_config.test_client_can_start_one_channel_added_after_startup(deserver)[source]

Verify client can start a single channel

decisionengine.framework.tests.test_error_on_acquire module

decisionengine.framework.tests.test_error_on_acquire.test_error_on_acquire(deserver)[source]

decisionengine.framework.tests.test_module_program_options module

decisionengine.framework.tests.test_module_program_options.test_acquire_for_sources()[source]
decisionengine.framework.tests.test_module_program_options.test_config_templates()[source]
decisionengine.framework.tests.test_module_program_options.test_descriptions()[source]
decisionengine.framework.tests.test_module_program_options.test_help()[source]
decisionengine.framework.tests.test_module_program_options.test_module_alias()[source]

decisionengine.framework.tests.test_publisher_status module

decisionengine.framework.tests.test_publisher_status.test_publisher_status(deserver)[source]

decisionengine.framework.tests.test_publisher_status_board module

decisionengine.framework.tests.test_publisher_status_board.test_publisher_status_board()[source]

decisionengine.framework.tests.test_query_tool_server module

Fixture based DE Server for the de-query-tool tests

decisionengine.framework.tests.test_query_tool_server.test_query_tool(deserver)[source]

decisionengine.framework.tests.test_reaper module

Fixture based DE Server for the reaper tests

decisionengine.framework.tests.test_reaper.test_client_can_get_de_server_reaper_status(deserver)[source]

Verify reaper status

decisionengine.framework.tests.test_same_source_types module

decisionengine.framework.tests.test_same_source_types.test_same_source_types_separate_channels(deserver)[source]

decisionengine.framework.tests.test_sample_config module

Fixture based DE Server tests of the defaults

decisionengine.framework.tests.test_sample_config.stopped_channel_opts(timeout=1)[source]
decisionengine.framework.tests.test_sample_config.test_client_can_get_de_server_status(deserver)[source]
decisionengine.framework.tests.test_sample_config.test_client_can_kill_one_channel(deserver)[source]
decisionengine.framework.tests.test_sample_config.test_client_can_restart_all_channels(deserver)[source]

Verify client can get channel products even when none are run

decisionengine.framework.tests.test_sample_config.test_client_can_restart_one_channel(deserver)[source]

Verify client can restart a single channel

decisionengine.framework.tests.test_sample_config.test_client_logger_level(deserver)[source]
decisionengine.framework.tests.test_sample_config.test_client_non_real_channel(deserver)[source]

decisionengine.framework.tests.test_shared_sources module

decisionengine.framework.tests.test_shared_sources.record_that_matches(substring, records)[source]
decisionengine.framework.tests.test_shared_sources.test_conflicting_source_configurations(deserver_conflicting, caplog)[source]
decisionengine.framework.tests.test_shared_sources.test_shared_source(deserver_shared, caplog)[source]

decisionengine.framework.tests.test_start_with_bad_channels module

Fixture based DE Server tests of invalid channel configs

decisionengine.framework.tests.test_start_with_bad_channels._consumes_not_subset(test_str)[source]
decisionengine.framework.tests.test_start_with_bad_channels._expected_circularity(test_str)[source]
decisionengine.framework.tests.test_start_with_bad_channels._missing_consumes(name)[source]
decisionengine.framework.tests.test_start_with_bad_channels._missing_produces(name)[source]
decisionengine.framework.tests.test_start_with_bad_channels.test_client_can_get_products_no_channels(deserver, caplog)[source]

Verify client can get channel products even when none are run

decisionengine.framework.tests.test_status_during_startup module

decisionengine.framework.tests.test_status_during_startup.test_status_during_startup(deserver_no_wait)[source]

Module contents