Improved RabbitMQ connection handling, updated requirements and README file
Change-Id: Idf175772942a230f1affe8bcfb9a3dd2df33a55f
This commit is contained in:
parent
45da1ffa6f
commit
984a43af34
|
@ -0,0 +1,10 @@
|
|||
Murano Python Agent README
|
||||
==========================
|
||||
Murano Python Agent is a VM-side guest agent that accepts commands from
|
||||
Murano Conductor and executes them. Python Agent works on Linux and
|
||||
uses new execution plan format described in `Unified Agent blueprint
|
||||
<https://wiki.openstack.org/wiki/Murano/UnifiedAgent>`_
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
* `Murano <https://wiki.openstack.org/wiki/Murano>`__
|
|
@ -57,14 +57,15 @@ class MuranoAgent(service.Service):
|
|||
|
||||
def start(self):
|
||||
self._load()
|
||||
msg_iterator = self._wait_plan()
|
||||
while True:
|
||||
try:
|
||||
self._loop_func()
|
||||
self._loop_func(msg_iterator)
|
||||
except Exception as ex:
|
||||
log.exception(ex)
|
||||
sleep(5)
|
||||
|
||||
def _loop_func(self):
|
||||
def _loop_func(self, msg_iterator):
|
||||
result, timestamp = self._queue.get_execution_plan_result()
|
||||
if result is not None:
|
||||
if self._send_result(result):
|
||||
|
@ -76,7 +77,7 @@ class MuranoAgent(service.Service):
|
|||
self._run(plan)
|
||||
return
|
||||
|
||||
self._wait_plan()
|
||||
msg_iterator.next()
|
||||
|
||||
def _run(self, plan):
|
||||
with ExecutionPlanRunner(plan) as runner:
|
||||
|
@ -112,26 +113,45 @@ class MuranoAgent(service.Service):
|
|||
return MqClient(**connection_params)
|
||||
|
||||
def _wait_plan(self):
|
||||
with self._create_rmq_client() as mq:
|
||||
with mq.open(CONF.rabbitmq.input_queue,
|
||||
prefetch_count=1) as subscription:
|
||||
msg = subscription.get_message(timeout=5)
|
||||
if msg is not None and isinstance(msg.body, dict):
|
||||
if 'ID' not in msg.body and msg.id:
|
||||
msg.body['ID'] = msg.id
|
||||
err = self._verify_plan(msg.body)
|
||||
if err is None:
|
||||
self._queue.put_execution_plan(msg.body)
|
||||
else:
|
||||
try:
|
||||
execution_result = ExecutionResult.from_error(
|
||||
err, Bunch(msg.body))
|
||||
self._send_result(execution_result)
|
||||
except ValueError:
|
||||
log.warn('Execution result is not produced')
|
||||
delay = 5
|
||||
while True:
|
||||
try:
|
||||
with self._create_rmq_client() as mq:
|
||||
with mq.open(CONF.rabbitmq.input_queue,
|
||||
prefetch_count=1) as subscription:
|
||||
while True:
|
||||
msg = subscription.get_message(timeout=5)
|
||||
if msg is not None and isinstance(msg.body, dict):
|
||||
self._handle_message(msg)
|
||||
|
||||
if msg is not None:
|
||||
msg.ack()
|
||||
yield
|
||||
delay = 5
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except Exception:
|
||||
log.warn('Communication error', exc_info=True)
|
||||
sleep(delay)
|
||||
delay = min(delay * 1.2, 60)
|
||||
|
||||
def _handle_message(self, msg):
|
||||
print msg.body
|
||||
if 'ID' not in msg.body and msg.id:
|
||||
msg.body['ID'] = msg.id
|
||||
err = self._verify_plan(msg.body)
|
||||
if err is None:
|
||||
self._queue.put_execution_plan(msg.body)
|
||||
else:
|
||||
try:
|
||||
execution_result = \
|
||||
ExecutionResult.from_error(
|
||||
err, Bunch(msg.body))
|
||||
|
||||
self._send_result(execution_result)
|
||||
except ValueError:
|
||||
log.warn('Execution result is not produced')
|
||||
|
||||
if msg is not None:
|
||||
msg.ack()
|
||||
|
||||
def _verify_plan(self, plan):
|
||||
plan_format_version = plan.get('FormatVersion', '1.0.0')
|
||||
|
|
|
@ -2,4 +2,5 @@ pbr>=0.5.21,<1.0
|
|||
semver
|
||||
bunch
|
||||
oslo.config>=1.2.0
|
||||
murano-common>=0.2.11
|
||||
http://tarballs.openstack.org/murano-common/murano-common-release-0.3.tar.gz#egg=murano-common-release-0.3
|
||||
iso8601
|
||||
|
|
Loading…
Reference in New Issue