decisionengine.framework.taskmanager package

Submodules

decisionengine.framework.taskmanager.LatestMessages module

The LatestMessages class listens for messages from a set of queues and retains only the last unconsumed message from each queue.

The latest messages are consumed by calling the consume() instance method, which returns a dictionary whose key is the message routing key and whose value is the full message. If no messages are available, consume() returns an empty dictionary.

The LatestMessages class is intended to be used as a context manager (e.g.):

with LatestMessages(queues, broker_url) as messages:
    while some_predicate():
        msgs = messages.consume()
        if not msgs:
            continue

        for routing_key, msg in msgs.items():
            ...

Upon exiting the the context, a LatestMessage object will no longer listen for any messages.

class decisionengine.framework.taskmanager.LatestMessages.LatestMessages(queues, broker_url)[source]

Bases: object

_listen()[source]
consume()[source]

Return dictionary of latest messages, keyed by message routing key.

decisionengine.framework.taskmanager.ProcessingState module

The ProcessingState class can represent any of the following task-manager states:

BOOT IDLE ACTIVE STEADY OFFLINE SHUTTINGDOWN SHUTDOWN ERROR

In addition, the class supports ‘wait_until(state)’ and ‘wait_while(state)’ methods, which, when called from a different process, block until the state has been entered or exited, respectively.

The ‘RUNNING_CONDITIONS’ list is a list of states that a thread may have if it is started/starting. The ‘STOPPING_CONDITIONS’ list is a list of states that a thread may have if it is stopped/stopping. The ‘INACTIVE_CONDITIONS’ list is a list of states that a thread may have when it is not active

class decisionengine.framework.taskmanager.ProcessingState.ProcessingState(state=State.BOOT)[source]

Bases: object

This object tracks the state of a process.

A number of convenience wrappers are provided.

Additionally you may use the .lock attribute for with block to lock the state during specific operations.

get()[source]

This function is a minimally locking check to fetch the state.

get_state_value()[source]
has_value(state)[source]
inactive()[source]
property lock
probably_running()[source]
set(state)[source]

This function will lock (and possibly block) to ensure a consistent change to the state value.

This function can be blocked using the .lock to force state sync between threads if need be.

should_stop()[source]
wait_until(state, timeout=None)[source]
wait_while(state, timeout=None)[source]
class decisionengine.framework.taskmanager.ProcessingState.State(value)[source]

Bases: Enum

An enumeration.

ACTIVE = 2
BOOT = 0
ERROR = 7
IDLE = 1
OFFLINE = 6
SHUTDOWN = 5
SHUTTINGDOWN = 4
STEADY = 3

decisionengine.framework.taskmanager.PublisherStatus module

PublisherStatus

The status of each decision-engine publisher is captured by a PublisherStatus object. The status can be queried to determine if a given publisher is enabled, and when it was last enabled or disabled. To access this information from a datablock, one must specify the a consumes statement:

The API for each relevant class is given below.

class decisionengine.framework.taskmanager.PublisherStatus.PublisherState(enabled, duration, since)[source]

Bases: NamedTuple

_asdict()

Return a new dict which maps field names to their values.

_field_defaults = {}
_fields = ('enabled', 'duration', 'since')
classmethod _make(iterable)

Make a new PublisherState object from a sequence or iterable

_replace(**kwds)

Return a new PublisherState object replacing specified fields with new values

duration: timedelta

datetime.timedelta object representing duration between now and when publisher was last enabled/disabled

enabled: bool

Boolean value indicating if publisher is enabled.

since: datetime

datetime.datetime object representing when publisher was last enabled/disabled

class decisionengine.framework.taskmanager.PublisherStatus.PublisherStatus(status_snapshot)[source]

Bases: object

Proxy object that provides publisher-status information.

is_enabled(publisher_name)[source]
Parameters:

publisher_name (str) – The name of the configured publisher

Returns:

If publisher is enabled or disabled

Return type:

bool

state(publisher_name)[source]
Parameters:

publisher_name (str) – The name of the configured publisher

Returns:

Full state of publisher

Return type:

PublisherState

class decisionengine.framework.taskmanager.PublisherStatus.PublisherStatusBoard(publisher_names)[source]

Bases: object

Publisher status board owned by each decision channel

The status board is not a user-facing entity; it is owned by each decision channel, which updates the status of each publisher after they have been run.

snapshot()[source]
Returns:

An publisher-status object corresponding to now

Return type:

PublisherStatus

update(publisher_name, result_of_publish)[source]
Parameters:
  • publisher_name (str) – The name of the configured publisher

  • result_of_publish (bool) – Whether the last execution of the publisher was successsul

decisionengine.framework.taskmanager.SourceProductCache module

class decisionengine.framework.taskmanager.SourceProductCache.SourceProductCache(expected_products, logger)[source]

Bases: object

update(new_data)[source]

decisionengine.framework.taskmanager.TaskManager module

Task manager

class decisionengine.framework.taskmanager.TaskManager.TaskManager(name, workers, dataspace, expected_products, exchange, broker_url, routing_keys)[source]

Bases: object

Task manager

data_block_put(data, header, data_block)[source]

Put data into data block

Parameters:
  • data (dict) – key, value pairs

  • header (Header) – data header

  • data_block (DataBlock) – data block

decision_cycle()[source]

Decision cycle to be run periodically (by trigger)

get_consumes()[source]
get_loglevel()[source]
get_produces()[source]
get_state()[source]
get_state_name()[source]
get_state_value()[source]
run_cycle(messages)[source]
run_cycles()[source]

Task manager main loop

run_logic_engine(data_block)[source]

Run Logic Engine.

Parameters:

data_block (DataBlock) – data block

run_publishers(actions, data_block)[source]

Run Publishers in main process.

Parameters:

data_block (DataBlock) – data block

run_transform(worker, data_block)[source]

Run a transform

Parameters:
  • worker (Worker) – Transform worker

  • data_block (DataBlock) – data block

run_transforms(data_block=None)[source]

Run transforms. So far in main process.

Parameters:

data_block (DataBlock) – data block

set_loglevel_value(log_level)[source]

Assumes log_level is a string corresponding to the supported logging-module levels.

take_offline()[source]

Adjust status to stop the decision cycles and bring the task manager offline

decisionengine.framework.taskmanager.module_graph module

Ensure no circularities in produces and consumes.

class decisionengine.framework.taskmanager.module_graph.Worker(key, conf_dict, base_class, channel_name)[source]

Bases: object

Provides interface to loadable modules an events to synchronise execution

decisionengine.framework.taskmanager.module_graph._consumed_products(*worker_lists)[source]
decisionengine.framework.taskmanager.module_graph._create_module_instance(config_dict, base_class, channel_name)[source]

Create instance of dynamically loaded module

decisionengine.framework.taskmanager.module_graph._find_only_one_subclass(module, base_class)[source]

Search through module looking for only one subclass of the supplied base_class

decisionengine.framework.taskmanager.module_graph._make_workers_for(configs, base_class, channel_name)[source]
decisionengine.framework.taskmanager.module_graph._produced_products(*worker_lists)[source]
decisionengine.framework.taskmanager.module_graph.channel_workers(channel_name, channel_config, logger)[source]
decisionengine.framework.taskmanager.module_graph.ensure_no_circularities(sources, transforms, publishers)[source]

Ensures no circularities among data products.

decisionengine.framework.taskmanager.module_graph.source_products(source_workers)[source]
decisionengine.framework.taskmanager.module_graph.validated_workflow(channel_name, sources, channel_config, logger=<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>)[source]

Module contents