decisionengine.framework.dataspace.datasources.sqlalchemy_ds package
Submodules
decisionengine.framework.dataspace.datasources.sqlalchemy_ds.datasource_api module
The datasource layer for our abstraction
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.datasource_api.SQLAlchemyDS(config_dict)[source]
Bases:
DataSource
A DecisionEngine data source via the SQL Alchemy ORM
{
  "dataspace": {
    "datasource": {
      "module": "decisionengine.framework.dataspace.datasources.sqlalchemy_ds",
      "name": "SQLAlchemyDS",
      "params": {
        "pool_size": 5,
        "max_overflow": 10,
        "timeout": 30,
        # url is mandatory, but any `engine` keyword is accepted here.
        "url": "dialect[+driver]://user:password@host/dbname"
      }
    }
  }
}
Exceptions should be caught and logged by the caller.
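A minimal sketch of how the params block above could be split into the mandatory url and the remaining engine keywords. The helper name split_engine_params is hypothetical and not part of this package, and the placeholder URL is an assumption; the nesting is copied from the example configuration.

```python
def split_engine_params(config_dict):
    """Return (url, engine_kwargs) from a config dict shaped like the example.

    Hypothetical helper for illustration only; not part of the package.
    """
    # Copy so the caller's configuration is left untouched.
    params = dict(config_dict["dataspace"]["datasource"]["params"])
    try:
        url = params.pop("url")
    except KeyError:
        raise ValueError("'url' is mandatory in the datasource params") from None
    # Everything left over is treated as an engine keyword.
    return url, params


config = {
    "dataspace": {
        "datasource": {
            "module": "decisionengine.framework.dataspace.datasources.sqlalchemy_ds",
            "name": "SQLAlchemyDS",
            "params": {
                "pool_size": 5,
                "max_overflow": 10,
                "timeout": 30,
                "url": "sqlite:///:memory:",  # placeholder URL (assumption)
            },
        }
    }
}

url, engine_kwargs = split_engine_params(config)
```

Whether SQLAlchemyDS receives the whole document or only part of it as config_dict is not stated here, so treat the lookup path as illustrative.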
- _abc_impl = <_abc._abc_data object>
- delete_data_older_than(days)[source]
Delete data older than the given interval
- Parameters:
days (int) – remove data older than this many days
- Returns:
None
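The age cutoff can be illustrated with the standard library. This is a sketch under stated assumptions, not the package's implementation: the table name, the datestamp column, and its ISO-8601 text encoding are all hypothetical, and the helper returns a row count only for demonstration, whereas the documented method returns None.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def delete_older_than(conn, days, now=None):
    """Delete rows whose datestamp is more than `days` days before `now`."""
    if days <= 0:
        raise ValueError("days must be a positive integer")
    now = now or datetime.now(timezone.utc)
    # ISO-8601 strings with the same UTC offset sort chronologically.
    cutoff = (now - timedelta(days=days)).isoformat()
    with conn:  # commit on success, roll back on error
        cur = conn.execute("DELETE FROM taskmanager WHERE datestamp < ?", (cutoff,))
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE taskmanager (name TEXT, datestamp TEXT)")
now = datetime(2024, 1, 31, tzinfo=timezone.utc)
conn.executemany(
    "INSERT INTO taskmanager VALUES (?, ?)",
    [
        ("old", (now - timedelta(days=40)).isoformat()),
        ("recent", (now - timedelta(days=5)).isoformat()),
    ],
)
removed = delete_older_than(conn, days=30, now=now)
remaining = [row[0] for row in conn.execute("SELECT name FROM taskmanager")]
```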
- duplicate_datablock(taskmanager_id, generation_id, new_generation_id)[source]
For the given taskmanager_id, copy the datablock with the given generation_id and assign new_generation_id to the copy
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to clone
new_generation_id (int) – generation id to create
- Returns:
None
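The copy-and-relabel behaviour can be sketched on plain dictionaries. The row layout and the helper name duplicate_generation are assumptions made for illustration; the real method works on the database tables.

```python
import copy


def duplicate_generation(rows, taskmanager_id, generation_id, new_generation_id):
    """Append copies of the matching rows, relabelled with new_generation_id."""
    clones = []
    for row in rows:
        if row["taskmanager_id"] == taskmanager_id and row["generation_id"] == generation_id:
            clone = copy.deepcopy(row)  # the original row stays untouched
            clone["generation_id"] = new_generation_id
            clones.append(clone)
    rows.extend(clones)


rows = [
    {"taskmanager_id": "tm-1", "generation_id": 1, "key": "a", "value": b"x"},
    {"taskmanager_id": "tm-1", "generation_id": 1, "key": "b", "value": b"y"},
    {"taskmanager_id": "tm-2", "generation_id": 1, "key": "a", "value": b"z"},
]
duplicate_generation(rows, "tm-1", generation_id=1, new_generation_id=2)
```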
- get_datablock(taskmanager_id, generation_id)[source]
Return the entire datablock from the dataproduct table for the given taskmanager_id, generation_id
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
- Returns:
with all set keys and their associated values
- Return type:
dict
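As a rough illustration of the returned shape, the sketch below folds dataproduct-like rows into a single dict keyed by key. The row layout and the name rows_to_datablock are hypothetical.

```python
def rows_to_datablock(rows, taskmanager_id, generation_id):
    """Collect key/value pairs for one taskmanager and generation."""
    return {
        row["key"]: row["value"]
        for row in rows
        if row["taskmanager_id"] == taskmanager_id
        and row["generation_id"] == generation_id
    }


rows = [
    {"taskmanager_id": "tm-1", "generation_id": 1, "key": "a", "value": 1},
    {"taskmanager_id": "tm-1", "generation_id": 1, "key": "b", "value": 2},
    {"taskmanager_id": "tm-1", "generation_id": 2, "key": "a", "value": 3},
]
datablock = rows_to_datablock(rows, "tm-1", 1)
```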
- get_dataproduct(taskmanager_id, generation_id, key)[source]
Return the data from the dataproduct table for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
key (str) – key for the value
- Returns:
The possibly binary value stored earlier
- Return type:
obj
- get_dataproducts(taskmanager_id, key=None)[source]
Return all data products associated with taskmanager_id
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
key (str) – key for the value
- Returns:
each element is the matching row as a dict()
- Return type:
tuple
- get_header(taskmanager_id, generation_id, key)[source]
Return the header from the header table for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
key (str) – key for the value
- Returns:
- fields in order are:
taskmanager.taskmanager_id, header.taskmanager_id, header.generation_id, header.key, header.create_time, header.expiration_time, header.scheduled_create_time, header.creator, header.schema_id
- Return type:
tuple
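Because the result is a positional tuple, callers may find it easier to give the fields names. The sketch below wraps a tuple in the documented field order with a namedtuple; the HeaderRow type and the sample values are assumptions for illustration.

```python
from collections import namedtuple

# Field names follow the documented order; the two taskmanager_id columns
# are given distinct names because namedtuple fields must be unique.
HeaderRow = namedtuple(
    "HeaderRow",
    [
        "taskmanager_taskmanager_id",
        "header_taskmanager_id",
        "generation_id",
        "key",
        "create_time",
        "expiration_time",
        "scheduled_create_time",
        "creator",
        "schema_id",
    ],
)

# Made-up sample values standing in for a get_header() result.
raw = ("uuid-1234", 7, 3, "my_key", 1700000000, 1700003600, 1700000000, "module_a", None)
header = HeaderRow._make(raw)
lifetime = header.expiration_time - header.create_time
```

The same approach applies to any other positional result, provided the field list matches the documented order exactly.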
- get_last_generation_id(taskmanager_name, taskmanager_id=None)[source]
Return the last generation id for the current task manager, or for the task manager with the given taskmanager_id.
- Parameters:
taskmanager_name (str) – name of taskmanager to retrieve
taskmanager_id (str/uuid) – id of taskmanager to retrieve
- Returns:
the largest generation stored within the database
- Return type:
int
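"Largest generation stored" maps naturally onto an SQL MAX aggregate. The sketch below uses sqlite3 with a hypothetical table layout; which table the real method queries, and what it returns when nothing matches, are not stated here.

```python
import sqlite3


def last_generation_id(conn, taskmanager_id):
    """Return the largest generation_id for taskmanager_id, or None if absent."""
    row = conn.execute(
        "SELECT MAX(generation_id) FROM dataproduct WHERE taskmanager_id = ?",
        (taskmanager_id,),
    ).fetchone()
    # MAX over zero rows yields a single NULL, which sqlite3 maps to None.
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataproduct (taskmanager_id TEXT, generation_id INTEGER, key TEXT)"
)
conn.executemany(
    "INSERT INTO dataproduct VALUES (?, ?, ?)",
    [("tm-1", 1, "a"), ("tm-1", 3, "a"), ("tm-1", 2, "b"), ("tm-2", 9, "a")],
)
latest = last_generation_id(conn, "tm-1")
```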
- get_metadata(taskmanager_id, generation_id, key)[source]
Return the metadata from the metadata table for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
key (str) – key for the value
- Returns:
- fields in order are:
taskmanager.taskmanager_id, metadata.taskmanager_id, metadata.generation_id, metadata.key, metadata.state, metadata.generation_time, metadata.missed_update_count
- Return type:
tuple
- get_taskmanager(taskmanager_name, taskmanager_id=None)[source]
Find the task manager by name/uuid in the database and get back the primary key.
If multiple rows match, use the one with the highest primary key.
- Parameters:
taskmanager_name (str) – name of taskmanager to retrieve
taskmanager_id (str/uuid) – id of taskmanager to retrieve
- Returns:
the matching row, column names as keys
- Return type:
dict
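The tie-breaking rule can be sketched on plain dicts. This assumes sequence_id is the primary key of the taskmanager table, which this page does not confirm; the helper name and the KeyError on no match are likewise choices made for the sketch.

```python
def pick_taskmanager(rows, taskmanager_name, taskmanager_id=None):
    """Return the matching row with the highest sequence_id."""
    matches = [
        row
        for row in rows
        if row["name"] == taskmanager_name
        and (taskmanager_id is None or row["taskmanager_id"] == taskmanager_id)
    ]
    if not matches:
        raise KeyError(f"no taskmanager named {taskmanager_name!r}")
    # Several rows can share a name; the most recently stored one wins.
    return max(matches, key=lambda row: row["sequence_id"])


rows = [
    {"sequence_id": 1, "name": "alpha", "taskmanager_id": "uuid-a"},
    {"sequence_id": 2, "name": "beta", "taskmanager_id": "uuid-b"},
    {"sequence_id": 3, "name": "alpha", "taskmanager_id": "uuid-c"},
]
newest = pick_taskmanager(rows, "alpha")
specific = pick_taskmanager(rows, "alpha", taskmanager_id="uuid-a")
```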
- get_taskmanagers(taskmanager_name=None, start_time=None, end_time=None)[source]
Find taskmanagers that meet our search
- Parameters:
taskmanager_name (str) – name of taskmanager to retrieve
start_time (datetime) – start of the time window to restrict the search to
end_time (datetime) – end of the time window to restrict the search to
- Returns:
each element is a dict() matching row, column names as keys
- Return type:
list
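Since all three search arguments are optional, a query of this kind is usually built from only the filters that were supplied. The sketch below shows that pattern with sqlite3; the column names, the ISO-8601 text datestamps, and the inclusive bounds are assumptions rather than documented behaviour.

```python
import sqlite3
from datetime import datetime


def find_taskmanagers(conn, taskmanager_name=None, start_time=None, end_time=None):
    """Return matching rows as dicts, applying only the filters given."""
    clauses, args = [], []
    if taskmanager_name is not None:
        clauses.append("name = ?")
        args.append(taskmanager_name)
    if start_time is not None:
        clauses.append("datestamp >= ?")
        args.append(start_time.isoformat())
    if end_time is not None:
        clauses.append("datestamp <= ?")
        args.append(end_time.isoformat())
    sql = "SELECT sequence_id, taskmanager_id, name, datestamp FROM taskmanager"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY sequence_id"
    return [dict(row) for row in conn.execute(sql, args)]


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become mapping-like, so dict(row) works
conn.execute(
    "CREATE TABLE taskmanager "
    "(sequence_id INTEGER PRIMARY KEY, taskmanager_id TEXT, name TEXT, datestamp TEXT)"
)
conn.executemany(
    "INSERT INTO taskmanager VALUES (?, ?, ?, ?)",
    [
        (1, "uuid-a", "alpha", "2024-01-01T00:00:00"),
        (2, "uuid-b", "beta", "2024-02-01T00:00:00"),
        (3, "uuid-c", "alpha", "2024-03-01T00:00:00"),
    ],
)
everything = find_taskmanagers(conn)
recent_alpha = find_taskmanagers(conn, "alpha", start_time=datetime(2024, 2, 1))
```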
- insert(taskmanager_id, generation_id, key, value, header, metadata)[source]
Insert data into respective tables for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to create
key (str) – key for the value
value (obj) – Value can be an object or dict or a binary
header (datablock.Header) – Header for the value
metadata (datablock.Metadata) – Metadata for the value
- Returns:
None
- reset_connections()[source]
Reset the connections to the database. As long as self.engine is still set, the engine can open new connections when new database actions occur; it simply holds none open right after the reset.
- Returns:
None
- store_taskmanager(name, taskmanager_id, datestamp=None)[source]
Store TaskManager in database
- Parameters:
name (str) – name of taskmanager to store
taskmanager_id (str/uuid) – id of taskmanager to store
datestamp (datetime) – datetime of created object, defaults to ‘now’
- Returns:
the primary key of the row in the database
- Return type:
int
- update(taskmanager_id, generation_id, key, value, header, metadata)[source]
Update the data in respective tables for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to update
key (str) – key for the value
value (obj) – Value can be an object or dict or a binary
header (datablock.Header) – Header for the value
metadata (datablock.Metadata) – Metadata for the value
- Returns:
None
decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema module
The table layout and utilities for our SQLAlchemy ORM
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Base(**kwargs)
Bases:
object
The base class of the class hierarchy.
When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.
- _sa_registry = <sqlalchemy.orm.decl_api.registry object>
- metadata = MetaData()
- registry = <sqlalchemy.orm.decl_api.registry object>
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Dataproduct(**kwargs)[source]
Bases:
Base
The PRIMARY KEY on this table isn’t used….
Existing code appears to depend on column order.
- _sa_class_manager = {'generation_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'key': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'value': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}
- generation_id
- id
- key
- taskmanager
- taskmanager_id
- value
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Header(**kwargs)[source]
Bases:
Base
The PRIMARY KEY on this table isn’t used….
The existing code has a hard expectation on the time columns being BIGINT rather than datetime objects buried within the classes.
- Looks like there was an initial goal of a relationship with the Schema table, but it may not be in use
Existing code appears to depend on column order.
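Since the time columns are expected to hold BIGINT values rather than datetime objects, callers typically convert at the boundary. The sketch below assumes the integers are whole seconds since the Unix epoch in UTC, which this page does not state; the helper names are hypothetical.

```python
from datetime import datetime, timezone


def to_epoch(dt):
    """Convert an aware datetime to whole seconds since the Unix epoch."""
    return int(dt.timestamp())


def from_epoch(seconds):
    """Convert epoch seconds back to an aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


created = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
create_time = to_epoch(created)        # integer suitable for a BIGINT column
round_trip = from_epoch(create_time)   # back to a datetime for display
```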
- _sa_class_manager = {'create_time': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'creator': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'expiration_time': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'generation_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'key': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'scheduled_create_time': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'schema_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}
- create_time
- creator
- expiration_time
- generation_id
- id
- key
- scheduled_create_time
- schema_id
- taskmanager
- taskmanager_id
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Metadata(**kwargs)[source]
Bases:
Base
The PRIMARY KEY on this table isn’t used….
The existing code has a hard expectation on the state field as a ‘text’ element.
The existing code has a hard expectation on the time columns being BIGINT rather than datetime objects buried within the classes.
Existing code appears to depend on column order.
- _sa_class_manager = {'generation_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'generation_time': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'key': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'missed_update_count': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'state': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}
- generation_id
- generation_time
- id
- key
- missed_update_count
- state
- taskmanager
- taskmanager_id
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Schema(**kwargs)[source]
Bases:
Base
This table may not be in use
- Has a one-to-many relationship with:
Header - may not be in use
- _sa_class_manager = {'schema': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'schema_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}
- schema
- schema_id
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Taskmanager(**kwargs)[source]
Bases:
Base
- Has a one-to-many relationship with:
Header, Metadata, Dataproduct
- changes cascade on:
Header, Metadata, Dataproduct
Existing code appears to depend on column order.
- _sa_class_manager = {'datestamp': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'name': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'sequence_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'task_dataproduct': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'task_header': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'task_metadata': <sqlalchemy.orm.attributes.InstrumentedAttribute object>, 'taskmanager_id': <sqlalchemy.orm.attributes.InstrumentedAttribute object>}
- datestamp
- name
- sequence_id
- task_dataproduct
- task_header
- task_metadata
- taskmanager_id
decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils module
Code not written by us
- decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils.add_engine_pidguard(engine)[source]
- decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils.clone_model(model, **kwargs)[source]
Based on https://stackoverflow.com/a/55991358
- decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils.orm_as_dict(obj)[source]
Based on https://stackoverflow.com/a/37350445
Module contents
Top level import so we can rationally segment items of the ORM
- class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.SQLAlchemyDS(config_dict)[source]
Bases:
DataSource
A DecisionEngine data source via the SQL Alchemy ORM
{
  "dataspace": {
    "datasource": {
      "module": "decisionengine.framework.dataspace.datasources.sqlalchemy_ds",
      "name": "SQLAlchemyDS",
      "params": {
        "pool_size": 5,
        "max_overflow": 10,
        "timeout": 30,
        # url is mandatory, but any `engine` keyword is accepted here.
        "url": "dialect[+driver]://user:password@host/dbname"
      }
    }
  }
}
Exceptions should be caught and logged by the caller.
- _abc_impl = <_abc._abc_data object>
- delete_data_older_than(days)[source]
Delete data older than the given interval
- Parameters:
days (int) – remove data older than this many days
- Returns:
None
- duplicate_datablock(taskmanager_id, generation_id, new_generation_id)[source]
For the given taskmanager_id, copy the datablock with the given generation_id and assign new_generation_id to the copy
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to clone
new_generation_id (int) – generation id to create
- Returns:
None
- get_datablock(taskmanager_id, generation_id)[source]
Return the entire datablock from the dataproduct table for the given taskmanager_id, generation_id
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
- Returns:
with all set keys and their associated values
- Return type:
dict
- get_dataproduct(taskmanager_id, generation_id, key)[source]
Return the data from the dataproduct table for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
key (str) – key for the value
- Returns:
The possibly binary value stored earlier
- Return type:
obj
- get_dataproducts(taskmanager_id, key=None)[source]
Return all data products associated with taskmanager_id
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
key (str) – key for the value
- Returns:
each element is the matching row as a dict()
- Return type:
tuple
- get_header(taskmanager_id, generation_id, key)[source]
Return the header from the header table for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
key (str) – key for the value
- Returns:
- fields in order are:
taskmanager.taskmanager_id, header.taskmanager_id, header.generation_id, header.key, header.create_time, header.expiration_time, header.scheduled_create_time, header.creator, header.schema_id
- Return type:
tuple
- get_last_generation_id(taskmanager_name, taskmanager_id=None)[source]
Return the last generation id for the current task manager, or for the task manager with the given taskmanager_id.
- Parameters:
taskmanager_name (str) – name of taskmanager to retrieve
taskmanager_id (str/uuid) – id of taskmanager to retrieve
- Returns:
the largest generation stored within the database
- Return type:
int
- get_metadata(taskmanager_id, generation_id, key)[source]
Return the metadata from the metadata table for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to locate
key (str) – key for the value
- Returns:
- fields in order are:
taskmanager.taskmanager_id, metadata.taskmanager_id, metadata.generation_id, metadata.key, metadata.state, metadata.generation_time, metadata.missed_update_count
- Return type:
tuple
- get_taskmanager(taskmanager_name, taskmanager_id=None)[source]
Find the task manager by name/uuid in the database and get back the primary key.
If multiple rows match, use the one with the highest primary key.
- Parameters:
taskmanager_name (str) – name of taskmanager to retrieve
taskmanager_id (str/uuid) – id of taskmanager to retrieve
- Returns:
the matching row, column names as keys
- Return type:
dict
- get_taskmanagers(taskmanager_name=None, start_time=None, end_time=None)[source]
Find taskmanagers that meet our search
- Parameters:
taskmanager_name (str) – name of taskmanager to retrieve
start_time (datetime) – start of the time window to restrict the search to
end_time (datetime) – end of the time window to restrict the search to
- Returns:
each element is a dict() matching row, column names as keys
- Return type:
list
- insert(taskmanager_id, generation_id, key, value, header, metadata)[source]
Insert data into respective tables for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to create
key (str) – key for the value
value (obj) – Value can be an object or dict or a binary
header (datablock.Header) – Header for the value
metadata (datablock.Metadata) – Metadata for the value
- Returns:
None
- reset_connections()[source]
Reset the connections to the database. As long as self.engine is still set, the engine can open new connections when new database actions occur; it simply holds none open right after the reset.
- Returns:
None
- store_taskmanager(name, taskmanager_id, datestamp=None)[source]
Store TaskManager in database
- Parameters:
name (str) – name of taskmanager to store
taskmanager_id (str/uuid) – id of taskmanager to store
datestamp (datetime) – datetime of created object, defaults to ‘now’
- Returns:
the primary key of the row in the database
- Return type:
int
- update(taskmanager_id, generation_id, key, value, header, metadata)[source]
Update the data in respective tables for the given taskmanager_id, generation_id, key
- Parameters:
taskmanager_id (str/uuid) – id of taskmanager to retrieve
generation_id (int) – generation id to update
key (str) – key for the value
value (obj) – Value can be an object or dict or a binary
header (datablock.Header) – Header for the value
metadata (datablock.Metadata) – Metadata for the value
- Returns:
None