Merged in murano-common

In attempt to reduce number of Murano repositories to a sane minumum Murano
community decided to merge in murano-common repo to projects dependent on it:
* murano-api
* murano-agent

We hope that we will completely get rid of this code in Juno.

Change-Id: Id203e62b040ddc2fe67399b2b487eaddca5f564f
Partially-Implements: blueprint repository-reorganization
This commit is contained in:
Ruslan Kamaldinov 2014-03-26 18:38:45 +04:00
parent b1cb74ddee
commit e02166f355
7 changed files with 256 additions and 3 deletions

View File

@ -22,7 +22,7 @@ from execution_result import ExecutionResult
from openstack.common import log as logging
from openstack.common import service
from config import CONF
from muranocommon.messaging import MqClient, Message
from muranoagent.common.messaging import MqClient, Message
from exceptions import AgentException
from time import sleep
from bunch import Bunch

View File

@ -0,0 +1,20 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from message import Message
from subscription import Subscription
from mqclient import MqClient
__all__ = ['Message', 'Subscription', 'MqClient']

View File

@ -0,0 +1,58 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import anyjson
import logging
LOG = logging.getLogger("murano-common.messaging")
class Message(object):
def __init__(self, connection=None, message_handle=None):
self._body = None
self._connection = connection
self._message_handle = message_handle
if message_handle:
self.id = message_handle.header.get('message_id')
else:
self.id = None
try:
if message_handle:
self.body = anyjson.loads(message_handle.body)
else
self.body = None
except ValueError as e:
self.body = None
LOG.exception(e)
@property
def body(self):
return self._body
@body.setter
def body(self, value):
self._body = value
@property
def id(self):
return self._id
@id.setter
def id(self, value):
self._id = value or ''
def ack(self):
self._message_handle.ack()

View File

@ -0,0 +1,108 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import anyjson
import logging
import ssl as ssl_module
from eventlet import patcher
kombu = patcher.import_patched('kombu')
from subscription import Subscription
log = logging.getLogger("murano-common.messaging")
class MqClient(object):
def __init__(self, login, password, host, port, virtual_host,
ssl=False, ca_certs=None):
ssl_params = None
if ssl is True:
ssl_params = {
'ca_certs': ca_certs,
'cert_reqs': ssl_module.CERT_REQUIRED
}
self._connection = kombu.Connection(
'amqp://{0}:{1}@{2}:{3}/{4}'.format(
login,
password,
host,
port,
virtual_host
), ssl=ssl_params
)
self._channel = None
self._connected = False
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def connect(self):
self._connection.connect()
self._channel = self._connection.channel()
self._connected = True
def close(self):
self._connection.close()
self._connected = False
def declare(self, queue, exchange='', enable_ha=False, ttl=0):
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
queue_arguments = {}
if enable_ha is True:
# To use mirrored queues feature in RabbitMQ 2.x
# we need to declare this policy on the queue itself.
#
# Warning: this option has no effect on RabbitMQ 3.X,
# to enable mirrored queues feature in RabbitMQ 3.X, please
# configure RabbitMQ.
queue_arguments['x-ha-policy'] = 'all'
if ttl > 0:
queue_arguments['x-expires'] = ttl
exchange = kombu.Exchange(exchange, type='direct', durable=True)
queue = kombu.Queue(queue, exchange, queue, durable=True,
queue_arguments=queue_arguments)
bound_queue = queue(self._connection)
bound_queue.declare()
def send(self, message, key, exchange=''):
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
headers = {'message_id': str(message.id)}
producer = kombu.Producer(self._connection)
producer.publish(
exchange=str(exchange),
routing_key=str(key),
body=anyjson.dumps(message.body),
headers=headers
)
def open(self, queue, prefetch_count=1):
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
return Subscription(self._connection, queue, prefetch_count)

View File

@ -0,0 +1,65 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import socket
from eventlet import patcher
kombu = patcher.import_patched('kombu')
five = patcher.import_patched('kombu.five')
from . import message
class Subscription(object):
def __init__(self, connection, queue, prefetch_count=1):
self._buffer = collections.deque()
self._connection = connection
self._queue = kombu.Queue(name=queue, exchange=None)
self._consumer = kombu.Consumer(self._connection, auto_declare=False)
self._consumer.register_callback(self._receive)
self._consumer.qos(prefetch_count=prefetch_count)
def __enter__(self):
self._consumer.add_queue(self._queue)
self._consumer.consume()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._consumer is not None:
self._consumer.cancel()
return False
def get_message(self, timeout=None):
msg_handle = self._get(timeout=timeout)
if msg_handle is None:
return None
return message.Message(self._connection, msg_handle)
def _get(self, timeout=None):
elapsed = 0.0
remaining = timeout
while True:
time_start = five.monotonic()
if self._buffer:
return self._buffer.pop()
try:
self._connection.drain_events(timeout=timeout and remaining)
except socket.timeout:
return None
elapsed += five.monotonic() - time_start
remaining = timeout and timeout - elapsed or None
def _receive(self, message_data, message):
self._buffer.append(message)

View File

@ -1,7 +1,9 @@
pbr>=0.5.21,<1.0
pbr>=0.6,<1.0
semver
bunch
oslo.config>=1.2.0
http://tarballs.openstack.org/murano-common/murano-common-release-0.3.tar.gz#egg=murano-common-release-0.3
iso8601
babel
anyjson>=0.3.3
eventlet>=0.13.0
kombu>=2.4.8