decisionengine.framework.dataspace.datasources.sqlalchemy_ds package

Submodules

decisionengine.framework.dataspace.datasources.sqlalchemy_ds.datasource_api module

The datasource layer for our abstraction

class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.datasource_api.SQLAlchemyDS(config_dict)

Bases: DataSource

A DecisionEngine data source via the SQLAlchemy ORM

{
    "dataspace": {
        "datasource": {
            "module": "decisionengine.framework.dataspace.datasources.sqlalchemy_ds",
            "name": "SQLAlchemyDS",
            "params": {
                "pool_size": 5,
                "max_overflow": 10,
                "timeout": 30,

                # url is mandatory, but any `engine` keyword is accepted here.
                "url": "dialect[+driver]://user:password@host/dbname"
            }
        }
    }
}
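As a rough illustration, the same configuration can be written as a Python dictionary and checked before it is handed to the data source. The helper `check_datasource_params` is hypothetical and not part of the package; it enforces only the one rule stated in the comment above, namely that `url` is mandatory.

```python
# Hypothetical pre-flight check; not part of decisionengine.
config = {
    "dataspace": {
        "datasource": {
            "module": "decisionengine.framework.dataspace.datasources.sqlalchemy_ds",
            "name": "SQLAlchemyDS",
            "params": {
                "pool_size": 5,
                "max_overflow": 10,
                "timeout": 30,
                # Placeholder URL copied from the documentation above.
                "url": "dialect[+driver]://user:password@host/dbname",
            },
        }
    }
}


def check_datasource_params(cfg):
    """Return the params dict, raising ValueError if `url` is missing or empty."""
    params = cfg["dataspace"]["datasource"]["params"]
    if not params.get("url"):
        raise ValueError("datasource params must include a 'url'")
    return params


params = check_datasource_params(config)
```

Failing early on a missing `url` keeps the error close to the configuration rather than surfacing later as a connection failure.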

Exceptions should be caught and logged by the caller.

close()

Close all connections to the database

Returns:

None

connect()

Create a pool of database connections

Returns:

None

create_tables()

Create database tables

Returns:

None

delete_data_older_than(days)

Delete data older than the given interval

Parameters:

days (int) – remove data older than this many days

Returns:

None
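The cutoff implied by `days` can be sketched as follows. This only illustrates the date arithmetic; how SQLAlchemyDS computes the cutoff, and which column it compares against, is not stated here. `cutoff_before` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def cutoff_before(days, now=None):
    """Return the instant `days` days before `now` (defaults to current UTC time)."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(days=days)


# Fixed reference time so the result is reproducible.
now = datetime(2024, 3, 31, 12, 0, tzinfo=timezone.utc)
cutoff = cutoff_before(30, now)
print(cutoff.isoformat())  # prints: 2024-03-01T12:00:00+00:00
```

Rows with a timestamp earlier than `cutoff` would be the candidates for deletion under this reading.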

duplicate_datablock(taskmanager_id, generation_id, new_generation_id)

For the given taskmanager_id, copy the datablock with the given generation_id and assign new_generation_id to the copy

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to clone

  • new_generation_id (int) – generation id to create

Returns:

None
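The copy semantics can be pictured with plain Python data. The sketch below is not the ORM implementation; it uses a hypothetical in-memory list of row dictionaries and copies every row of one generation under a new generation id, leaving the originals untouched.

```python
import copy


def duplicate_rows(rows, taskmanager_id, generation_id, new_generation_id):
    """Append copies of the matching rows, relabelled with new_generation_id.

    Returns the number of rows that were copied.
    """
    clones = []
    for row in rows:
        if (
            row["taskmanager_id"] == taskmanager_id
            and row["generation_id"] == generation_id
        ):
            clone = copy.deepcopy(row)
            clone["generation_id"] = new_generation_id
            clones.append(clone)
    rows.extend(clones)
    return len(clones)


# Made-up rows standing in for database content.
rows = [
    {"taskmanager_id": "tm-1", "generation_id": 1, "key": "a", "value": b"x"},
    {"taskmanager_id": "tm-1", "generation_id": 1, "key": "b", "value": b"y"},
    {"taskmanager_id": "tm-2", "generation_id": 1, "key": "a", "value": b"z"},
]
copied = duplicate_rows(rows, "tm-1", 1, 2)
```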

get_datablock(taskmanager_id, generation_id)

Return the entire datablock from the dataproduct table for the given taskmanager_id, generation_id

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to locate

Returns:

with all set keys and their associated values

Return type:

dict

get_dataproduct(taskmanager_id, generation_id, key)

Return the data from the dataproduct table for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to locate

  • key (str) – key for the value

Returns:

The possibly binary value stored earlier

Return type:

obj

get_dataproducts(taskmanager_id, key=None)

Return list of all data products associated with taskmanager_id

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • key (str) – key for the value

Returns:

each element is the matching row as a dict()

Return type:

tuple

get_header(taskmanager_id, generation_id, key)

Return the header from the header table for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to locate

  • key (str) – key for the value

Returns:

fields in order are:

taskmanager.taskmanager_id, header.taskmanager_id, header.generation_id, header.key, header.create_time, header.expiration_time, header.scheduled_create_time, header.creator, header.schema_id

Return type:

tuple
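Because the result is positional, callers may find it easier to wrap the tuple in a named structure. The sketch below is one way to do that with the standard library; the field names `tm_taskmanager_id` and `header_taskmanager_id` are this example's own (the documented order contains two taskmanager_id fields, which need distinct names), and the row values are made up.

```python
from collections import namedtuple

# Positions follow the field order documented above.
HeaderRow = namedtuple(
    "HeaderRow",
    [
        "tm_taskmanager_id",      # taskmanager.taskmanager_id
        "header_taskmanager_id",  # header.taskmanager_id
        "generation_id",
        "key",
        "create_time",
        "expiration_time",
        "scheduled_create_time",
        "creator",
        "schema_id",
    ],
)

# Made-up tuple standing in for the return value of get_header().
raw = ("tm-id", 7, 3, "my_key", 1700000000, 1700003600, 0, "my_source", None)
header = HeaderRow._make(raw)
print(header.key, header.expiration_time - header.create_time)  # prints: my_key 3600
```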

get_last_generation_id(taskmanager_name, taskmanager_id=None)

Return the last generation id for the current task manager, or for the task manager with the given taskmanager_id.

Parameters:
  • taskmanager_name (str) – name of taskmanager to retrieve

  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

Returns:

the largest generation stored within the database

Return type:

int

get_metadata(taskmanager_id, generation_id, key)

Return the metadata from the metadata table for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to locate

  • key (str) – key for the value

Returns:

fields in order are:

taskmanager.taskmanager_id, metadata.taskmanager_id, metadata.generation_id, metadata.key, metadata.state, metadata.generation_time, metadata.missed_update_count

Return type:

tuple

get_schema(table=None)

Given the table name, return its schema

get_taskmanager(taskmanager_name, taskmanager_id=None)

Find the task manager by name/uuid in the database and get back the primary key.

If multiple rows match, the one with the highest primary key is used.

Parameters:
  • taskmanager_name (str) – name of taskmanager to retrieve

  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

Returns:

the matching row, column names as keys

Return type:

dict
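The "highest primary key wins" rule can be illustrated with the standard library's sqlite3 module. This is a sketch, not the ORM query used by SQLAlchemyDS: the table mirrors the column names of the Taskmanager model, `sequence_id` is assumed to be the primary key, and returning None for no match is this example's choice.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become addressable by column name
conn.execute(
    "CREATE TABLE taskmanager ("
    " sequence_id INTEGER PRIMARY KEY,"
    " taskmanager_id TEXT, name TEXT, datestamp TEXT)"
)
conn.executemany(
    "INSERT INTO taskmanager (taskmanager_id, name, datestamp) VALUES (?, ?, ?)",
    [
        ("uuid-a", "tm1", "2024-01-01"),
        ("uuid-b", "tm1", "2024-02-01"),
        ("uuid-c", "tm2", "2024-03-01"),
    ],
)


def latest_taskmanager(conn, name):
    """Return the matching row with the highest primary key as a dict, or None."""
    row = conn.execute(
        "SELECT * FROM taskmanager WHERE name = ? "
        "ORDER BY sequence_id DESC LIMIT 1",
        (name,),
    ).fetchone()
    return dict(row) if row is not None else None


found = latest_taskmanager(conn, "tm1")
```

Two rows share the name `tm1`; the sketch returns the later one, with column names as dictionary keys.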

get_taskmanagers(taskmanager_name=None, start_time=None, end_time=None)

Find taskmanagers that meet our search

Parameters:
  • taskmanager_name (str) – name of taskmanager to retrieve

  • start_time (datetime) – Datetime to confine against

  • end_time (datetime) – Datetime to confine against

Returns:

each element is a dict() matching row, column names as keys

Return type:

list

insert(taskmanager_id, generation_id, key, value, header, metadata)

Insert data into respective tables for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to create

  • key (str) – key for the value

  • value (obj) – Value can be an object or dict or a binary

  • header (datablock.Header) – Header for the value

  • metadata (datablock.Metadata) – Metadata for the value

Returns:

None

reset_connections()

Reset the connection to the database. As long as self.engine is still defined, the engine can make new connections when new database actions occur; it simply has none open immediately after the reset.

Returns:

None

store_taskmanager(name, taskmanager_id, datestamp=None)

Store TaskManager in database

Parameters:
  • name (str) – name of taskmanager to store

  • taskmanager_id (str/uuid) – id of taskmanager to store

  • datestamp (datetime) – datetime of created object, defaults to ‘now’

Returns:

the primary key of the row in the database

Return type:

int

update(taskmanager_id, generation_id, key, value, header, metadata)

Update the data in respective tables for the given taskmanager_id, generation_id, key

Parameters:
  • taskmanager_id (str/uuid) – id of taskmanager to retrieve

  • generation_id (int) – generation id to update

  • key (str) – key for the value

  • value (obj) – Value can be an object or dict or a binary

  • header (datablock.Header) – Header for the value

  • metadata (datablock.Metadata) – Metadata for the value

Returns:

None

decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema module

The table layout and utilities for our SQLAlchemy ORM

class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Base(**kwargs)

Bases: object

The base class of the class hierarchy.

When called, it accepts no arguments and returns a new featureless instance that has no instance attributes and cannot be given any.

metadata = MetaData()
registry = <sqlalchemy.orm.decl_api.registry object>
class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Dataproduct(**kwargs)

Bases: Base

The PRIMARY KEY on this table isn’t used.

Existing code appears to depend on column order.

generation_id
id
key
taskmanager
taskmanager_id
value
class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Header(**kwargs)

Bases: Base

The PRIMARY KEY on this table isn’t used.

Existing code firmly expects the time columns to be BIGINT rather than datetime objects buried within the classes.

It looks like there was an initial goal of a relationship with the Schema table, but it may not be in use.

Existing code appears to depend on column order.

create_time
creator
expiration_time
generation_id
id
key
scheduled_create_time
schema_id
taskmanager
taskmanager_id
class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Metadata(**kwargs)

Bases: Base

The PRIMARY KEY on this table isn’t used.

Existing code firmly expects the state field to be a ‘text’ element.

Existing code firmly expects the time columns to be BIGINT rather than datetime objects buried within the classes.

Existing code appears to depend on column order.

generation_id
generation_time
id
key
missed_update_count
state
taskmanager
taskmanager_id
class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Schema(**kwargs)

Bases: Base

This table may not be in use

Has a one-to-many relationship with:

Header - may not be in use

schema
schema_id
class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.db_schema.Taskmanager(**kwargs)

Bases: Base

Has a one-to-many relationship with:

Header, Metadata, Dataproduct

Changes cascade on:

Header, Metadata, Dataproduct

Existing code appears to depend on column order.

datestamp
name
sequence_id
task_dataproduct
task_header
task_metadata
taskmanager_id

decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils module

Code not written by us

decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils.add_engine_pidguard(engine)

Based on https://stackoverflow.com/questions/62920507/using-sqlalchemy-connection-pooling-queues-with-python-multiprocessing
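The function body is not reproduced here. Going by the function name and the linked question, the general idea of a PID guard is to remember which process opened a pooled connection and to refuse reuse from a different process, for example after a fork. The stand-alone sketch below illustrates only that check, using a hypothetical record dictionary and exception class; it does not use SQLAlchemy's event API and is not the package's implementation.

```python
import os


class StaleConnectionError(Exception):
    """Raised when a connection record was created by another process."""


def tag_record(record):
    """Remember the PID of the process that created this connection record."""
    record["pid"] = os.getpid()
    return record


def check_record(record, current_pid=None):
    """Return the record, or raise if it belongs to a different process."""
    current_pid = os.getpid() if current_pid is None else current_pid
    if record["pid"] != current_pid:
        raise StaleConnectionError(
            f"record belongs to pid {record['pid']}, "
            f"but checkout was attempted in pid {current_pid}"
        )
    return record


record = tag_record({})
check_record(record)  # same process: accepted
```

In a forked child the PIDs would differ, so the stale connection would be rejected and a fresh one opened instead of sharing a socket across processes.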

decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils.clone_model(model, **kwargs)

Based on https://stackoverflow.com/a/55991358

decisionengine.framework.dataspace.datasources.sqlalchemy_ds.utils.orm_as_dict(obj)

Based on https://stackoverflow.com/a/37350445

Module contents

Top level import so we can rationally segment items of the ORM

class decisionengine.framework.dataspace.datasources.sqlalchemy_ds.SQLAlchemyDS(config_dict)

Bases: DataSource

A DecisionEngine data source via the SQLAlchemy ORM

This is the package-level import of datasource_api.SQLAlchemyDS; its configuration and methods are documented under the datasource_api module above.