decisionengine.framework.dataspace package
Subpackages
- decisionengine.framework.dataspace.datasources package
- Subpackages
- Submodules
- decisionengine.framework.dataspace.datasources.null module
NullDataSource
NullDataSource._abc_impl
NullDataSource.close()
NullDataSource.connect()
NullDataSource.create_tables()
NullDataSource.delete_data_older_than()
NullDataSource.duplicate_datablock()
NullDataSource.get_datablock()
NullDataSource.get_dataproduct()
NullDataSource.get_dataproducts()
NullDataSource.get_header()
NullDataSource.get_last_generation_id()
NullDataSource.get_metadata()
NullDataSource.get_schema()
NullDataSource.get_taskmanager()
NullDataSource.get_taskmanagers()
NullDataSource.insert()
NullDataSource.reset_connections()
NullDataSource.store_taskmanager()
NullDataSource.update()
- Module contents
- decisionengine.framework.dataspace.tests package
- Submodules
- decisionengine.framework.dataspace.tests.fixtures module
- decisionengine.framework.dataspace.tests.test_Reaper module
config()
reaper()
test_fail_bad_config()
test_fail_missing_config()
test_fail_missing_config_key()
test_fail_small_retain()
test_fail_small_run_interval()
test_fail_start_two_reapers()
test_fail_wrong_config_key()
test_just_stop_no_error()
test_loop_of_start_stop_in_clumps()
test_reap_default_state()
test_reaper_can_reap()
test_source_fail_can_be_fixed()
test_start_delay()
test_start_stop()
test_start_stop_stop()
test_state_can_be_active()
test_state_sets_timer_and_uses_it()
- decisionengine.framework.dataspace.tests.test_datablock module
test_DataBlock_constructor()
test_DataBlock_duplicate()
test_DataBlock_get_dataproducts()
test_DataBlock_get_header()
test_DataBlock_get_metadata()
test_DataBlock_get_taskmanager()
test_DataBlock_is_expired()
test_DataBlock_is_expired_with_key()
test_DataBlock_key_management()
test_DataBlock_key_management_change_name()
test_DataBlock_mark_expired()
test_DataBlock_no_key_by_name()
test_DataBlock_to_str()
test_Header_constructor()
test_Header_is_valid()
test_Metadata_constructor()
test_Metadata_set_state()
- decisionengine.framework.dataspace.tests.test_datablock_zlib module
- decisionengine.framework.dataspace.tests.test_datasource module
- decisionengine.framework.dataspace.tests.test_dataspace module
test_dataspace_config_finds_bad()
test_delete()
test_duplicate_datablock()
test_get_datablock()
test_get_dataproduct()
test_get_dataproduct_not_exist()
test_get_dataproducts()
test_get_dataproducts_not_exist()
test_get_header()
test_get_header_not_exist()
test_get_last_generation_id()
test_get_last_generation_id_not_exist()
test_get_metadata()
test_get_metadata_not_exist()
test_get_taskmanager_exists()
test_get_taskmanager_not_exists()
test_get_taskmanagers()
test_get_taskmanagers_not_exist()
test_has_config()
test_insert()
test_mark_expired()
test_store_taskmanager()
test_update()
test_update_bad()
- Module contents
Submodules
decisionengine.framework.dataspace.datablock module
- class decisionengine.framework.dataspace.datablock.DataBlock(dataspace, name, taskmanager_id=None, generation_id=None, sequence_id=None)
Bases: object
- __insert(key, value, header, metadata)
Insert a new product into the database with its header and metadata.
- __update(key, value, header, metadata)
Update an existing product in the database with its header and metadata.
- _setitem(key, value, header, metadata=None)
Put a product in the database with its header and metadata.
- duplicate()
Duplicate the DataBlock and return the new DataBlock. The intent is that, at the point the duplication occurs, the DataBlock contains only information from the sources. This also increments the generation_id of this DataBlock.
TODO: Also update the header and the metadata information.
TODO: Make this threadsafe.
- Return type:
DataBlock
- get(key, default=None)
Return the value associated with the key in the database.
- Return type:
dict
- get_taskmanager(taskmanager_name, taskmanager_id=None)
Retrieve TaskManager
- Parameters:
taskmanager_name (str) – Name of the TaskManager
taskmanager_id (str, optional) – ID of the TaskManager to retrieve. Defaults to None.
- Returns:
TaskManager information
- Return type:
dict
The returned dictionary looks like:
{ 'datestamp': datetime.datetime(2017, 12, 20, 17, 37, 17, 503210, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-360, name=None)), 'sequence_id': 135, 'name': 'AWS_Calculations', 'taskmanager_id': '77B16EB5-C79E-45B0-B1B1-37E846692E1D' }
- class decisionengine.framework.dataspace.datablock.Header(taskmanager_id, create_time=None, expiration_time=None, scheduled_create_time=None, creator='module', schema_id=None)
Bases: UserDict
- _abc_impl = <_abc._abc_data object>
- default_data_lifetime = 1800
- required_keys = {'create_time', 'creator', 'expiration_time', 'scheduled_create_time', 'schema_id', 'taskmanager_id'}
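The sketch below shows how a UserDict subclass like Header could fill in defaults and check required_keys. It rests on two assumptions. First, default_data_lifetime is measured in seconds and is added to create_time when no expiration_time is given. Second, a header is valid when every key in required_keys is present. HeaderSketch and its is_valid() method are hypothetical stand-ins, not the library's code.

```python
import time
from collections import UserDict


class HeaderSketch(UserDict):
    """Hypothetical stand-in for Header; not the library implementation."""

    # Values copied from the documented class attributes.
    default_data_lifetime = 1800  # assumed to be seconds
    required_keys = {
        "create_time", "creator", "expiration_time",
        "scheduled_create_time", "schema_id", "taskmanager_id",
    }

    def __init__(self, taskmanager_id, create_time=None, expiration_time=None,
                 scheduled_create_time=None, creator="module", schema_id=None):
        super().__init__()
        if create_time is None:
            create_time = time.time()
        if expiration_time is None:
            # Assumption: data expires default_data_lifetime after creation.
            expiration_time = create_time + self.default_data_lifetime
        self.data.update(
            taskmanager_id=taskmanager_id,
            create_time=create_time,
            expiration_time=expiration_time,
            scheduled_create_time=scheduled_create_time,
            creator=creator,
            schema_id=schema_id,
        )

    def is_valid(self):
        # Valid only when every required key is present.
        return self.required_keys.issubset(self.data)
```

With create_time=100.0, the sketch sets expiration_time to 1900.0. Deleting any required key makes is_valid() return False.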
- exception decisionengine.framework.dataspace.datablock.InvalidMetadataError
Bases: Exception
Errors due to invalid Metadata
- class decisionengine.framework.dataspace.datablock.Metadata(taskmanager_id, state='NEW', generation_id=None, generation_time=None, missed_update_count=0)
Bases: UserDict
- _abc_impl = <_abc._abc_data object>
- required_keys = {'generation_id', 'generation_time', 'missed_update_count', 'state', 'taskmanager_id'}
- valid_states = {'END_CYCLE', 'METADATA_UPDATE', 'NEW', 'START_BACKUP'}
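The following sketch illustrates state validation against valid_states. MetadataSketch is hypothetical, and it rests on these assumptions:

- An invalid state raises InvalidMetadataError, which is redefined locally so the block runs on its own.
- generation_time defaults to the current time.

```python
import time
from collections import UserDict


class InvalidMetadataError(Exception):
    """Local copy of the documented exception so this block runs on its own."""


class MetadataSketch(UserDict):
    """Hypothetical stand-in for Metadata; not the library implementation."""

    required_keys = {
        "generation_id", "generation_time", "missed_update_count",
        "state", "taskmanager_id",
    }
    valid_states = {"END_CYCLE", "METADATA_UPDATE", "NEW", "START_BACKUP"}

    def __init__(self, taskmanager_id, state="NEW", generation_id=None,
                 generation_time=None, missed_update_count=0):
        super().__init__()
        self._check_state(state)
        if generation_time is None:
            generation_time = time.time()  # assumption: defaults to "now"
        self.data.update(
            taskmanager_id=taskmanager_id,
            state=state,
            generation_id=generation_id,
            generation_time=generation_time,
            missed_update_count=missed_update_count,
        )

    def _check_state(self, state):
        if state not in self.valid_states:
            raise InvalidMetadataError(f"invalid state {state!r}")

    def set_state(self, state):
        # Reject anything outside valid_states before changing the record.
        self._check_state(state)
        self.data["state"] = state
```

Because the check runs before the assignment, a rejected set_state() call leaves the previous state in place.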
- class decisionengine.framework.dataspace.datablock.ProductRetriever(product_name, product_type, product_source)
Bases: object
- decisionengine.framework.dataspace.datablock.compress(obj)
Compress a Python object.
- Parameters:
obj – Python object
- Returns:
compressed object
- decisionengine.framework.dataspace.datablock.decompress(zbytes)
Decompress a zipped byte stream and convert it to a string.
- Parameters:
zbytes – byte stream
- Returns:
uncompressed string
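Here is a round-trip sketch of compression, assuming the object is serialized with pickle before zlib compression. compress_sketch and decompress_sketch are hypothetical names. The documented decompress is described as returning a string; this sketch instead returns the reconstructed Python object.

```python
import pickle
import zlib


def compress_sketch(obj):
    """Hypothetical: pickle the object, then zlib-compress the bytes."""
    return zlib.compress(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL), 9)


def decompress_sketch(zbytes):
    """Hypothetical: inflate the zlib stream, then unpickle it."""
    return pickle.loads(zlib.decompress(zbytes))
```

Level 9 trades speed for the smallest output. That choice is reasonable for products written once and read many times, but it is an assumption here.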
decisionengine.framework.dataspace.datasource module
- class decisionengine.framework.dataspace.datasource.DataSource(config)
Bases: object
- _abc_impl = <_abc._abc_data object>
- dataproduct_table = 'dataproduct'
Name of the dataproduct table
- abstract delete_data_older_than(days)
Delete data older than the given interval.
- Parameters:
days (int) – remove data older than this many days
- abstract duplicate_datablock(taskmanager_id, generation_id, new_generation_id)
For the given taskmanager_id, make a copy of the datablock with the given generation_id, and set new_generation_id as the generation_id of the copy.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation to be retrieved
generation_id (int) – generation_id of the data
new_generation_id (int) – generation_id of the new datablock created
- abstract get_datablock(taskmanager_id, generation_id)
Return the entire datablock from the dataproduct table for the given taskmanager_id and generation_id.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation to be retrieved
generation_id (int) – generation_id of the data
- abstract get_dataproduct(taskmanager_id, generation_id, key)
Return the data from the dataproduct table for the given taskmanager_id, generation_id, and key.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation to be retrieved
generation_id (int) – generation_id of the data
key (string) – key for the value
- abstract get_dataproducts(taskmanager_id, key)
Return a list of all data products associated with taskmanager_id.
- Parameters:
key (string) – data product key
- abstract get_header(taskmanager_id, generation_id, key)
Return the header from the header table for the given taskmanager_id, generation_id, and key.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation to be retrieved
generation_id (int) – generation_id of the data
key (string) – key for the value
- abstract get_last_generation_id(taskmanager_name, taskmanager_id=None)
Return the last generation id for the current task manager, or for the task manager with taskmanager_id.
- Parameters:
taskmanager_name (string) – task manager name
taskmanager_id (string) – task manager id
- abstract get_metadata(taskmanager_id, generation_id, key)
Return the metadata from the metadata table for the given taskmanager_id, generation_id, and key.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation to be retrieved
generation_id (int) – generation_id of the data
key (string) – key for the value
- abstract get_schema(table=None)
Given the table name, return its schema.
- Parameters:
table (string) – Name of the table
- abstract get_taskmanager(taskmanager_name, taskmanager_id)
Retrieve a TaskManager.
- Parameters:
taskmanager_name (string) – name of the taskmanager to retrieve
taskmanager_id (string) – id of the taskmanager to retrieve
- abstract get_taskmanagers(taskmanager_name=None, start_time=None, end_time=None)
Retrieve TaskManagers.
- Parameters:
taskmanager_name (string) – name of the taskmanagers to retrieve
start_time – optional start of the time window to retrieve
end_time – optional end of the time window to retrieve
- header_table = 'header'
Name of the header table
- abstract insert(taskmanager_id, generation_id, key, value, header, metadata)
Insert data into the respective tables for the given taskmanager_id, generation_id, and key.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation
generation_id (int) – generation_id of the data
key (string) – key for the value
value (object) – Value can be an object or dict
header (Header) – Header for the value
metadata – Metadata for the value
- metadata_table = 'metadata'
Name of the metadata table
- abstract store_taskmanager(taskmanager_name, taskmanager_id, datestamp=None)
Store a TaskManager.
- Parameters:
taskmanager_name (string) – name of the taskmanager to store
taskmanager_id (string) – id of the taskmanager to store
datestamp (datetime) – datetime of the created object; defaults to ‘now’
- taskmanager_table = 'taskmanager'
Name of the taskmanager table
- abstract update(taskmanager_id, generation_id, key, value, header, metadata)
Update the data in the respective tables for the given taskmanager_id, generation_id, and key.
- Parameters:
taskmanager_id (string) – taskmanager_id for the generation
generation_id (int) – generation_id of the data
key (string) – key for the value
value (object) – Value can be an object or dict
header (Header) – Header for the value
metadata – Metadata for the value
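The sketch below shows how a concrete backend might satisfy part of this interface. It stores rows in a dict keyed by (taskmanager_id, generation_id, key) instead of in database tables. MiniDataSource and InMemoryDataSource are hypothetical, and they simplify the documented interface in two ways:

- MiniDataSource declares only four of the abstract methods.
- get_last_generation_id takes only a taskmanager_id here, whereas the documented method also takes taskmanager_name.

```python
import abc
import copy
import datetime


class MiniDataSource(abc.ABC):
    """Hypothetical subset of the DataSource interface."""

    @abc.abstractmethod
    def insert(self, taskmanager_id, generation_id, key, value, header, metadata):
        """Insert a new row."""

    @abc.abstractmethod
    def get_dataproduct(self, taskmanager_id, generation_id, key):
        """Return the stored value."""

    @abc.abstractmethod
    def duplicate_datablock(self, taskmanager_id, generation_id, new_generation_id):
        """Copy one generation to another."""

    @abc.abstractmethod
    def delete_data_older_than(self, days):
        """Drop rows older than the given number of days."""


class InMemoryDataSource(MiniDataSource):
    """Hypothetical dict-backed backend; no database involved."""

    def __init__(self):
        # (taskmanager_id, generation_id, key) -> (value, header, metadata, stored_at)
        self._rows = {}

    def insert(self, taskmanager_id, generation_id, key, value, header, metadata):
        row_id = (taskmanager_id, generation_id, key)
        if row_id in self._rows:
            raise KeyError(f"{row_id} already exists; use update()")
        self._rows[row_id] = (value, header, metadata, datetime.datetime.now())

    def update(self, taskmanager_id, generation_id, key, value, header, metadata):
        row_id = (taskmanager_id, generation_id, key)
        if row_id not in self._rows:
            raise KeyError(f"{row_id} does not exist; use insert()")
        self._rows[row_id] = (value, header, metadata, datetime.datetime.now())

    def get_dataproduct(self, taskmanager_id, generation_id, key):
        return self._rows[(taskmanager_id, generation_id, key)][0]

    def get_last_generation_id(self, taskmanager_id):
        # Simplified signature: the documented method also takes taskmanager_name.
        generations = [gen for (tm, gen, _key) in self._rows if tm == taskmanager_id]
        if not generations:
            raise KeyError(f"no data for taskmanager_id {taskmanager_id!r}")
        return max(generations)

    def duplicate_datablock(self, taskmanager_id, generation_id, new_generation_id):
        # Copy every row of one generation under the new generation_id.
        for (tm, gen, key), row in list(self._rows.items()):
            if tm == taskmanager_id and gen == generation_id:
                self._rows[(tm, new_generation_id, key)] = copy.deepcopy(row)

    def delete_data_older_than(self, days):
        cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
        stale = [row_id for row_id, row in self._rows.items() if row[3] < cutoff]
        for row_id in stale:
            del self._rows[row_id]
```

Keying rows by the (taskmanager_id, generation_id, key) triple mirrors the arguments the documented methods take. Under that scheme, duplicate_datablock reduces to copying every row under one generation_id to another.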
decisionengine.framework.dataspace.dataspace module
- class decisionengine.framework.dataspace.dataspace.DataSpace(config)
Bases: object
The DataSpace class is a collection of DataBlocks and provides an interface to the database used to store the actual data.
- exception decisionengine.framework.dataspace.dataspace.DataSpaceConfigurationError
Bases: Exception
Errors related to the DataSpace configuration
- exception decisionengine.framework.dataspace.dataspace.DataSpaceConnectionError
Bases: Exception
Errors related to database access
decisionengine.framework.dataspace.maintain module
- class decisionengine.framework.dataspace.maintain.Reaper(config)
Bases: object
Reaper periodically deletes data older than retention_interval (in days).
The class attributes define a reasonable set of defaults that user configuration should not alter.
- MIN_RETENTION_INTERVAL_DAYS = 7
- MIN_SECONDS_BETWEEN_RUNS = 7080
- reap()
Run the query that deletes the old records. The state is locked because this task cannot be cancelled.
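- property retention_interval
Implemented as a property because the value is constrained: it may not be smaller than MIN_RETENTION_INTERVAL_DAYS.
- property seconds_between_runs
Implemented as a property because the value is constrained: it may not be smaller than MIN_SECONDS_BETWEEN_RUNS.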
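The following sketch shows the property pattern behind retention_interval and seconds_between_runs. It assumes the constraint is that values below MIN_RETENTION_INTERVAL_DAYS and MIN_SECONDS_BETWEEN_RUNS are rejected. ReaperSketch is hypothetical and differs from the documented Reaper in these ways:

- It takes a delete callable instead of a config.
- It raises ValueError; the real exception type is not documented here.
- It uses a threading.Lock to serialize reap() calls.

```python
import threading


class ReaperSketch:
    """Hypothetical stand-in for Reaper; not the library implementation."""

    MIN_RETENTION_INTERVAL_DAYS = 7
    MIN_SECONDS_BETWEEN_RUNS = 7080

    def __init__(self, delete_older_than, retention_interval, seconds_between_runs):
        # delete_older_than: any callable taking a number of days, for example
        # a DataSource.delete_data_older_than bound method (assumption).
        self._delete_older_than = delete_older_than
        self._lock = threading.Lock()
        # Assigning through the properties runs the validation below.
        self.retention_interval = retention_interval
        self.seconds_between_runs = seconds_between_runs

    @property
    def retention_interval(self):
        return self._retention_interval

    @retention_interval.setter
    def retention_interval(self, value):
        if value < self.MIN_RETENTION_INTERVAL_DAYS:
            raise ValueError(
                f"retention_interval must be >= {self.MIN_RETENTION_INTERVAL_DAYS} days"
            )
        self._retention_interval = value

    @property
    def seconds_between_runs(self):
        return self._seconds_between_runs

    @seconds_between_runs.setter
    def seconds_between_runs(self, value):
        if value < self.MIN_SECONDS_BETWEEN_RUNS:
            raise ValueError(
                f"seconds_between_runs must be >= {self.MIN_SECONDS_BETWEEN_RUNS}"
            )
        self._seconds_between_runs = value

    def reap(self):
        # Hold the lock for the whole deletion, since it cannot be cancelled.
        with self._lock:
            self._delete_older_than(self._retention_interval)
```

Validating in the setters means a bad value is rejected both at construction and on any later reassignment.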