Updated to muranocommon 0.2 with SSL support

Change-Id: Ibf4ec63332ab1073bda7d1a1cd102cf514284188
This commit is contained in:
Stan Lagun 2013-07-27 00:10:55 +04:00
parent 311c44e4f6
commit 332078e3b7
8 changed files with 104 additions and 67 deletions

View File

@ -1,3 +1,4 @@
include requirements.txt
include tools/pip-requires
include run_tests.sh
include ChangeLog

View File

@ -75,6 +75,9 @@ Configure
virtual_host = murano
login = murano
password = murano
ssl = False
ca_certs =
3. Open second configuration file for editing::

View File

@ -38,6 +38,12 @@ host = localhost
# RabbitMQ port (5672 is a default)
port = 5672
# Use SSL for RabbitMQ connections (True or False)
ssl = False
# Path to SSL CA certificate or empty to allow self signed server certificate
ca_certs =
# RabbitMQ credentials. Fresh RabbitMQ installation has "guest" account with "guest" password
# It is recommended to create dedicated user account for Murano using RabbitMQ web console or command line utility
login = guest

View File

@ -52,10 +52,11 @@ reports_opts = [
rabbit_opts = [
cfg.StrOpt('host', default='localhost'),
cfg.IntOpt('port', default=5672),
cfg.BoolOpt('use_ssl', default=False),
cfg.StrOpt('login', default='guest'),
cfg.StrOpt('password', default='guest'),
cfg.StrOpt('virtual_host', default='/'),
cfg.BoolOpt('ssl', default=False),
cfg.StrOpt('ca_certs', default='')
]
db_opts = [

View File

@ -17,106 +17,132 @@ from muranoapi.db.models import Status, Session, Environment, Deployment
from muranoapi.db.session import get_session
from muranoapi.openstack.common import log as logging, timeutils, service
from muranoapi.common import config
from muranocommon.mq import MqClient
from muranocommon.messaging import MqClient
from sqlalchemy import desc
import eventlet
conf = config.CONF.reports
rabbitmq = config.CONF.rabbitmq
log = logging.getLogger(__name__)
class TaskResultHandlerService(service.Service):
connection_params = {
'login': rabbitmq.login,
'password': rabbitmq.password,
'host': rabbitmq.host,
'port': rabbitmq.port,
'virtual_host': rabbitmq.virtual_host
}
def __init__(self):
super(TaskResultHandlerService, self).__init__()
def start(self):
super(TaskResultHandlerService, self).start()
self.tg.add_thread(self._start_rabbitmq)
self.tg.add_thread(self._handle_results)
self.tg.add_thread(self._handle_reports)
def stop(self):
super(TaskResultHandlerService, self).stop()
def _start_rabbitmq(self):
def _create_mq_client(self):
rabbitmq = config.CONF.rabbitmq
connection_params = {
'login': rabbitmq.login,
'password': rabbitmq.password,
'host': rabbitmq.host,
'port': rabbitmq.port,
'virtual_host': rabbitmq.virtual_host,
'ssl': rabbitmq.ssl,
'ca_certs': rabbitmq.ca_certs.strip() or None
}
return MqClient(**connection_params)
def _handle_results(self):
while True:
try:
with MqClient(**self.connection_params) as mqClient:
with self._create_mq_client() as mqClient:
mqClient.declare(conf.results_exchange, conf.results_queue)
mqClient.declare(conf.reports_exchange, conf.reports_queue)
with mqClient.open(conf.results_queue) as results_sb:
with mqClient.open(conf.reports_queue) as reports_sb:
while True:
report = reports_sb.get_message(timeout=1000)
self.tg.add_thread(handle_report, report.body)
result = results_sb.get_message(timeout=1000)
self.tg.add_thread(handle_result, result.body)
while True:
result = results_sb.get_message()
eventlet.spawn(handle_result, result)
except Exception as ex:
log.exception(ex)
def _handle_reports(self):
while True:
try:
with self._create_mq_client() as mqClient:
mqClient.declare(conf.reports_exchange, conf.reports_queue)
with mqClient.open(conf.reports_queue) as reports_sb:
while True:
report = reports_sb.get_message()
eventlet.spawn(handle_report, report)
except Exception as ex:
log.exception(ex)
@handle
def handle_result(environment_result):
log.debug(_('Got result message from '
'orchestration engine:\n{0}'.format(environment_result)))
def handle_result(message):
try:
environment_result = message.body
log.debug(_('Got result message from '
'orchestration engine:\n{0}'.format(environment_result)))
if 'deleted' in environment_result:
log.debug(_('Result for environment {0} is dropped. Environment '
'is deleted'.format(environment_result['id'])))
return
if 'deleted' in environment_result:
log.debug(_('Result for environment {0} is dropped. Environment '
'is deleted'.format(environment_result['id'])))
return
session = get_session()
environment = session.query(Environment).get(environment_result['id'])
session = get_session()
environment = session.query(Environment).get(environment_result['id'])
if not environment:
log.warning(_('Environment result could not be handled, specified '
'environment does not found in database'))
return
if not environment:
log.warning(_('Environment result could not be handled, specified '
'environment does not found in database'))
return
environment.description = environment_result
environment.version += 1
environment.save(session)
environment.description = environment_result
environment.version += 1
environment.save(session)
#close session
conf_session = session.query(Session).filter_by(
**{'environment_id': environment.id, 'state': 'deploying'}).first()
conf_session.state = 'deployed'
conf_session.save(session)
#close session
conf_session = session.query(Session).filter_by(
**{'environment_id': environment.id, 'state': 'deploying'}).first()
conf_session.state = 'deployed'
conf_session.save(session)
#close deployment
deployment = get_last_deployment(session, environment.id)
deployment.finished = timeutils.utcnow()
status = Status()
status.deployment_id = deployment.id
status.text = "Deployment finished"
deployment.statuses.append(status)
deployment.save(session)
#close deployment
deployment = get_last_deployment(session, environment.id)
deployment.finished = timeutils.utcnow()
status = Status()
status.deployment_id = deployment.id
status.text = "Deployment finished"
deployment.statuses.append(status)
deployment.save(session)
except Exception as ex:
log.exception(ex)
finally:
message.ack()
@handle
def handle_report(report):
log.debug(_('Got report message from orchestration '
'engine:\n{0}'.format(report)))
def handle_report(message):
try:
report = message.body
log.debug(_('Got report message from orchestration '
'engine:\n{0}'.format(report)))
report['entity_id'] = report['id']
del report['id']
report['entity_id'] = report['id']
del report['id']
status = Status()
status.update(report)
status = Status()
status.update(report)
session = get_session()
#connect with deployment
with session.begin():
running_deployment = get_last_deployment(session,
status.environment_id)
status.deployment_id = running_deployment.id
session.add(status)
session = get_session()
#connect with deployment
with session.begin():
running_deployment = get_last_deployment(session,
status.environment_id)
status.deployment_id = running_deployment.id
session.add(status)
except Exception as ex:
log.exception(ex)
finally:
message.ack()
def get_last_deployment(session, env_id):

View File

@ -19,7 +19,7 @@ from muranoapi.common import config
from muranoapi.db.models import Session, Environment
from muranoapi.db.services.sessions import SessionServices, SessionState
from muranoapi.db.session import get_session
from muranocommon.mq import MqClient, Message
from muranocommon.messaging import MqClient, Message
rabbitmq = config.CONF.rabbitmq

View File

@ -16,7 +16,7 @@ from collections import namedtuple
from muranoapi.common import config
from muranoapi.db.models import Session, Environment, Deployment, Status
from muranoapi.db.session import get_session
from muranocommon.mq import MqClient, Message
from muranocommon.messaging import MqClient, Message
rabbitmq = config.CONF.rabbitmq

View File

@ -31,4 +31,4 @@ passlib
jsonschema==2.0.0
python-keystoneclient>=0.2.0
oslo.config
http://github.com/sergmelikyan/murano-common/releases/download/0.1/muranocommon-0.1.tar.gz#egg=muranocommon-0.1
http://github.com/sergmelikyan/murano-common/releases/download/0.2.1/muranocommon-0.2.1.tar.gz#egg=muranocommon-0.2.1