decisionengine.framework.dataspace.tests package

Submodules

decisionengine.framework.dataspace.tests.fixtures module

decisionengine.framework.dataspace.tests.fixtures.PG_DE_DB_WITHOUT_SCHEMA(request: FixtureRequest) → Iterator[Connection]

Fixture factory for PostgreSQL.

Parameters:

request – fixture request object

Returns:

postgresql client

decisionengine.framework.dataspace.tests.fixtures.PG_PROG(request: FixtureRequest, tmp_path_factory: TempPathFactory) → Iterator[PostgreSQLExecutor]

Process fixture for PostgreSQL.

Parameters:
  • request – fixture request object

  • tmp_path_factory – temporary path object (fixture)

Returns:

tcp executor

decisionengine.framework.dataspace.tests.fixtures.SQLALCHEMY_PG_WITH_SCHEMA(PG_DE_DB_WITHOUT_SCHEMA)[source]

Get a blank database from pytest_postgresql, then set up the SQLAlchemy-style URL for that database. SQLAlchemyDS creates the schema as needed.
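As a rough illustration of what a SQLAlchemy-style PostgreSQL URL looks like, the sketch below builds one from connection details. The helper name postgres_url, the sample values, and the {"url": ...} config shape are assumptions made for this example; they are not taken from the fixture's source.

```python
from urllib.parse import quote


def postgres_url(user, host, port, dbname, password=None):
    """Build a SQLAlchemy-style PostgreSQL URL from connection details."""
    # Percent-encode credentials so characters such as '@' or ':' stay unambiguous.
    auth = quote(user, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    return f"postgresql://{auth}@{host}:{port}/{dbname}"


# Hypothetical values standing in for what the blank test database reports.
url = postgres_url("postgres", "127.0.0.1", 5432, "decisionengine")
config = {"url": url}  # assumed config shape, for illustration only
```

In the real fixture the connection details would presumably come from the PG_DE_DB_WITHOUT_SCHEMA connection rather than from literals.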

decisionengine.framework.dataspace.tests.fixtures.SQLALCHEMY_TEMPFILE_SQLITE(tmp_path)[source]

Set up an SQLite database using the pytest tmp_path fixture, then set up the SQLAlchemy-style URL for that database. SQLAlchemyDS creates the schema as needed.
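The idea of a throwaway SQLite file plus a SQLAlchemy-style URL can be sketched with the standard library alone. The helper name sqlite_url and the file name test.db are hypothetical; a temporary directory stands in for the path that pytest's tmp_path fixture would supply.

```python
import sqlite3
import tempfile
from pathlib import Path


def sqlite_url(directory):
    """Create an empty SQLite database under `directory` and return its URL."""
    db_path = Path(directory) / "test.db"  # hypothetical file name
    # Open and close a connection so SQLite can create the file;
    # no tables are added here, the schema is left to the datasource.
    sqlite3.connect(db_path).close()
    return f"sqlite:///{db_path}"


# A temporary directory plays the role of pytest's tmp_path in this sketch.
with tempfile.TemporaryDirectory() as tmp:
    url = sqlite_url(tmp)
```

On a POSIX system an absolute path yields a URL with four slashes (sqlite:////...), which is the form SQLAlchemy expects for absolute file paths.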

decisionengine.framework.dataspace.tests.fixtures.datasource(request)[source]

This parameterized fixture sets up the various datasources.

Add datasource objects to DATASOURCES_TO_TEST once they have our basic schema loaded, and adjust the if statements here until we are SQLAlchemy only.

Pytest takes it from there and automatically runs every test that uses this fixture against each datasource.
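A minimal sketch of the parameterization pattern described above, assuming DATASOURCES_TO_TEST holds fixture names that are resolved at run time. The fixture bodies here are simplified stand-ins and the returned dictionary shape is hypothetical; only pytest.fixture(params=...), request.param, and request.getfixturevalue are standard pytest features.

```python
import pytest

# Assumption: each entry names a fixture that provides one backend configuration.
DATASOURCES_TO_TEST = ["SQLALCHEMY_TEMPFILE_SQLITE"]


@pytest.fixture
def SQLALCHEMY_TEMPFILE_SQLITE(tmp_path):
    # Simplified stand-in for the real fixture of the same name.
    return {"url": f"sqlite:///{tmp_path / 'test.db'}"}


@pytest.fixture(params=DATASOURCES_TO_TEST)
def datasource(request):
    # request.param is one entry of DATASOURCES_TO_TEST; resolve it to
    # the value of the fixture with that name.
    return request.getfixturevalue(request.param)


def test_has_url(datasource):
    # Pytest runs this once per entry in DATASOURCES_TO_TEST.
    assert datasource["url"].startswith("sqlite:///")
```

With this shape, adding another backend only requires a new fixture and one more name in the list; tests that request datasource need no changes.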

decisionengine.framework.dataspace.tests.fixtures.dataspace(request)[source]

This parameterized fixture sets up the various datasources. Add datasource objects to DATASOURCES_TO_TEST once they have our basic schema loaded, and adjust the if statements here until we are SQLAlchemy only.

Pytest takes it from there and automatically runs every test that uses this fixture against each datasource.

decisionengine.framework.dataspace.tests.fixtures.load_sample_data_into_datasource(schema_only_db)[source]

Load our sample test data into a dataspace. This is a function, not a fixture, so you can run it on any datasource that provides the right API.
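Because the loader is a plain function, it can be pointed at any object with the expected methods. The sketch below illustrates that duck-typed style; InMemoryDatasource, load_sample_data, the two-argument store_taskmanager signature, and the sample values are all hypothetical and do not reproduce the real loader.

```python
class InMemoryDatasource:
    """Hypothetical stand-in exposing the one method the loader needs."""

    def __init__(self):
        self.taskmanagers = []

    def store_taskmanager(self, name, taskmanager_id):
        # Record the entry and return a simple sequence number.
        self.taskmanagers.append((name, taskmanager_id))
        return len(self.taskmanagers)


def load_sample_data(datasource):
    """Populate any object offering store_taskmanager with sample rows."""
    for index in (1, 2):
        datasource.store_taskmanager(f"taskmanager{index}", f"id-{index}")
    return datasource


loaded = load_sample_data(InMemoryDatasource())
```

Nothing in load_sample_data depends on pytest, so the same call works inside a fixture, in a test body, or in an interactive session.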

decisionengine.framework.dataspace.tests.test_Reaper module

decisionengine.framework.dataspace.tests.test_Reaper.config()[source]
decisionengine.framework.dataspace.tests.test_Reaper.reaper(request)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_bad_config(reaper, config)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_missing_config(reaper, config)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_missing_config_key(reaper, config)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_small_retain(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_small_run_interval(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_start_two_reapers(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_fail_wrong_config_key(reaper, config)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_just_stop_no_error(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_loop_of_start_stop_in_clumps(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_reap_default_state(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_reaper_can_reap(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_source_fail_can_be_fixed(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_start_delay(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_start_stop(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_start_stop_stop(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_state_can_be_active(reaper)[source]
decisionengine.framework.dataspace.tests.test_Reaper.test_state_sets_timer_and_uses_it(reaper)[source]

decisionengine.framework.dataspace.tests.test_datablock module

decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_constructor(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_duplicate(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_get_dataproducts(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_get_header(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_get_metadata(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_get_taskmanager(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_is_expired(dataspace)[source]

This test only validates that the method exists. The stub in our default code should be overridden by an inheriting class, and that class should have more rational return types.

decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_is_expired_with_key(dataspace)[source]

This test only validates that the method exists. The stub in our default code should be overridden by an inheriting class, and that class should have more rational return types.

decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_key_management(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_key_management_change_name(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_mark_expired(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_no_key_by_name(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_DataBlock_to_str(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_Header_constructor(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_Header_is_valid(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_Metadata_constructor(dataspace)[source]
decisionengine.framework.dataspace.tests.test_datablock.test_Metadata_set_state(dataspace)[source]

decisionengine.framework.dataspace.tests.test_datablock_zlib module

decisionengine.framework.dataspace.tests.test_datablock_zlib.test_compress()[source]
decisionengine.framework.dataspace.tests.test_datablock_zlib.test_zdumps()[source]
decisionengine.framework.dataspace.tests.test_datablock_zlib.test_zloads()[source]

decisionengine.framework.dataspace.tests.test_datasource module

decisionengine.framework.dataspace.tests.test_datasource.test_has_methods_we_expect()[source]

decisionengine.framework.dataspace.tests.test_dataspace module

decisionengine.framework.dataspace.tests.test_dataspace.test_dataspace_config_finds_bad()[source]
decisionengine.framework.dataspace.tests.test_dataspace.test_delete(dataspace)[source]
decisionengine.framework.dataspace.tests.test_dataspace.test_duplicate_datablock(dataspace)[source]

Can we duplicate taskmanager1 and all its entries?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_datablock(dataspace)[source]

Can we get the datablock content?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_dataproduct(dataspace)[source]

Can we get the dataproduct by uuid with key?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_dataproduct_not_exist(dataspace)[source]

Does it error out if we ask for bogus information?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_dataproducts(dataspace)[source]

Can we get the dataproducts by uuid, and by uuid with key?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_dataproducts_not_exist(dataspace)[source]

Does it error out if we ask for bogus information?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_header(dataspace)[source]

Can we fetch a header?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_header_not_exist(dataspace)[source]

Does it error out if we ask for a bogus header?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_last_generation_id(dataspace)[source]

Can we get the last generation id by name, or by name and uuid?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_last_generation_id_not_exist(dataspace)[source]

Does it error out if we ask for a bogus taskmanager?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_metadata(dataspace)[source]

Can we fetch a metadata element?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_metadata_not_exist(dataspace)[source]

Does it error out if we ask for a bogus metadata element?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_taskmanager_exists(dataspace)[source]

Can I get a taskmanager by name, or by name and uuid?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_taskmanager_not_exists(dataspace)[source]

This should error out.

decisionengine.framework.dataspace.tests.test_dataspace.test_get_taskmanagers(dataspace)[source]

Can I get multiple task managers?

decisionengine.framework.dataspace.tests.test_dataspace.test_get_taskmanagers_not_exist(dataspace)[source]

Do I error out when asking for garbage?

decisionengine.framework.dataspace.tests.test_dataspace.test_has_config(dataspace)[source]

Verify our config entry exists.

decisionengine.framework.dataspace.tests.test_dataspace.test_insert(dataspace)[source]

Can we insert new elements?

decisionengine.framework.dataspace.tests.test_dataspace.test_mark_expired(dataspace)[source]
decisionengine.framework.dataspace.tests.test_dataspace.test_store_taskmanager(dataspace)[source]

Can we make new entries?

decisionengine.framework.dataspace.tests.test_dataspace.test_update(dataspace)[source]

Do updates work as expected?

decisionengine.framework.dataspace.tests.test_dataspace.test_update_bad(dataspace)[source]

Do updates fail on a bogus taskmanager, as expected?
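Many tests in this module come in pairs: one confirms a lookup succeeds, the other confirms a bogus request errors out. The sketch below shows that pairing against a stand-in object. FakeDataspace, its get_header signature, the sample data, and the choice of KeyError are assumptions for illustration; the real dataspace may raise a different exception.

```python
class FakeDataspace:
    """Hypothetical stand-in; the real class and its exceptions may differ."""

    def __init__(self):
        self._headers = {("taskmanager1", 1, "key1"): {"creator": "module"}}

    def get_header(self, taskmanager_id, generation_id, key):
        try:
            return self._headers[(taskmanager_id, generation_id, key)]
        except KeyError:
            raise KeyError(f"no header for {key!r}") from None


def raises_key_error(func, *args):
    """Return True if func(*args) raises KeyError, otherwise False."""
    try:
        func(*args)
    except KeyError:
        return True
    return False


ds = FakeDataspace()
found = ds.get_header("taskmanager1", 1, "key1")  # positive case
missing = raises_key_error(ds.get_header, "taskmanager1", 1, "bogus")  # negative case
```

In a pytest suite, a with pytest.raises(KeyError): block would typically take the place of the raises_key_error helper.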

Module contents