Improved RabbitMQ connection handling, updated requirements and README file

Change-Id: Idf175772942a230f1affe8bcfb9a3dd2df33a55f
This commit is contained in:
Stan Lagun 2013-11-19 16:46:19 +04:00
parent 45da1ffa6f
commit 984a43af34
3 changed files with 54 additions and 23 deletions

10
python-agent/README.rst Normal file
View File

@ -0,0 +1,10 @@
Murano Python Agent README
==========================
Murano Python Agent is a VM-side guest agent that accepts commands from
Murano Conductor and executes them. Python Agent works on Linux and
uses new execution plan format described in `Unified Agent blueprint
<https://wiki.openstack.org/wiki/Murano/UnifiedAgent>`_
SEE ALSO
--------
* `Murano <https://wiki.openstack.org/wiki/Murano>`__

View File

@ -57,14 +57,15 @@ class MuranoAgent(service.Service):
def start(self):
self._load()
msg_iterator = self._wait_plan()
while True:
try:
self._loop_func()
self._loop_func(msg_iterator)
except Exception as ex:
log.exception(ex)
sleep(5)
def _loop_func(self):
def _loop_func(self, msg_iterator):
result, timestamp = self._queue.get_execution_plan_result()
if result is not None:
if self._send_result(result):
@ -76,7 +77,7 @@ class MuranoAgent(service.Service):
self._run(plan)
return
self._wait_plan()
msg_iterator.next()
def _run(self, plan):
with ExecutionPlanRunner(plan) as runner:
@ -112,26 +113,45 @@ class MuranoAgent(service.Service):
return MqClient(**connection_params)
def _wait_plan(self):
with self._create_rmq_client() as mq:
with mq.open(CONF.rabbitmq.input_queue,
prefetch_count=1) as subscription:
msg = subscription.get_message(timeout=5)
if msg is not None and isinstance(msg.body, dict):
if 'ID' not in msg.body and msg.id:
msg.body['ID'] = msg.id
err = self._verify_plan(msg.body)
if err is None:
self._queue.put_execution_plan(msg.body)
else:
try:
execution_result = ExecutionResult.from_error(
err, Bunch(msg.body))
self._send_result(execution_result)
except ValueError:
log.warn('Execution result is not produced')
delay = 5
while True:
try:
with self._create_rmq_client() as mq:
with mq.open(CONF.rabbitmq.input_queue,
prefetch_count=1) as subscription:
while True:
msg = subscription.get_message(timeout=5)
if msg is not None and isinstance(msg.body, dict):
self._handle_message(msg)
if msg is not None:
msg.ack()
yield
delay = 5
except KeyboardInterrupt:
break
except Exception:
log.warn('Communication error', exc_info=True)
sleep(delay)
delay = min(delay * 1.2, 60)
def _handle_message(self, msg):
print msg.body
if 'ID' not in msg.body and msg.id:
msg.body['ID'] = msg.id
err = self._verify_plan(msg.body)
if err is None:
self._queue.put_execution_plan(msg.body)
else:
try:
execution_result = \
ExecutionResult.from_error(
err, Bunch(msg.body))
self._send_result(execution_result)
except ValueError:
log.warn('Execution result is not produced')
if msg is not None:
msg.ack()
def _verify_plan(self, plan):
plan_format_version = plan.get('FormatVersion', '1.0.0')

View File

@ -2,4 +2,5 @@ pbr>=0.5.21,<1.0
semver
bunch
oslo.config>=1.2.0
murano-common>=0.2.11
http://tarballs.openstack.org/murano-common/murano-common-release-0.3.tar.gz#egg=murano-common-release-0.3
iso8601