decisionengine.framework.engine package
Subpackages
- decisionengine.framework.engine.tests package
- Submodules
- decisionengine.framework.engine.tests.conftest module
- decisionengine.framework.engine.tests.fixtures module
- decisionengine.framework.engine.tests.test_ChannelWorkers module
- decisionengine.framework.engine.tests.test_SourceWorkers module
- decisionengine.framework.engine.tests.test_client_only module
- decisionengine.framework.engine.tests.test_query_tool_only module
- decisionengine.framework.engine.tests.test_startup module
- decisionengine.framework.engine.tests.test_verify_redis_server module
- Module contents
Submodules
decisionengine.framework.engine.ChannelWorkers module
- class decisionengine.framework.engine.ChannelWorkers.ChannelWorker(task_manager, logger_config)[source]
Bases: Process
Class that encapsulates a channel’s task manager as a separate process.
This class's run method is called whenever the process is started. If the process is abruptly terminated (e.g. the run method is preempted by a signal or an os._exit(n) call), the ChannelWorker object will still exist even if the operating-system process no longer does.
To determine the exit code of this process, use the ChannelWorker.exitcode value, provided by the multiprocessing.Process base class.
- class decisionengine.framework.engine.ChannelWorkers.ChannelWorkers[source]
Bases: object
This class manages and provides access to the task-manager workers.
The intention is that the decision engine never directly interacts with the workers but refers to them via a context manager:
    with workers.access() as ws:
        # Access to ws now protected
        ws['new_channel'] = ChannelWorker(…)
In cases where the decision engine’s block_while method must be called (e.g. during tests), one should use unguarded access:
    ws = workers.get_unguarded()
    # Access to ws is unprotected
    ws['new_channel'].wait_while(…)
Calling a blocking method while using the protected context manager (i.e. workers.access()) will likely result in a deadlock.
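To illustrate why blocking inside the guarded context is risky, here is a minimal sketch of the guarded/unguarded pattern. It assumes the protection is a simple non-reentrant lock; GuardedWorkers is a hypothetical stand-in and is not the actual ChannelWorkers class.

```python
import threading
from contextlib import contextmanager


class GuardedWorkers:
    """Hypothetical stand-in: a dict of workers protected by a lock."""

    def __init__(self):
        self._workers = {}
        self._lock = threading.Lock()

    @contextmanager
    def access(self):
        # The lock is held for the whole body of the `with` block.
        with self._lock:
            yield self._workers

    def get_unguarded(self):
        # No lock is taken; the caller is responsible for any races.
        return self._workers


workers = GuardedWorkers()

with workers.access() as ws:
    ws["new_channel"] = "worker placeholder"
    # Anything else that needs the lock cannot proceed while we are here.
    # A blocking call that waits on such code would therefore never return.
    lock_is_free_inside = workers._lock.acquire(blocking=False)

# Outside the block the lock has been released again.
unguarded = workers.get_unguarded()
```

In this sketch lock_is_free_inside is False, which is the condition that turns a blocking wait inside workers.access() into a deadlock.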
decisionengine.framework.engine.ClientMessageReceiver module
decisionengine.framework.engine.DecisionEngine module
Main loop for Decision Engine.
The DECISION_ENGINE_CONFIG_FILE environment variable points to the decision engine configuration file.
If this environment variable is not defined, the DE-Config.py file from the ../tests/etc/ directory is used.
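The lookup described above amounts to an environment read with a fallback. A minimal sketch follows; resolve_config_file is a hypothetical helper, and the directory that ../tests/etc/ is relative to is not stated in this page, so the fallback path is left relative.

```python
import os

ENV_VAR = "DECISION_ENGINE_CONFIG_FILE"
# Assumed fallback, taken from the description above. What the path is
# relative to is not specified there.
FALLBACK = os.path.join("..", "tests", "etc", "DE-Config.py")


def resolve_config_file(environ=None):
    """Return the configured path, or the fallback when the variable is unset."""
    environ = os.environ if environ is None else environ
    return environ.get(ENV_VAR, FALLBACK)


explicit = resolve_config_file({ENV_VAR: "/etc/decisionengine/my-config.py"})
default = resolve_config_file({})
```

Passing the environment as an argument is only a convenience for testing the sketch; the module itself reads the process environment.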
- class decisionengine.framework.engine.DecisionEngine.DecisionEngine(global_config, channel_config_loader, server_address)[source]
Bases: ThreadingMixIn, SimpleXMLRPCServer
- _dispatch(method, params)[source]
Dispatches the XML-RPC method.
XML-RPC calls are forwarded to a registered function that matches the called XML-RPC method name. If no such function exists then the call is forwarded to the registered instance, if available.
If the registered instance has a _dispatch method, then that method will be called with the name of the XML-RPC method and its parameters as a tuple, e.g. instance._dispatch('add', (2, 3)).
If the registered instance does not have a _dispatch method then the instance will be searched to find a matching method and, if found, will be called.
Methods beginning with an ‘_’ are considered private and will not be called.
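The dispatch rules above are those of the standard library's XML-RPC dispatcher, and can be tried without starting a server. This sketch uses xmlrpc.server.SimpleXMLRPCDispatcher directly; the Calculator class and add function are hypothetical, and DecisionEngine's own _dispatch may add behaviour that is not shown here.

```python
from xmlrpc.server import SimpleXMLRPCDispatcher


class Calculator:
    """Hypothetical registered instance without its own _dispatch method."""

    def multiply(self, x, y):
        return x * y

    def _secret(self):
        return "hidden"


def add(x, y):
    return x + y


dispatcher = SimpleXMLRPCDispatcher(allow_none=True)
dispatcher.register_function(add)           # matched by function name first
dispatcher.register_instance(Calculator())  # searched when no function matches

sum_result = dispatcher._dispatch("add", (2, 3))
product_result = dispatcher._dispatch("multiply", (2, 3))

# Names beginning with an underscore are treated as private and rejected.
try:
    dispatcher._dispatch("_secret", ())
    private_call_rejected = False
except Exception:
    private_call_rejected = True
```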
- rpc_print_product(client_queue, product, columns=None, query=None, types=False, format=None)[source]
- rpc_reaper_start(client_queue, delay=0)[source]
Start the reaper process after delay seconds (default: 0).
- Parameters:
delay (int) – Number of seconds to wait before starting the reaper.
- rpc_set_channel_log_level(client_queue, channel, log_level)[source]
Assumes log_level is a string corresponding to the supported logging-module levels.
- rpc_set_source_log_level(client_queue, source, log_level)[source]
Assumes log_level is a string corresponding to the supported logging-module levels.
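Both log-level methods take the level as a string. One way such a string could be mapped to a logging-module level is sketched below; parse_log_level is a hypothetical helper, and the case normalization is an assumption rather than documented behaviour of these RPC methods.

```python
import logging


def parse_log_level(log_level):
    """Map a level name such as 'DEBUG' to the logging module's numeric level."""
    level = getattr(logging, log_level.upper(), None)
    if not isinstance(level, int):
        raise ValueError(f"unsupported log level: {log_level!r}")
    return level


debug_level = parse_log_level("DEBUG")
warning_level = parse_log_level("warning")
```

The numeric result can be passed to Logger.setLevel; unknown names raise ValueError instead of silently falling back to a default.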
- service_actions()[source]
Called by the serve_forever() loop.
May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop.
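As an illustration of the service_actions hook, the sketch below overrides it on a plain socketserver.TCPServer and counts loop iterations. CountingServer and run_briefly are hypothetical names, the server binds to an ephemeral loopback port, and nothing here reflects what DecisionEngine actually does in its override.

```python
import socketserver
import threading


class CountingServer(socketserver.TCPServer):
    """Hypothetical server that records each pass through serve_forever()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ticks = 0
        self.ticked = threading.Event()

    def service_actions(self):
        # Called by serve_forever() once per loop iteration.
        self.ticks += 1
        self.ticked.set()


def run_briefly():
    """Serve until the hook has run at least once, then shut down."""
    address = ("127.0.0.1", 0)  # port 0: let the OS pick a free port
    with CountingServer(address, socketserver.BaseRequestHandler) as server:
        thread = threading.Thread(
            target=server.serve_forever, kwargs={"poll_interval": 0.05}
        )
        thread.start()
        server.ticked.wait(timeout=5)
        server.shutdown()
        thread.join()
        return server.ticks


tick_count = run_briefly()
```

The hook runs on the thread that called serve_forever(), so long-running work placed there delays request handling.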
- class decisionengine.framework.engine.DecisionEngine.RequestHandler(request, client_address, server)[source]
Bases: SimpleXMLRPCRequestHandler
- rpc_paths = ('/RPC2',)
- class decisionengine.framework.engine.DecisionEngine.StopState(value)[source]
Bases: Enum
An enumeration.
- Clean = 2
- NotFound = 1
- Terminated = 3
- decisionengine.framework.engine.DecisionEngine._create_de_server(global_config, channel_config_loader)[source]
Create the DE server with the passed global configuration and config manager
- decisionengine.framework.engine.DecisionEngine._get_de_conf_manager(global_config_dir, channel_config_dir, options)[source]
- decisionengine.framework.engine.DecisionEngine._queue_name_and_type(redis_obj, redis_member)[source]
- decisionengine.framework.engine.DecisionEngine._requests_in_flight(redis_obj, exchange_name)[source]
- decisionengine.framework.engine.DecisionEngine._start_de_server(server)[source]
Start the DE server and listen forever
decisionengine.framework.engine.SourceWorkers module
- class decisionengine.framework.engine.SourceWorkers.SourceWorker(key, config, logger_config, channel_name, exchange, broker_url)[source]
Bases: Process
Provides an interface to loadable modules and events to synchronize execution.
- class decisionengine.framework.engine.SourceWorkers.SourceWorkers(exchange, broker_url, logger=<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=('decisionengine', ))>)[source]
Bases: object
This class manages and provides access to the Source workers.
The intention is that the decision engine never directly interacts with the workers but refers to them via a context manager:
    with workers.access() as ws:
        # Access to ws now protected
        ws['new_source'] = SourceWorker(…)
In cases where the decision engine’s block_while method must be called (e.g. during tests), one should use unguarded access:
    ws = workers.get_unguarded()
    # Access to ws is unprotected
    ws['new_source'].wait_while(…)
Calling a blocking method while using the protected context manager (i.e. workers.access()) will likely result in a deadlock.
decisionengine.framework.engine.de_client module
- decisionengine.framework.engine.de_client.command_for_args(argsparsed, de_socket)[source]
argsparsed should be the result of parsing with the parser returned by create_parser in this file.
decisionengine.framework.engine.de_query_tool module
- decisionengine.framework.engine.de_query_tool.command_for_args(argsparsed, de_socket)[source]
Calls the proper function for the arguments passed to de_query_tool.
- Parameters:
argsparsed (Namespace) – Should be from create_parser in this file.
de_socket (ServerProxy) – RPC Server Proxy.
- Returns:
Output of the command.
- Return type:
str
- decisionengine.framework.engine.de_query_tool.console_scripts_main(args_to_parse=None)[source]
This is the entry point for the setuptools auto-generated scripts. Setuptools treats any value returned from this function as an error message.
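The reason a return value matters is that generated console scripts pass the entry point's return value to sys.exit, and sys.exit prints any non-integer, non-None value to stderr and exits with status 1. A minimal sketch of the resulting pattern follows; run_query is a hypothetical stand-in for main, and wrapper_exit_code only mimics the generated wrapper.

```python
import sys


def run_query(args_to_parse=None):
    """Hypothetical stand-in for main(); returns the query result as a string."""
    return "query result"


def console_scripts_main(args_to_parse=None):
    # Print the result rather than returning it, so that the wrapper's
    # sys.exit() call receives None and the script exits successfully.
    print(run_query(args_to_parse))


def wrapper_exit_code(entry_point):
    """Mimic a generated script: sys.exit(entry_point()), capturing the code."""
    try:
        sys.exit(entry_point())
    except SystemExit as exc:
        return exc.code


code_when_printing = wrapper_exit_code(console_scripts_main)
code_when_returning = wrapper_exit_code(run_query)
```

Here code_when_printing is None (a successful exit), while code_when_returning carries the result string, which a real script would report as an error.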
- decisionengine.framework.engine.de_query_tool.main(args_to_parse=None, logger_name='de_query_tool')[source]
Main function for de_query_tool
- Parameters:
args_to_parse (list, optional) – If you pass a list of args, they will be used instead of sys.argv. Defaults to None.
- Returns:
Query result
- Return type:
str
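The args_to_parse parameter follows the usual argparse convention: passing None makes the parser read sys.argv, while passing a list overrides it. The sketch below shows that convention only; the parser options (product, --format) and the prog name are hypothetical and do not describe the real create_parser.

```python
import argparse


def create_parser():
    """Hypothetical parser; the real create_parser options are not shown here."""
    parser = argparse.ArgumentParser(prog="demo_query_tool")
    parser.add_argument("product")
    parser.add_argument("--format", default="plain")
    return parser


def main(args_to_parse=None):
    # parse_args(None) falls back to sys.argv[1:]; an explicit list replaces it.
    args = create_parser().parse_args(args_to_parse)
    return f"{args.product} ({args.format})"


result = main(["my_product", "--format", "csv"])
```

Accepting an explicit list makes the function easy to call from tests without patching sys.argv.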