decisionengine.framework.engine package

Subpackages

Submodules

decisionengine.framework.engine.ChannelWorkers module

class decisionengine.framework.engine.ChannelWorkers.ChannelWorker(task_manager, logger_config)[source]

Bases: Process

Class that encapsulates a channel’s task manager as a separate process.

This class's run method is called whenever the process is started. If the process is abruptly terminated (e.g. the run method is preempted by a signal or an os._exit(n) call), the ChannelWorker object will still exist even if the operating-system process no longer does.

To determine the exit code of this process, use the ChannelWorker.exitcode value, provided by the multiprocessing.Process base class.
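The behaviour described above can be reproduced with a plain multiprocessing.Process subclass. The following is only a minimal sketch: AbruptWorker and run_and_report are hypothetical names, not part of the decision engine, and the "fork" start method is assumed to be available (POSIX systems).

```python
import multiprocessing
import os

# Assumption: the "fork" start method is available (POSIX systems).
_ctx = multiprocessing.get_context("fork")


class AbruptWorker(_ctx.Process):
    """Hypothetical stand-in for ChannelWorker; it only exits abruptly."""

    def run(self):
        # os._exit skips normal interpreter cleanup, like an abrupt termination.
        os._exit(3)


def run_and_report():
    """Start the worker, wait for it, and report (is_alive, exitcode)."""
    worker = AbruptWorker()
    worker.start()
    worker.join(timeout=30)
    # The Python object is still usable after the OS process has gone away.
    return worker.is_alive(), worker.exitcode
```

Calling run_and_report() returns the liveness flag and the exit code read from the surviving Python object.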

get_consumes()[source]
get_produces()[source]
get_state_name()[source]
run()[source]

Method to be run in sub-process; can be overridden in sub-class

setup_logger()[source]
wait_while(state, timeout=None)[source]
class decisionengine.framework.engine.ChannelWorkers.ChannelWorkers[source]

Bases: object

This class manages and provides access to the task-manager workers.

The intention is that the decision engine never directly interacts with the workers but refers to them via a context manager:

    with workers.access() as ws:
        # Access to ws now protected
        ws['new_channel'] = ChannelWorker(...)

In cases where the decision engine’s block_while method must be called (e.g. during tests), one should use unguarded access:

    ws = workers.get_unguarded()
    # Access to ws is unprotected
    ws['new_channel'].wait_while(...)

Calling a blocking method while using the protected context manager (i.e. workers.access()) will likely result in a deadlock.
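To make the deadlock warning concrete, here is a minimal sketch of lock-guarded access to a dictionary of workers. GuardedWorkers and try_guarded_access are hypothetical names, and the real ChannelWorkers implementation may differ. The sketch only shows why a blocking call inside the guarded context keeps every other thread out.

```python
import contextlib
import threading


class GuardedWorkers:
    """Hypothetical sketch of lock-guarded access to a dict of workers."""

    def __init__(self):
        self._workers = {}
        self._lock = threading.Lock()

    @contextlib.contextmanager
    def access(self):
        # The lock is held for the whole body of the `with` statement.
        with self._lock:
            yield self._workers

    def get_unguarded(self):
        # No lock is taken, so the caller may block without locking others out.
        return self._workers


workers = GuardedWorkers()
with workers.access() as ws:
    ws["new_channel"] = "worker placeholder"


def try_guarded_access(timeout):
    """Return True if the lock could be taken within `timeout` seconds."""
    acquired = workers._lock.acquire(timeout=timeout)
    if acquired:
        workers._lock.release()
    return acquired
```

While one thread is inside workers.access(), a call to try_guarded_access from another thread times out. A blocking call made inside the guarded context would therefore stall any thread that needs the same lock.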

class Access(workers, lock)[source]

Bases: object

access()[source]
accessed_by_another_thread()[source]
get_unguarded()[source]

decisionengine.framework.engine.ClientMessageReceiver module

class decisionengine.framework.engine.ClientMessageReceiver.ClientMessageReceiver(exchange_name, exchange_type, broker_url, routing_key_suffix, logger_name)[source]

Bases: object

_receive(body, message)[source]
execute(func, *args)[source]

decisionengine.framework.engine.DecisionEngine module

Main loop for Decision Engine. The DECISION_ENGINE_CONFIG_FILE environment variable points to the decision engine configuration file. If this environment variable is not defined, the DE-Config.py file from the ``../tests/etc/`` directory is used.
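The lookup described above might be sketched as follows. resolve_config_file is a hypothetical helper, and the directory that the fallback path is relative to is an assumption; the module's actual resolution logic is not shown in this reference.

```python
import os

ENV_VAR = "DECISION_ENGINE_CONFIG_FILE"
# Fallback named in the module description; its base directory is assumed here.
FALLBACK = os.path.join("..", "tests", "etc", "DE-Config.py")


def resolve_config_file(environ=None):
    """Return the configured file, or the fallback if the variable is unset."""
    environ = os.environ if environ is None else environ
    return environ.get(ENV_VAR, FALLBACK)
```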

class decisionengine.framework.engine.DecisionEngine.DecisionEngine(global_config, channel_config_loader, server_address)[source]

Bases: ThreadingMixIn, SimpleXMLRPCServer

_dataframe_to_column_names(df)[source]
_dataframe_to_csv(df)[source]
_dataframe_to_json(df)[source]
_dataframe_to_table(df)[source]
_dataframe_to_vertical_tables(df)[source]
_dispatch(method, params)[source]

Dispatches the XML-RPC method.

XML-RPC calls are forwarded to a registered function that matches the called XML-RPC method name. If no such function exists then the call is forwarded to the registered instance, if available.

If the registered instance has a _dispatch method then that method will be called with the name of the XML-RPC method and its parameters as a tuple, e.g. instance._dispatch('add', (2, 3)).

If the registered instance does not have a _dispatch method then the instance will be searched to find a matching method and, if found, will be called.

Methods beginning with an ‘_’ are considered private and will not be called.
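The dispatch rules above are those of the standard library's xmlrpc.server dispatcher, and they can be tried without a network connection. This sketch uses SimpleXMLRPCDispatcher directly. Calculator and call are hypothetical names, and the DecisionEngine override of _dispatch may add behaviour that is not shown here.

```python
from xmlrpc.server import SimpleXMLRPCDispatcher


class Calculator:
    """Hypothetical registered instance with one public and one private method."""

    def multiply(self, x, y):
        return x * y

    def _secret(self):
        return "hidden"


dispatcher = SimpleXMLRPCDispatcher()
# A registered function takes precedence over the registered instance.
dispatcher.register_function(lambda x, y: x + y, "add")
dispatcher.register_instance(Calculator())


def call(method, params):
    """Dispatch a call and report failures as a string instead of raising."""
    try:
        return dispatcher._dispatch(method, params)
    except Exception as exc:
        return f"error: {exc}"
```

Here call("add", (2, 3)) resolves to the registered function, call("multiply", (2, 3)) falls through to the instance, and call("_secret", ()) is rejected because the name starts with an underscore.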

block_while(state, timeout=None)[source]
create_channel(channel_name, channel_config)[source]
get_logger()[source]
metrics()[source]
reaper_start(delay)[source]
reaper_status()[source]
reaper_stop()[source]
rm_channel(channel, maybe_timeout)[source]
rpc_block_while(client_queue, state_str, timeout=None)[source]
rpc_get_channel_log_level(client_queue, channel)[source]
rpc_get_log_level(client_queue)[source]
rpc_get_source_log_level(client_queue, source)[source]
rpc_kill_channel(client_queue, channel, timeout=None)[source]
rpc_metrics(client_queue)[source]

Display collected metrics

rpc_ping(client_queue)[source]
rpc_print_product(client_queue, product, columns=None, query=None, types=False, format=None)[source]
rpc_print_products(client_queue)[source]
rpc_product_dependencies(client_queue)[source]
rpc_query_tool(client_queue, product, format=None, start_time=None)[source]
rpc_queue_status(client_queue)[source]
rpc_reaper_start(client_queue, delay=0)[source]

Start the reaper process after ‘delay’ seconds.

Parameters:

delay (int) – Seconds to wait before starting the reaper. Defaults to 0.

rpc_reaper_status(client_queue)[source]
rpc_reaper_stop(client_queue)[source]
rpc_rm_channel(client_queue, channel, maybe_timeout)[source]
rpc_set_channel_log_level(client_queue, channel, log_level)[source]

Assumes log_level is a string corresponding to the supported logging-module levels.
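One way to validate such a string before applying it is sketched below. to_logging_level is a hypothetical helper and not part of the documented API. It accepts only names that the standard logging module defines as integer levels.

```python
import logging


def to_logging_level(log_level):
    """Map a level name such as "DEBUG" to the logging module's number."""
    level = getattr(logging, str(log_level).upper(), None)
    # Reject names that do not resolve to one of logging's integer levels.
    if not isinstance(level, int) or isinstance(level, bool):
        raise ValueError(f"unsupported log level: {log_level!r}")
    return level
```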

rpc_set_source_log_level(client_queue, source, log_level)[source]

Assumes log_level is a string corresponding to the supported logging-module levels.

rpc_show_config(client_queue, channel)[source]

Show the configuration for a channel.

rpc_show_de_config(client_queue)[source]
rpc_start_channel(client_queue, channel_name)[source]
rpc_start_channels(client_queue)[source]
rpc_status(client_queue)[source]
rpc_stop(client_queue=None)[source]
rpc_stop_channel(client_queue, channel)[source]
rpc_stop_channels(client_queue)[source]
service_actions()[source]

Called by the serve_forever() loop.

May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop.

start_channel(channel_name, src_workers)[source]
start_channels()[source]
start_webserver()[source]

Start CherryPy webserver using configured port. If port is not configured use default webserver port.

stop_channels()[source]
stop_worker(worker, timeout)[source]
class decisionengine.framework.engine.DecisionEngine.RequestHandler(request, client_address, server)[source]

Bases: SimpleXMLRPCRequestHandler

rpc_paths = ('/RPC2',)
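The base classes and rpc_paths value listed above are standard-library building blocks, so the overall shape can be illustrated with a toy server. This is only a sketch: ToyServer, the ping function, and ping_roundtrip are hypothetical, the server binds to an ephemeral localhost port, and how the real DecisionEngine exposes its rpc_* methods is not shown in this reference.

```python
import threading
import xmlrpc.client
from socketserver import ThreadingMixIn
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer


class RequestHandler(SimpleXMLRPCRequestHandler):
    # Only requests to this path are accepted.
    rpc_paths = ("/RPC2",)


class ToyServer(ThreadingMixIn, SimpleXMLRPCServer):
    """Hypothetical threaded XML-RPC server; each request gets its own thread."""

    daemon_threads = True


def ping_roundtrip():
    """Start the toy server, call its ping function once, and shut it down."""
    server = ToyServer(("127.0.0.1", 0), requestHandler=RequestHandler,
                       logRequests=False)
    server.register_function(lambda: "pong", "ping")
    host, port = server.server_address
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        with xmlrpc.client.ServerProxy(f"http://{host}:{port}/RPC2") as proxy:
            return proxy.ping()
    finally:
        server.shutdown()
        server.server_close()
        thread.join(timeout=5)
```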
class decisionengine.framework.engine.DecisionEngine.StopState(value)[source]

Bases: Enum

An enumeration.

Clean = 2
NotFound = 1
Terminated = 3
decisionengine.framework.engine.DecisionEngine._channel_preamble(name)[source]
decisionengine.framework.engine.DecisionEngine._check_metrics_env(options)[source]
decisionengine.framework.engine.DecisionEngine._create_de_server(global_config, channel_config_loader)[source]

Create the DE server with the passed global configuration and config manager

decisionengine.framework.engine.DecisionEngine._get_de_conf_manager(global_config_dir, channel_config_dir, options)[source]
decisionengine.framework.engine.DecisionEngine._get_global_config(config_file, options)[source]
decisionengine.framework.engine.DecisionEngine._initial_start_channels(server)[source]
decisionengine.framework.engine.DecisionEngine._queue_name_and_type(redis_obj, redis_member)[source]
decisionengine.framework.engine.DecisionEngine._requests_in_flight(redis_obj, exchange_name)[source]
decisionengine.framework.engine.DecisionEngine._start_de_server(server)[source]

Start the DE server and listen forever

decisionengine.framework.engine.DecisionEngine._verify_redis_server(broker_url)[source]
decisionengine.framework.engine.DecisionEngine._verify_redis_url(broker_url)[source]
decisionengine.framework.engine.DecisionEngine.main(args=None)[source]

If args is None, sys.argv is used. If args is a list, it is used instead of sys.argv (for unit testing).

decisionengine.framework.engine.DecisionEngine.parse_program_options(args=None)[source]

If args is a list, it will be used instead of sys.argv
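The args convention described above matches how argparse behaves: passing None makes it read sys.argv, while a list is parsed as given. A minimal sketch follows. parse_options_sketch and the --port option are hypothetical and do not reflect the engine's real options.

```python
import argparse


def parse_options_sketch(args=None):
    """Parse `args` if given; argparse falls back to sys.argv[1:] for None."""
    parser = argparse.ArgumentParser(prog="toy-engine")
    parser.add_argument("--port", type=int, default=8888)  # hypothetical option
    return parser.parse_args(args)
```

In a unit test one would call parse_options_sketch(["--port", "9000"]) so that the test runner's own command line is never consulted.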

decisionengine.framework.engine.SourceWorkers module

class decisionengine.framework.engine.SourceWorkers.SourceWorker(key, config, logger_config, channel_name, exchange, broker_url)[source]

Bases: Process

Provides an interface to loadable modules and events to synchronize execution.

get_loglevel()[source]
run()[source]

Get the data from source

set_loglevel_value(log_level)[source]

Assumes log_level is a string corresponding to the supported logging-module levels.

setup_logger()[source]
take_offline()[source]
class decisionengine.framework.engine.SourceWorkers.SourceWorkers(exchange, broker_url, logger=<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=('decisionengine', ))>)[source]

Bases: object

This class manages and provides access to the Source workers.

The intention is that the decision engine never directly interacts with the workers but refers to them via a context manager:

    with workers.access() as ws:
        # Access to ws now protected
        ws['new_source'] = SourceWorker(...)

In cases where the decision engine’s block_while method must be called (e.g. during tests), one should use unguarded access:

    ws = workers.get_unguarded()
    # Access to ws is unprotected
    ws['new_source'].wait_while(...)

Calling a blocking method while using the protected context manager (i.e. workers.access()) will likely result in a deadlock.

class Access(workers, lock)[source]

Bases: object

access()[source]
detach(channel_name, source_names)[source]
get_unguarded()[source]
prune(channel_name, source_names)[source]
remove_all(timeout)[source]
update(channel_name, source_configs, logger_config)[source]

decisionengine.framework.engine.de_client module

decisionengine.framework.engine.de_client.command_for_args(argsparsed, de_socket)[source]

argsparsed should be from create_parser in this file

decisionengine.framework.engine.de_client.console_scripts_main(args_to_parse=None)[source]

This is the entry point for the setuptools auto-generated scripts. Setuptools treats any value returned from this function as an error message.
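The reason a return value counts as an error is that generated console scripts pass the entry point's result to sys.exit, and sys.exit prints a string argument to stderr and exits with a non-zero status. The sketch below illustrates the usual workaround of printing the result and returning nothing. All three functions are hypothetical stand-ins and are not the module's actual code.

```python
import sys


def main(args_to_parse=None):
    """Hypothetical command runner that returns its output as a string."""
    return "status: " + " ".join(args_to_parse or ["(no args)"])


def console_scripts_main(args_to_parse=None):
    """Print the result; returning None makes sys.exit report success."""
    print(main(args_to_parse))


def exit_code_for(return_value):
    """Show what sys.exit would carry for a given entry-point return value."""
    try:
        sys.exit(return_value)
    except SystemExit as exc:
        return exc.code
```

exit_code_for(None) yields None, which the interpreter treats as success, while exit_code_for("boom") carries the string that would be printed as an error.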

decisionengine.framework.engine.de_client.create_parser()[source]
decisionengine.framework.engine.de_client.main(args_to_parse=None, logger_name='de_client')[source]

If you pass a list of args, they will be used instead of sys.argv

decisionengine.framework.engine.de_query_tool module

decisionengine.framework.engine.de_query_tool.command_for_args(argsparsed, de_socket)[source]

Calls the proper function for the arguments passed to de_query_tool.

Parameters:
  • argsparsed (Namespace) – Should be from create_parser in this file.

  • de_socket (ServerProxy) – RPC Server Proxy.

Returns:

Output of the command.

Return type:

str

decisionengine.framework.engine.de_query_tool.console_scripts_main(args_to_parse=None)[source]

This is the entry point for the setuptools auto-generated scripts. Setuptools treats any value returned from this function as an error message.

decisionengine.framework.engine.de_query_tool.create_parser()[source]
decisionengine.framework.engine.de_query_tool.main(args_to_parse=None, logger_name='de_query_tool')[source]

Main function for de_query_tool

Parameters:

args_to_parse (list, optional) – If you pass a list of args, they will be used instead of sys.argv. Defaults to None.

Returns:

Query result

Return type:

str

Module contents