diff --git a/muranoagent/app.py b/muranoagent/app.py index 392e8d0a..c0960bf2 100644 --- a/muranoagent/app.py +++ b/muranoagent/app.py @@ -101,8 +101,11 @@ class MuranoAgent(service.Service): msg = messaging.Message() msg.body = result msg.id = result.get('SourceID') + routing_key = CONF.rabbitmq.result_routing_key + if ('ReplyTo' in result) and CONF.enable_dynamic_result_queue: + routing_key = result.pop('ReplyTo') mq.send(message=msg, - key=CONF.rabbitmq.result_routing_key, + key=routing_key, exchange=CONF.rabbitmq.result_exchange) return True @@ -145,6 +148,8 @@ class MuranoAgent(service.Service): def _handle_message(self, msg): if 'ID' not in msg.body and msg.id: msg.body['ID'] = msg.id + if 'ReplyTo' not in msg.body and msg.reply_to: + msg.body['ReplyTo'] = msg.reply_to try: self._verify_plan(msg.body) self._queue.put_execution_plan(msg.body) @@ -152,6 +157,9 @@ class MuranoAgent(service.Service): try: execution_result = ex_result.ExecutionResult.from_error( err, bunch.Bunch(msg.body)) + if ('ReplyTo' in msg.body) and \ + CONF.enable_dynamic_result_queue: + execution_result['ReplyTo'] = msg.body.get('ReplyTo') self._send_result(execution_result) except ValueError: diff --git a/muranoagent/common/config.py b/muranoagent/common/config.py index 8b324e71..5c3b4e3f 100644 --- a/muranoagent/common/config.py +++ b/muranoagent/common/config.py @@ -30,6 +30,12 @@ storage_opt = [ help='Directory to store execution plans') ] +message_routing_opt = [ + cfg.BoolOpt('enable_dynamic_result_queue', help='Enable taking dynamic ' + 'result queue from task field reply_to', + default=False) +] + rabbit_opts = [ cfg.StrOpt('host', help='The RabbitMQ broker address which used for communication ' @@ -64,6 +70,7 @@ rabbit_opts = [ ] CONF.register_cli_opts(storage_opt) +CONF.register_cli_opts(message_routing_opt) CONF.register_opts(rabbit_opts, group='rabbitmq') logging.register_options(CONF) diff --git a/muranoagent/common/messaging/message.py b/muranoagent/common/messaging/message.py index 47564a86..131479b5 100644 --- a/muranoagent/common/messaging/message.py +++ b/muranoagent/common/messaging/message.py @@ -26,8 +26,10 @@ class Message(object): self._message_handle = message_handle if message_handle: self.id = message_handle.properties.get('message_id') + self._reply_to = message_handle.properties.get('reply_to') else: self.id = None + self._reply_to = None try: if message_handle: @@ -54,5 +56,9 @@ class Message(object): def id(self, value): self._id = value or '' + @property + def reply_to(self): + return self._reply_to + def ack(self): self._message_handle.ack() diff --git a/muranoagent/execution_plan_queue.py b/muranoagent/execution_plan_queue.py index 26577ba8..2be1d98a 100644 --- a/muranoagent/execution_plan_queue.py +++ b/muranoagent/execution_plan_queue.py @@ -73,6 +73,8 @@ class ExecutionPlanQueue(object): def put_execution_result(self, result, execution_plan): timestamp = execution_plan['_timestamp'] + if 'ReplyTo' in execution_plan: + result['ReplyTo'] = execution_plan.get('ReplyTo') path = os.path.join( self._plans_folder, timestamp, ExecutionPlanQueue.result_filename)