From 332078e3b7bc9c4e0f532474c9721a9631fb62e9 Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Sat, 27 Jul 2013 00:10:55 +0400 Subject: [PATCH] Updated to muranocommon 0.2 with SSL support Change-Id: Ibf4ec63332ab1073bda7d1a1cd102cf514284188 --- MANIFEST.in | 1 + doc/source/index.rst | 3 + etc/murano-api.conf | 6 + muranoapi/common/config.py | 3 +- muranoapi/common/service.py | 152 +++++++++++++++----------- muranoapi/db/services/environments.py | 2 +- muranoapi/db/services/sessions.py | 2 +- requirements.txt | 2 +- 8 files changed, 104 insertions(+), 67 deletions(-) diff --git a/MANIFEST.in b/MANIFEST.in index 3b963b2d7..d54488f41 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,4 @@ +include requirements.txt include tools/pip-requires include run_tests.sh include ChangeLog diff --git a/doc/source/index.rst b/doc/source/index.rst index 143df3e59..78717f7a0 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -75,6 +75,9 @@ Configure virtual_host = murano login = murano password = murano + ssl = False + ca_certs = + 3. Open second configuration file for editing:: diff --git a/etc/murano-api.conf b/etc/murano-api.conf index 76d13fac3..a8c5e7af5 100644 --- a/etc/murano-api.conf +++ b/etc/murano-api.conf @@ -38,6 +38,12 @@ host = localhost # RabbitMQ port (5672 is a default) port = 5672 +# Use SSL for RabbitMQ connections (True or False) +ssl = False + +# Path to SSL CA certificate or empty to allow self signed server certificate +ca_certs = + # RabbitMQ credentials. Fresh RabbitMQ installation has "guest" account with "guest" password # It is recommended to create dedicated user account for Murano using RabbitMQ web console or command line utility login = guest diff --git a/muranoapi/common/config.py b/muranoapi/common/config.py index 05fffced7..375124f85 100644 --- a/muranoapi/common/config.py +++ b/muranoapi/common/config.py @@ -52,10 +52,11 @@ reports_opts = [ rabbit_opts = [ cfg.StrOpt('host', default='localhost'), cfg.IntOpt('port', default=5672), - cfg.BoolOpt('use_ssl', default=False), cfg.StrOpt('login', default='guest'), cfg.StrOpt('password', default='guest'), cfg.StrOpt('virtual_host', default='/'), + cfg.BoolOpt('ssl', default=False), + cfg.StrOpt('ca_certs', default='') ] db_opts = [ diff --git a/muranoapi/common/service.py b/muranoapi/common/service.py index 0118ca166..d1645efec 100644 --- a/muranoapi/common/service.py +++ b/muranoapi/common/service.py @@ -17,106 +17,132 @@ from muranoapi.db.models import Status, Session, Environment, Deployment from muranoapi.db.session import get_session from muranoapi.openstack.common import log as logging, timeutils, service from muranoapi.common import config -from muranocommon.mq import MqClient +from muranocommon.messaging import MqClient from sqlalchemy import desc +import eventlet conf = config.CONF.reports -rabbitmq = config.CONF.rabbitmq log = logging.getLogger(__name__) class TaskResultHandlerService(service.Service): - connection_params = { - 'login': rabbitmq.login, - 'password': rabbitmq.password, - 'host': rabbitmq.host, - 'port': rabbitmq.port, - 'virtual_host': rabbitmq.virtual_host - } - def __init__(self): super(TaskResultHandlerService, self).__init__() def start(self): super(TaskResultHandlerService, self).start() - self.tg.add_thread(self._start_rabbitmq) + self.tg.add_thread(self._handle_results) + self.tg.add_thread(self._handle_reports) def stop(self): super(TaskResultHandlerService, self).stop() - def _start_rabbitmq(self): + def _create_mq_client(self): + rabbitmq = config.CONF.rabbitmq + connection_params = { + 'login': rabbitmq.login, + 'password': rabbitmq.password, + 'host': rabbitmq.host, + 'port': rabbitmq.port, + 'virtual_host': rabbitmq.virtual_host, + 'ssl': rabbitmq.ssl, + 'ca_certs': rabbitmq.ca_certs.strip() or None + } + return MqClient(**connection_params) + + def _handle_results(self): while True: try: - with MqClient(**self.connection_params) as mqClient: + with self._create_mq_client() as mqClient: mqClient.declare(conf.results_exchange, conf.results_queue) - mqClient.declare(conf.reports_exchange, conf.reports_queue) with mqClient.open(conf.results_queue) as results_sb: - with mqClient.open(conf.reports_queue) as reports_sb: - while True: - report = reports_sb.get_message(timeout=1000) - self.tg.add_thread(handle_report, report.body) - result = results_sb.get_message(timeout=1000) - self.tg.add_thread(handle_result, result.body) + while True: + result = results_sb.get_message() + eventlet.spawn(handle_result, result) + except Exception as ex: + log.exception(ex) + + def _handle_reports(self): + while True: + try: + with self._create_mq_client() as mqClient: + mqClient.declare(conf.reports_exchange, conf.reports_queue) + with mqClient.open(conf.reports_queue) as reports_sb: + while True: + report = reports_sb.get_message() + eventlet.spawn(handle_report, report) except Exception as ex: log.exception(ex) @handle -def handle_result(environment_result): - log.debug(_('Got result message from ' - 'orchestration engine:\n{0}'.format(environment_result))) +def handle_result(message): + try: + environment_result = message.body + log.debug(_('Got result message from ' + 'orchestration engine:\n{0}'.format(environment_result))) - if 'deleted' in environment_result: - log.debug(_('Result for environment {0} is dropped. Environment ' - 'is deleted'.format(environment_result['id']))) - return + if 'deleted' in environment_result: + log.debug(_('Result for environment {0} is dropped. Environment ' + 'is deleted'.format(environment_result['id']))) + return - session = get_session() - environment = session.query(Environment).get(environment_result['id']) + session = get_session() + environment = session.query(Environment).get(environment_result['id']) - if not environment: - log.warning(_('Environment result could not be handled, specified ' - 'environment does not found in database')) - return + if not environment: + log.warning(_('Environment result could not be handled, specified ' + 'environment does not found in database')) + return - environment.description = environment_result - environment.version += 1 - environment.save(session) + environment.description = environment_result + environment.version += 1 + environment.save(session) - #close session - conf_session = session.query(Session).filter_by( - **{'environment_id': environment.id, 'state': 'deploying'}).first() - conf_session.state = 'deployed' - conf_session.save(session) + #close session + conf_session = session.query(Session).filter_by( + **{'environment_id': environment.id, 'state': 'deploying'}).first() + conf_session.state = 'deployed' + conf_session.save(session) - #close deployment - deployment = get_last_deployment(session, environment.id) - deployment.finished = timeutils.utcnow() - status = Status() - status.deployment_id = deployment.id - status.text = "Deployment finished" - deployment.statuses.append(status) - deployment.save(session) + #close deployment + deployment = get_last_deployment(session, environment.id) + deployment.finished = timeutils.utcnow() + status = Status() + status.deployment_id = deployment.id + status.text = "Deployment finished" + deployment.statuses.append(status) + deployment.save(session) + except Exception as ex: + log.exception(ex) + finally: + message.ack() @handle -def handle_report(report): - log.debug(_('Got report message from orchestration ' - 'engine:\n{0}'.format(report))) +def handle_report(message): + try: + report = message.body + log.debug(_('Got report message from orchestration ' + 'engine:\n{0}'.format(report))) - report['entity_id'] = report['id'] - del report['id'] + report['entity_id'] = report['id'] + del report['id'] - status = Status() - status.update(report) + status = Status() + status.update(report) - session = get_session() - #connect with deployment - with session.begin(): - running_deployment = get_last_deployment(session, - status.environment_id) - status.deployment_id = running_deployment.id - session.add(status) + session = get_session() + #connect with deployment + with session.begin(): + running_deployment = get_last_deployment(session, + status.environment_id) + status.deployment_id = running_deployment.id + session.add(status) + except Exception as ex: + log.exception(ex) + finally: + message.ack() def get_last_deployment(session, env_id): diff --git a/muranoapi/db/services/environments.py b/muranoapi/db/services/environments.py index c20f2d688..942b09b65 100644 --- a/muranoapi/db/services/environments.py +++ b/muranoapi/db/services/environments.py @@ -19,7 +19,7 @@ from muranoapi.common import config from muranoapi.db.models import Session, Environment from muranoapi.db.services.sessions import SessionServices, SessionState from muranoapi.db.session import get_session -from muranocommon.mq import MqClient, Message +from muranocommon.messaging import MqClient, Message rabbitmq = config.CONF.rabbitmq diff --git a/muranoapi/db/services/sessions.py b/muranoapi/db/services/sessions.py index 2999532f8..742ffc855 100644 --- a/muranoapi/db/services/sessions.py +++ b/muranoapi/db/services/sessions.py @@ -16,7 +16,7 @@ from collections import namedtuple from muranoapi.common import config from muranoapi.db.models import Session, Environment, Deployment, Status from muranoapi.db.session import get_session -from muranocommon.mq import MqClient, Message +from muranocommon.messaging import MqClient, Message rabbitmq = config.CONF.rabbitmq diff --git a/requirements.txt b/requirements.txt index e887e969a..1b4acfc55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,4 +31,4 @@ passlib jsonschema==2.0.0 python-keystoneclient>=0.2.0 oslo.config -http://github.com/sergmelikyan/murano-common/releases/download/0.1/muranocommon-0.1.tar.gz#egg=muranocommon-0.1 \ No newline at end of file +http://github.com/sergmelikyan/murano-common/releases/download/0.2.1/muranocommon-0.2.1.tar.gz#egg=muranocommon-0.2.1 \ No newline at end of file