decisionengine.framework.dataspace package

Subpackages

Submodules

decisionengine.framework.dataspace.datablock module

class decisionengine.framework.dataspace.datablock.DataBlock(dataspace, name, taskmanager_id=None, generation_id=None, sequence_id=None)[source]

Bases: object

__insert(key, value, header, metadata)

Insert a new product into the database with its header and metadata

__update(key, value, header, metadata)

Update an existing product in the database with its header and metadata

_setitem(key, value, header, metadata=None)[source]

Put a product in the database with its header and metadata

duplicate()[source]

Duplicate the datablock and return the new DataBlock. The intent is that, at the point of duplication, the DataBlock contains only information from the sources. This also increments the generation_id of this DataBlock.

TODO: Also update the header and the metadata information.
TODO: Make this thread-safe.

Return type:

DataBlock

get(key, default=None)[source]

Return the value associated with the key in the database

Return type:

dict

get_dataproducts(key=None)[source]
get_header(key)[source]

Return the Header associated with the key in the database

Return type:

Header

get_metadata(key)[source]

Return the metadata associated with the key in the database

Return type:

Metadata

get_taskmanager(taskmanager_name, taskmanager_id=None)[source]

Retrieve TaskManager

Parameters:
  • taskmanager_name (str) – Name of the TaskManager

  • taskmanager_id (str, optional) – ID of the TaskManager to retrieve. Defaults to None.

Returns:

TaskManager information

Return type:

dict

The dictionary returned looks like:

{
    'datestamp': datetime.datetime(2017, 12, 20, 17, 37, 17, 503210,
                tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-360, name=None)),
    'sequence_id': 135,
    'name': 'AWS_Calculations',
    'taskmanager_id': '77B16EB5-C79E-45B0-B1B1-37E846692E1D'
}

is_expired(key=None)[source]

Check whether the dataproduct for the given key (or, if no key is given, for any key) is expired

keys()[source]
mark_expired(expiration_time)[source]

Set the expiration_time for the current generation of the dataproduct and mark it as expired if expiration_time <= current time

put(key, value, header, metadata=None)[source]

Put data into the DataBlock

store_taskmanager(taskmanager_name, taskmanager_id)[source]

Persist the TaskManager and return its sequence number

Parameters:
  • taskmanager_name (string) – name of the TaskManager

  • taskmanager_id (string) – id of the TaskManager

Return type:

int
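Each product a DataBlock manages is a (value, header, metadata) triple keyed by name. The sketch below is a simplified, dict-backed stand-in for the put/get pattern described above; the real DataBlock persists each triple through its DataSpace backend, and the `aws_prices` key and sample values are hypothetical.

```python
# Simplified stand-in for the (value, header, metadata) triple a DataBlock
# manages per key. The real class writes these through a DataSpace backend.
block = {}

def put(key, value, header, metadata=None):
    """Mimic DataBlock.put: store the value together with its header and metadata."""
    block[key] = {"value": value, "header": header, "metadata": metadata or {}}

def get(key, default=None):
    """Mimic DataBlock.get: return the value associated with the key."""
    entry = block.get(key)
    return entry["value"] if entry is not None else default

put("aws_prices", {"m5.large": 0.096},
    header={"creator": "aws_source"},
    metadata={"state": "NEW"})
print(get("aws_prices"))
```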

class decisionengine.framework.dataspace.datablock.Header(taskmanager_id, create_time=None, expiration_time=None, scheduled_create_time=None, creator='module', schema_id=None)[source]

Bases: UserDict

_abc_impl = <_abc._abc_data object>
default_data_lifetime = 1800
is_valid()[source]

Check if the Header has minimum required information

required_keys = {'create_time', 'creator', 'expiration_time', 'scheduled_create_time', 'schema_id', 'taskmanager_id'}
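A plausible reading of is_valid() is that it checks required_keys against the mapping's contents. This sketch re-creates that presumed check on a plain dict rather than the real UserDict subclass:

```python
# Presumed logic of Header.is_valid(): every required key must be present.
required_keys = {"create_time", "creator", "expiration_time",
                 "scheduled_create_time", "schema_id", "taskmanager_id"}

def is_valid(header):
    # A header with all required keys (values may still be None) is valid.
    return required_keys.issubset(header)

complete = dict.fromkeys(required_keys)
print(is_valid(complete), is_valid({"taskmanager_id": "abc"}))
```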
exception decisionengine.framework.dataspace.datablock.InvalidMetadataError[source]

Bases: Exception

Errors due to invalid Metadata

class decisionengine.framework.dataspace.datablock.Metadata(taskmanager_id, state='NEW', generation_id=None, generation_time=None, missed_update_count=0)[source]

Bases: UserDict

_abc_impl = <_abc._abc_data object>
required_keys = {'generation_id', 'generation_time', 'missed_update_count', 'state', 'taskmanager_id'}
set_state(state)[source]

Set the state for the Metadata

valid_states = {'END_CYCLE', 'METADATA_UPDATE', 'NEW', 'START_BACKUP'}
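Given valid_states and the InvalidMetadataError exception above, set_state() presumably rejects unknown states. This sketch assumes that behavior, using a local stand-in for the exception class:

```python
valid_states = {"END_CYCLE", "METADATA_UPDATE", "NEW", "START_BACKUP"}

class InvalidMetadataError(Exception):
    """Local stand-in for datablock.InvalidMetadataError."""

def set_state(metadata, state):
    # Presumed behavior: reject any state not listed in valid_states.
    if state not in valid_states:
        raise InvalidMetadataError(f"Invalid state: {state}")
    metadata["state"] = state

md = {"state": "NEW"}
set_state(md, "END_CYCLE")
print(md["state"])
```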
class decisionengine.framework.dataspace.datablock.ProductRetriever(product_name, product_type, product_source)[source]

Bases: object

decisionengine.framework.dataspace.datablock.compress(obj)[source]

Compress a python object

Parameters:

obj – python object

Returns:

compressed object

decisionengine.framework.dataspace.datablock.decompress(zbytes)[source]

Decompress a zipped byte stream and convert it to a string

Parameters:

zbytes – byte stream

Returns:

uncompressed string

decisionengine.framework.dataspace.datablock.zdumps(obj)[source]

Pickle and compress

Parameters:

obj – a python object

Returns:

compressed string

decisionengine.framework.dataspace.datablock.zloads(zbytes)[source]

Decompress and unpickle. If the input is not compressed, attempts to just unpickle it.

Parameters:

zbytes – compressed bytes

Returns:

python object
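The zdumps/zloads pair can be approximated with the standard library's pickle and zlib modules. This is a sketch of the documented behavior, not the package's actual implementation (which may differ in compression level or pickle protocol):

```python
import pickle
import zlib

def zdumps(obj):
    # Approximation of "pickle and compress".
    return zlib.compress(pickle.dumps(obj))

def zloads(zbytes):
    # Approximation of "decompress and unpickle"; if decompression fails,
    # fall back to plain unpickling, matching the documented behavior for
    # uncompressed input.
    try:
        return pickle.loads(zlib.decompress(zbytes))
    except zlib.error:
        return pickle.loads(zbytes)

data = {"generation_id": 135, "name": "AWS_Calculations"}
assert zloads(zdumps(data)) == data
assert zloads(pickle.dumps(data)) == data  # uncompressed input also round-trips
```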

decisionengine.framework.dataspace.datasource module

class decisionengine.framework.dataspace.datasource.DataSource(config)[source]

Bases: object

_abc_impl = <_abc._abc_data object>
abstract close()[source]

Close all connections to the database

abstract connect()[source]

Create a pool of database connections

abstract create_tables()[source]

Create database tables

dataproduct_table = 'dataproduct'

Name of the dataproduct table

abstract delete_data_older_than(days)[source]

Delete data older than the given interval

Parameters:

days (long) – remove data older than this many days

abstract duplicate_datablock(taskmanager_id, generation_id, new_generation_id)[source]

For the given taskmanager_id, make a copy of the datablock with given generation_id, set the generation_id for the datablock copy

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

  • new_generation_id (int) – generation_id of the new datablock created

abstract get_datablock(taskmanager_id, generation_id)[source]

Return the entire datablock from the dataproduct table for the given taskmanager_id, generation_id

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

abstract get_dataproduct(taskmanager_id, generation_id, key)[source]

Return the data from the dataproduct table for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

  • key (string) – key for the value

abstract get_dataproducts(taskmanager_id, key)[source]

Return a list of all data products associated with the given taskmanager_id

Parameters:

key (string) – data product key

abstract get_header(taskmanager_id, generation_id, key)[source]

Return the header from the header table for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

  • key (string) – key for the value

abstract get_last_generation_id(taskmanager_name, taskmanager_id=None)[source]

Return the last generation id for the current task manager, or for the task manager with the given taskmanager_id.

Parameters:
  • taskmanager_name (string) – task manager name

  • taskmanager_id (string) – task manager id

abstract get_metadata(taskmanager_id, generation_id, key)[source]

Return the metadata from the metadata table for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

  • key (string) – key for the value

abstract get_schema(table=None)[source]

Given the table name, return its schema

Parameters:

table (string) – Name of the table

abstract get_taskmanager(taskmanager_name, taskmanager_id)[source]

Retrieve a TaskManager

Parameters:
  • taskmanager_name (string) – name of the TaskManager to retrieve

  • taskmanager_id (string) – id of the TaskManager to retrieve

abstract get_taskmanagers(taskmanager_name=None, start_time=None, end_time=None)[source]

Retrieve TaskManagers

Parameters:
  • taskmanager_name (string, optional) – name of the TaskManagers to retrieve

  • start_time (datetime, optional) – earliest creation time to include

  • end_time (datetime, optional) – latest creation time to include

header_table = 'header'

Name of the header table

abstract insert(taskmanager_id, generation_id, key, value, header, metadata)[source]

Insert data into respective tables for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

  • key (string) – key for the value

  • value (object) – Value can be an object or dict

  • header (Header) – Header for the value

  • metadata (Metadata) – Metadata for the value

metadata_table = 'metadata'

Name of the metadata table

abstract reset_connections()[source]

Drop any cached connections and reconnect to the database

abstract store_taskmanager(taskmanager_name, taskmanager_id, datestamp=None)[source]

Store a TaskManager

Parameters:
  • taskmanager_name (string) – name of the TaskManager to store

  • taskmanager_id (string) – id of the TaskManager to store

  • datestamp (datetime, optional) – datetime of the created object, defaults to ‘now’

taskmanager_table = 'taskmanager'

Name of the taskmanager table

abstract update(taskmanager_id, generation_id, key, value, header, metadata)[source]

Update the data in respective tables for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (string) – taskmanager_id for generation to be retrieved

  • generation_id (int) – generation_id of the data

  • key (string) – key for the value

  • value (object) – Value can be an object or dict

  • header (Header) – Header for the value

  • metadata (Metadata) – Metadata for the value
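The abstract methods above all key data by (taskmanager_id, generation_id, key). A minimal, hypothetical in-memory stand-in makes the contract concrete; it is not the real abstract base class, and real backends (e.g. PostgreSQL) persist each triple in the dataproduct, header, and metadata tables instead:

```python
# Hypothetical in-memory stand-in for the DataSource interface, keyed by
# (taskmanager_id, generation_id, key) as the abstract methods suggest.
class InMemoryDataSource:
    def __init__(self):
        # (tm_id, gen_id, key) -> (value, header, metadata)
        self._products = {}

    def insert(self, taskmanager_id, generation_id, key, value, header, metadata):
        self._products[(taskmanager_id, generation_id, key)] = (value, header, metadata)

    def get_dataproduct(self, taskmanager_id, generation_id, key):
        return self._products[(taskmanager_id, generation_id, key)][0]

    def duplicate_datablock(self, taskmanager_id, generation_id, new_generation_id):
        # Copy every product of the given generation under the new generation id.
        for (tm, gen, key), triple in list(self._products.items()):
            if tm == taskmanager_id and gen == generation_id:
                self._products[(tm, new_generation_id, key)] = triple

ds = InMemoryDataSource()
ds.insert("tm-1", 1, "prices", {"m5.large": 0.096}, {}, {})
ds.duplicate_datablock("tm-1", 1, 2)
print(ds.get_dataproduct("tm-1", 2, "prices"))
```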

decisionengine.framework.dataspace.dataspace module

class decisionengine.framework.dataspace.dataspace.DataSpace(config)[source]

Bases: object

The DataSpace class is a collection of datablocks and provides the interface to the database used to store the actual data

close()[source]
delete(taskmanager_id, all_generations=False)[source]
duplicate_datablock(taskmanager_id, generation_id, new_generation_id)[source]
get_datablock(taskmanager_id, generation_id)[source]
get_dataproduct(taskmanager_id, generation_id, key)[source]
get_dataproducts(taskmanager_id, key=None)[source]
get_header(taskmanager_id, generation_id, key)[source]
get_last_generation_id(taskmanager_name, taskmanager_id=None)[source]
get_metadata(taskmanager_id, generation_id, key)[source]
get_taskmanager(taskmanager_name, taskmanager_id=None)[source]
get_taskmanagers(taskmanager_name=None, start_time=None, end_time=None)[source]
insert(taskmanager_id, generation_id, key, value, header, metadata)[source]
mark_demented(taskmanager_id, keys, generation_id=None)[source]
mark_expired(taskmanager_id, generation_id, key, expiry_time)[source]
store_taskmanager(name, taskmanager_id, datestamp=None)[source]
update(taskmanager_id, generation_id, key, value, header, metadata)[source]
exception decisionengine.framework.dataspace.dataspace.DataSpaceConfigurationError[source]

Bases: Exception

Errors related to the database configuration

exception decisionengine.framework.dataspace.dataspace.DataSpaceConnectionError[source]

Bases: Exception

Errors related to database access

exception decisionengine.framework.dataspace.dataspace.DataSpaceError[source]

Bases: Exception

Errors related to database access

exception decisionengine.framework.dataspace.dataspace.DataSpaceExistsError[source]

Bases: Exception

Errors related to database access

decisionengine.framework.dataspace.maintain module

class decisionengine.framework.dataspace.maintain.Reaper(config)[source]

Bases: object

Reaper periodically deletes data older than retention_interval (in days)

The class attributes indicate a rational set of defaults that shouldn’t be altered by user configuration.

MIN_RETENTION_INTERVAL_DAYS = 7
MIN_SECONDS_BETWEEN_RUNS = 7080
_reaper_loop(delay)[source]

The thread actually runs this.

reap()[source]

Actually spawn the query to delete the old records. Lock the state as this task doesn’t have a cancel option.

property retention_interval

We have data constraints, so use a property to track and enforce them

property seconds_between_runs

We have data constraints, so use a property to track and enforce them

start(delay=0)[source]

Start the thread, optionally delayed by the given number of seconds

stop()[source]

Try to stop the reaper; this will block if the reaper cannot be interrupted.
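The retention_interval and seconds_between_runs properties exist so that assignments can be validated against the class minimums. This is a hypothetical sketch of that pattern for retention_interval (the real setter's error type and message may differ):

```python
# Sketch of why retention_interval is a property: assignments below the
# class minimum (MIN_RETENTION_INTERVAL_DAYS = 7) can be rejected.
MIN_RETENTION_INTERVAL_DAYS = 7

class ReaperSketch:
    def __init__(self, retention_interval):
        # Goes through the property setter, so the constraint applies here too.
        self.retention_interval = retention_interval

    @property
    def retention_interval(self):
        return self._retention_interval

    @retention_interval.setter
    def retention_interval(self, value):
        if int(value) < MIN_RETENTION_INTERVAL_DAYS:
            raise ValueError(
                f"retention_interval must be >= {MIN_RETENTION_INTERVAL_DAYS} days"
            )
        self._retention_interval = int(value)

r = ReaperSketch(30)
try:
    r.retention_interval = 1
except ValueError as err:
    print(err)
```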

Module contents