decisionengine.framework.taskmanager package
Submodules
decisionengine.framework.taskmanager.LatestMessages module
The LatestMessages class listens for messages from a set of queues and retains only the last unconsumed message from each queue.
The latest messages are consumed by calling the consume() instance method, which returns a dictionary whose key is the message routing key and whose value is the full message. If no messages are available, consume() returns an empty dictionary.
The LatestMessages class is intended to be used as a context manager (e.g.):
with LatestMessages(queues, broker_url) as messages:
while some_predicate():
msgs = messages.consume()
if not msgs:
continue
for routing_key, msg in msgs.items():
...
Upon exiting the the context, a LatestMessage object will no longer listen for any messages.
decisionengine.framework.taskmanager.ProcessingState module
The ProcessingState class can represent any of the following task-manager states:
BOOT IDLE ACTIVE STEADY OFFLINE SHUTTINGDOWN SHUTDOWN ERROR
In addition, the class supports ‘wait_until(state)’ and ‘wait_while(state)’ methods, which, when called from a different process, block until the state has been entered or exited, respectively.
The ‘RUNNING_CONDITIONS’ list is a list of states that a thread may have if it is started/starting. The ‘STOPPING_CONDITIONS’ list is a list of states that a thread may have if it is stopped/stopping. The ‘INACTIVE_CONDITIONS’ list is a list of states that a thread may have when it is not active
- class decisionengine.framework.taskmanager.ProcessingState.ProcessingState(state=State.BOOT)[source]
Bases:
object
This object tracks the state of a process.
A number of convenience wrappers are provided.
Additionally you may use the .lock attribute for with block to lock the state during specific operations.
- property lock
decisionengine.framework.taskmanager.PublisherStatus module
PublisherStatus
The status of each decision-engine publisher is captured by a PublisherStatus object. The status can be queried to determine if a given publisher is enabled, and when it was last enabled or disabled. To access this information from a datablock, one must specify the a consumes statement:
The API for each relevant class is given below.
- class decisionengine.framework.taskmanager.PublisherStatus.PublisherState(enabled, duration, since)[source]
Bases:
NamedTuple
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('enabled', 'duration', 'since')
- classmethod _make(iterable)
Make a new PublisherState object from a sequence or iterable
- _replace(**kwds)
Return a new PublisherState object replacing specified fields with new values
- duration: timedelta
datetime.timedelta object representing duration between now and when publisher was last enabled/disabled
- enabled: bool
Boolean value indicating if publisher is enabled.
- since: datetime
datetime.datetime object representing when publisher was last enabled/disabled
- class decisionengine.framework.taskmanager.PublisherStatus.PublisherStatus(status_snapshot)[source]
Bases:
object
Proxy object that provides publisher-status information.
- is_enabled(publisher_name)[source]
- Parameters:
publisher_name (str) – The name of the configured publisher
- Returns:
If publisher is enabled or disabled
- Return type:
bool
- class decisionengine.framework.taskmanager.PublisherStatus.PublisherStatusBoard(publisher_names)[source]
Bases:
object
Publisher status board owned by each decision channel
The status board is not a user-facing entity; it is owned by each decision channel, which updates the status of each publisher after they have been run.
decisionengine.framework.taskmanager.SourceProductCache module
decisionengine.framework.taskmanager.TaskManager module
Task manager
- class decisionengine.framework.taskmanager.TaskManager.TaskManager(name, workers, dataspace, expected_products, exchange, broker_url, routing_keys)[source]
Bases:
object
Task manager
- data_block_put(data, header, data_block)[source]
Put data into data block
- Parameters:
data (
dict
) – key, value pairsheader (
Header
) – data headerdata_block (
DataBlock
) – data block
- run_logic_engine(data_block)[source]
Run Logic Engine.
- Parameters:
data_block (
DataBlock
) – data block
- run_publishers(actions, data_block)[source]
Run Publishers in main process.
- Parameters:
data_block (
DataBlock
) – data block
- run_transform(worker, data_block)[source]
Run a transform
- Parameters:
worker (
Worker
) – Transform workerdata_block (
DataBlock
) – data block
- run_transforms(data_block=None)[source]
Run transforms. So far in main process.
- Parameters:
data_block (
DataBlock
) – data block
decisionengine.framework.taskmanager.module_graph module
Ensure no circularities in produces and consumes.
- class decisionengine.framework.taskmanager.module_graph.Worker(key, conf_dict, base_class, channel_name)[source]
Bases:
object
Provides interface to loadable modules an events to synchronise execution
- decisionengine.framework.taskmanager.module_graph._create_module_instance(config_dict, base_class, channel_name)[source]
Create instance of dynamically loaded module
- decisionengine.framework.taskmanager.module_graph._find_only_one_subclass(module, base_class)[source]
Search through module looking for only one subclass of the supplied base_class
- decisionengine.framework.taskmanager.module_graph._make_workers_for(configs, base_class, channel_name)[source]
- decisionengine.framework.taskmanager.module_graph.channel_workers(channel_name, channel_config, logger)[source]