decisionengine.framework.tests package
decisionengine.framework.tests.DynamicPublisher module
-
class decisionengine.framework.tests.DynamicPublisher.DynamicPublisher(config)[source]
Bases: Publisher
-
_supported_config = {'consumes': (<class 'list'>, None, None), 'expects': (<class 'int'>, None, None)}
-
publisher(data_block)[source]
decisionengine.framework.tests.DynamicSource module
-
class decisionengine.framework.tests.DynamicSource.DynamicSource(config)[source]
Bases: Source
-
_supported_config = {'data_product_name': (<class 'str'>, None, None)}
-
acquire()[source]
decisionengine.framework.tests.ErringPublisher module
-
class decisionengine.framework.tests.ErringPublisher.ErringPublisher(module_parameters, *args, **kwargs)[source]
Bases: Publisher
-
_consumes = {'bar': None}
-
publish(data_block)[source]
decisionengine.framework.tests.ErrorOnAcquire module
-
class decisionengine.framework.tests.ErrorOnAcquire.ErrorOnAcquire(config)[source]
Bases: Source
-
_produces = {'_placeholder': None}
-
acquire()[source]
decisionengine.framework.tests.FailingPublisher module
-
class decisionengine.framework.tests.FailingPublisher.FailingPublisher(module_parameters, *args, **kwargs)[source]
Bases: Publisher
-
_consumes = {'bar': None, 'publisher_status': <class 'decisionengine.framework.taskmanager.PublisherStatus.PublisherStatus'>}
-
publish(data_block)[source]
decisionengine.framework.tests.IntSource module
-
class decisionengine.framework.tests.IntSource.IntSource(config)[source]
Bases: Source
-
_produces = {'int_value': <class 'int'>}
-
_supported_config = {'int_value': (<class 'int'>, None, None)}
-
acquire()[source]
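The listing above shows the standard test-source contract: _produces maps each data product name to its type, and _supported_config maps each configuration parameter to a (type, default, comment) triple. A minimal sketch of a source in this style, assuming the produces/supports_config decorators and the Parameter helper from decisionengine.framework.modules (the exact import paths are assumptions):

    from decisionengine.framework.modules import Source
    from decisionengine.framework.modules.Module import Parameter  # assumed import path

    @Source.supports_config(Parameter("int_value", type=int))
    @Source.produces(int_value=int)
    class MyIntSource(Source.Source):
        def __init__(self, config):
            super().__init__(config)
            self.int_value = config["int_value"]

        def acquire(self):
            # acquire() returns one entry per declared data product.
            return {"int_value": self.int_value}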
decisionengine.framework.tests.ModuleProgramOptions module
-
class decisionengine.framework.tests.ModuleProgramOptions.AcquireWithConfig(name)[source]
Bases: object
-
test(byte_str, expected_stderr='')[source]
-
class decisionengine.framework.tests.ModuleProgramOptions.AcquireWithSampleConfig(name)[source]
Bases: object
-
test()[source]
-
class decisionengine.framework.tests.ModuleProgramOptions.ConfigTemplate(name)[source]
Bases: object
-
test(has_comments=False)[source]
-
class decisionengine.framework.tests.ModuleProgramOptions.Describe(name)[source]
Bases: object
-
test(consumes=None, produces=None)[source]
-
class decisionengine.framework.tests.ModuleProgramOptions.DescribeAlias(alias, original)[source]
Bases: object
-
test()[source]
-
class decisionengine.framework.tests.ModuleProgramOptions.Help(name)[source]
Bases: object
-
test(has_sample_config=False)[source]
-
decisionengine.framework.tests.ModuleProgramOptions._expected_acquire_result(name, config_file=None, multiplier=1, channel_name='test1')[source]
-
decisionengine.framework.tests.ModuleProgramOptions._expected_config_template(name)[source]
-
decisionengine.framework.tests.ModuleProgramOptions._expected_help(name)[source]
-
decisionengine.framework.tests.ModuleProgramOptions._expected_source_help(name, has_sample_config=False)[source]
-
decisionengine.framework.tests.ModuleProgramOptions._normalize(string)[source]
-
decisionengine.framework.tests.ModuleProgramOptions._run_as_main(name, *program_options)[source]
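Each helper class above wraps one program option supported when a module is run as a main program; test() runs the module (presumably via _run_as_main) and compares its output against the text built by the corresponding _expected_* helper. A sketch of how these helpers might be driven, where the module pairings and argument shapes are illustrative assumptions:

    from decisionengine.framework.tests.ModuleProgramOptions import ConfigTemplate, Describe, Help

    def test_program_options_sketch():
        # --help output; the flag is assumed to indicate whether the
        # module advertises a sample configuration in its help text.
        Help("SourceNOP").test()
        Help("SourceWithSampleConfigNOP").test(has_sample_config=True)

        # Configuration template, with and without parameter comments.
        ConfigTemplate("SupportsConfigPublisher").test(has_comments=True)

        # Description of the module's data products (argument shape assumed).
        Describe("SourceNOP").test(produces=["foo"])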
decisionengine.framework.tests.PublisherNOP module
-
class decisionengine.framework.tests.PublisherNOP.PublisherNOP(module_parameters, *args, **kwargs)[source]
Bases: Publisher
-
_consumes = {'bar': <class 'pandas.core.frame.DataFrame'>}
-
publish(data_block)[source]
decisionengine.framework.tests.PublisherWithMissingConsumes module
-
class decisionengine.framework.tests.PublisherWithMissingConsumes.PublisherWithMissingConsumes(set_of_parameters)[source]
Bases: Publisher
decisionengine.framework.tests.SourceAlias module
decisionengine.framework.tests.SourceNOP module
-
class decisionengine.framework.tests.SourceNOP.SourceNOP(config)[source]
Bases: Source
-
_produces = {'foo': <class 'pandas.core.frame.DataFrame'>}
-
acquire()[source]
decisionengine.framework.tests.SourceWithMissingProduces module
-
class decisionengine.framework.tests.SourceWithMissingProduces.SourceWithMissingProduces(set_of_parameters)[source]
Bases: Source
decisionengine.framework.tests.SourceWithSampleConfigNOP module
-
class decisionengine.framework.tests.SourceWithSampleConfigNOP.SourceWithSampleConfigNOP(config)[source]
Bases: Source
-
_produces = {'foo': <class 'pandas.core.frame.DataFrame'>}
-
_supported_config = {'channel_name': (<class 'str'>, None, None), 'multiplier': (<class 'int'>, None, None)}
-
acquire()[source]
decisionengine.framework.tests.SupportsConfigPublisher module
-
class decisionengine.framework.tests.SupportsConfigPublisher.SupportsConfig(set_of_parameters)[source]
Bases: Publisher
-
_supported_config = {'comment': (<class 'str'>, None, 'Single-line comment'), 'comment_with_nl': (<class 'str'>, None, 'Comment with newline\n'), 'convert_to': (<class 'int'>, 3, None), 'default_only': (<class 'float'>, 2.5, None), 'no_type': (None, None, None), 'only_type': (<class 'int'>, None, None)}
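The _supported_config table above records a (type, default, comment) triple for each parameter. A sketch of declarations that would plausibly produce that table, assuming a Parameter helper accepting type, default, and comment keywords (import path assumed):

    from decisionengine.framework.modules import Publisher
    from decisionengine.framework.modules.Module import Parameter  # assumed import path

    @Publisher.supports_config(
        Parameter("comment", type=str, comment="Single-line comment"),
        Parameter("comment_with_nl", type=str, comment="Comment with newline\n"),
        Parameter("convert_to", default=3),    # type int inferred from the default
        Parameter("default_only", default=2.5),
        Parameter("no_type"),                  # no type, default, or comment
        Parameter("only_type", type=int),
    )
    class SupportsConfig(Publisher.Publisher):
        def publish(self, data_block):
            pass  # body irrelevant to the configuration declaration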
decisionengine.framework.tests.WriteToDisk module
Special publisher used to register publish calls with an external file.
It is difficult to interact with individual publishers while testing a
workflow. The WriteToDisk publisher therefore writes to an external
file that can be read by a test. Ideally, the test and each instance of
the WriteToDisk class would be passed the same file-name string, but
this is non-trivial to achieve without adjusting the behavior of the
decision-engine server itself. We therefore use the following abstruse
logic:
1. WriteToDisk creates a temporary file and broadcasts its name to
STDOUT. This temporary file must be uniquely named, as multiple tests
can use WriteToDisk in parallel.
2. The DETestWorker class captures STDOUT.
3. The captured STDOUT is passed to 'wait_for_n_writes', which waits
until the number 'n' appears in the file.
-
class decisionengine.framework.tests.WriteToDisk.WriteToDisk(config)[source]
Bases: Publisher
-
_supported_config = {'consumes': (<class 'list'>, None, None), 'filename': (<class 'str'>, None, None)}
-
publish(data_block)[source]
-
decisionengine.framework.tests.WriteToDisk.wait_for_n_writes(stdout, n)[source]
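A sketch of the test side of the handshake described in the module docstring; the deserver fixture and its stdout_at_setup attribute are hypothetical stand-ins for however a given test captures the server's STDOUT:

    from decisionengine.framework.tests.WriteToDisk import wait_for_n_writes

    def test_publishes_are_recorded(deserver):
        # The captured STDOUT contains the temporary file name that
        # WriteToDisk broadcast when it was constructed.
        captured_stdout = deserver.stdout_at_setup  # hypothetical attribute
        # Block until the publisher has recorded three publish calls.
        wait_for_n_writes(captured_stdout, 3)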
decisionengine.framework.tests.fixtures module
Defaults for pytest.
-
decisionengine.framework.tests.fixtures.DEServer(conf_path=None, conf_override=None, channel_conf_path=None, channel_conf_override=None, host='127.0.0.1', port=None, make_conf_dirs_if_missing=False, block_until_startup_complete=True)[source]
A DE Server fixture using a private database.
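DEServer is a fixture factory: calling it at module scope returns a pytest fixture bound to the given configuration, and tests receive the running server by naming that fixture as an argument. A sketch with hypothetical configuration paths:

    import os

    from decisionengine.framework.tests.fixtures import DEServer

    # Hypothetical paths; each test module points the factory at its own
    # server and channel configuration directories.
    _CONFIG_PATH = os.path.join(os.path.dirname(__file__), "etc/decisionengine")
    _CHANNEL_CONFIG_DIR = os.path.join(_CONFIG_PATH, "config.d")

    # Module-scope call so pytest can discover the resulting fixture by name.
    deserver = DEServer(conf_path=_CONFIG_PATH, channel_conf_path=_CHANNEL_CONFIG_DIR)

    def test_server_responds(deserver):  # noqa: F811
        ...  # interact with the running server through the fixture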
-
decisionengine.framework.tests.fixtures.PG_DE_DB_WITHOUT_SCHEMA(request: FixtureRequest) → Iterator[Connection]
Fixture factory for PostgreSQL.
- Parameters:
request – fixture request object
- Returns:
PostgreSQL client
-
decisionengine.framework.tests.fixtures.PG_PROG(request: FixtureRequest, tmp_path_factory: TempPathFactory) → Iterator[PostgreSQLExecutor]
Process fixture for PostgreSQL.
- Parameters:
request – fixture request object
tmp_path_factory – temporary path factory
- Returns:
TCP executor
-
decisionengine.framework.tests.fixtures.SQLALCHEMY_PG_WITH_SCHEMA(PG_DE_DB_WITHOUT_SCHEMA)[source]
Get a blank database from pytest_postgresql, then set up an
SQLAlchemy-style URL for that database.
The SQLAlchemyDS will create the schema as needed.
-
decisionengine.framework.tests.fixtures.SQLALCHEMY_TEMPFILE_SQLITE(tmp_path)[source]
Set up an SQLite database using the pytest tmp_path fixture, then set
up an SQLAlchemy-style URL for that database.
The SQLAlchemyDS will create the schema as needed.
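Both SQLAlchemy fixtures are consumed in the usual pytest way, by naming them as test arguments. A sketch, assuming the yielded value is a datasource configuration carrying the SQLAlchemy-style URL of the prepared database:

    def test_datasource_schema(SQLALCHEMY_TEMPFILE_SQLITE):
        # The yielded value is assumed to carry the SQLAlchemy-style URL
        # for the temporary SQLite file; SQLAlchemyDS creates the schema
        # on first use.
        datasource_config = SQLALCHEMY_TEMPFILE_SQLITE
        assert datasource_config is not None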
decisionengine.framework.tests.test_client_errors module
Fixture-based DE Server tests of the sample config
-
decisionengine.framework.tests.test_client_errors.test_client_cannot_wait_on_bad_state(deserver)[source]
Verify that the client can only wait on a valid state
decisionengine.framework.tests.test_client_server module
Fixture-based DE Server for the de-client tests
-
decisionengine.framework.tests.test_client_server.test_client_get_loglevel(deserver)[source]
-
decisionengine.framework.tests.test_client_server.test_client_print_product(deserver)[source]
-
decisionengine.framework.tests.test_client_server.test_client_set_loglevel(deserver)[source]
-
decisionengine.framework.tests.test_client_server.test_client_status_msg_to_logger(deserver, caplog)[source]
Make sure the client's console status message is routed to a logging destination
decisionengine.framework.tests.test_combined_channels module
-
decisionengine.framework.tests.test_combined_channels.test_combined_channels(deserver)[source]
-
decisionengine.framework.tests.test_combined_channels.test_combined_channels_3g(deserver_combined)[source]
decisionengine.framework.tests.test_defaults module
Fixture-based DE Server tests of the defaults
-
decisionengine.framework.tests.test_defaults.test_defaults(deserver)[source]
decisionengine.framework.tests.test_dynamic_test_modules module
-
decisionengine.framework.tests.test_dynamic_test_modules.test_dynamic_publisher()[source]
-
decisionengine.framework.tests.test_dynamic_test_modules.test_dynamic_source()[source]
-
decisionengine.framework.tests.test_dynamic_test_modules.test_dynamic_transform()[source]
decisionengine.framework.tests.test_empty_config module
Fixture-based DE Server tests of adding a channel after startup
-
decisionengine.framework.tests.test_empty_config.test_client_can_start_one_channel_added_after_startup(deserver)[source]
Verify client can start a single channel
decisionengine.framework.tests.test_error_on_acquire module
-
decisionengine.framework.tests.test_error_on_acquire.test_error_on_acquire(deserver)[source]
decisionengine.framework.tests.test_module_program_options module
-
decisionengine.framework.tests.test_module_program_options.test_acquire_for_sources()[source]
-
decisionengine.framework.tests.test_module_program_options.test_config_templates()[source]
-
decisionengine.framework.tests.test_module_program_options.test_descriptions()[source]
-
decisionengine.framework.tests.test_module_program_options.test_help()[source]
-
decisionengine.framework.tests.test_module_program_options.test_module_alias()[source]
decisionengine.framework.tests.test_publisher_status module
-
decisionengine.framework.tests.test_publisher_status.test_publisher_status(deserver)[source]
decisionengine.framework.tests.test_publisher_status_board module
-
decisionengine.framework.tests.test_publisher_status_board.test_publisher_status_board()[source]
decisionengine.framework.tests.test_reaper module
Fixture-based DE Server for the reaper tests
-
decisionengine.framework.tests.test_reaper.test_client_can_get_de_server_reaper_status(deserver)[source]
Verify reaper status
decisionengine.framework.tests.test_same_source_types module
-
decisionengine.framework.tests.test_same_source_types.test_same_source_types_separate_channels(deserver)[source]
decisionengine.framework.tests.test_sample_config module
Fixture-based DE Server tests of the sample config
-
decisionengine.framework.tests.test_sample_config.stopped_channel_opts(timeout=1)[source]
-
decisionengine.framework.tests.test_sample_config.test_client_can_get_de_server_status(deserver)[source]
-
decisionengine.framework.tests.test_sample_config.test_client_can_kill_one_channel(deserver)[source]
-
decisionengine.framework.tests.test_sample_config.test_client_can_restart_all_channels(deserver)[source]
Verify client can restart all channels
-
decisionengine.framework.tests.test_sample_config.test_client_can_restart_one_channel(deserver)[source]
Verify client can restart a single channel
-
decisionengine.framework.tests.test_sample_config.test_client_logger_level(deserver)[source]
-
decisionengine.framework.tests.test_sample_config.test_client_non_real_channel(deserver)[source]
decisionengine.framework.tests.test_shared_sources module
-
decisionengine.framework.tests.test_shared_sources.record_that_matches(substring, records)[source]
-
decisionengine.framework.tests.test_shared_sources.test_conflicting_source_configurations(deserver_conflicting, caplog)[source]
-
decisionengine.framework.tests.test_shared_sources.test_shared_source(deserver_shared, caplog)[source]
decisionengine.framework.tests.test_start_with_bad_channels module
Fixture-based DE Server tests of invalid channel configs
-
decisionengine.framework.tests.test_start_with_bad_channels._consumes_not_subset(test_str)[source]
-
decisionengine.framework.tests.test_start_with_bad_channels._expected_circularity(test_str)[source]
-
decisionengine.framework.tests.test_start_with_bad_channels._missing_consumes(name)[source]
-
decisionengine.framework.tests.test_start_with_bad_channels._missing_produces(name)[source]
-
decisionengine.framework.tests.test_start_with_bad_channels.test_client_can_get_products_no_channels(deserver, caplog)[source]
Verify client can get channel products even when none are run
decisionengine.framework.tests.test_status_during_startup module
-
decisionengine.framework.tests.test_status_during_startup.test_status_during_startup(deserver_no_wait)[source]