MessagingAPI should be implemented as common module

nova_manager.py implements the NovaManager which interacts with Nova
service but even provides the MessagingAPI class which interacts with
the AMQP system. This fix moves the MessagingAPI in common/messaging.py.

Bug: 1690133
Change-Id: Ifed11126a1af227950c03ccffc48d577fb631ded
Sem-Ver: bugfix
This commit is contained in:
Lisa Zangrando 2017-05-11 14:39:24 +02:00
parent 0e5e7b46de
commit fc16dbd695
2 changed files with 153 additions and 115 deletions

View File

@ -0,0 +1,110 @@
import logging
import oslo_messaging as oslo_msg
from oslo_config import cfg
__author__ = "Lisa Zangrando"
__email__ = "lisa.zangrando[AT]pd.infn.it"
__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
All Rights Reserved
Licensed under the Apache License, Version 2.0;
you may not use this file except in compliance with the
License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied.
See the License for the specific language governing
permissions and limitations under the License."""
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class AMQP(object):
def __init__(self, url=None, backend=None, username=None, password=None,
hosts=None, virt_host=None, exchange=None):
super(AMQP, self).__init__()
if exchange:
oslo_msg.set_transport_defaults(control_exchange=exchange)
if url:
self.TRANSPORT = oslo_msg.get_transport(CONF, url=url)
elif not backend and not hosts:
raise ValueError("missing AMQP parameters")
else:
t_hosts = self._createTransportHosts(username, password, hosts)
t_url = oslo_msg.TransportURL(CONF,
transport=backend,
virtual_host=virt_host,
hosts=t_hosts,
aliases=None)
self.TRANSPORT = oslo_msg.get_transport(CONF, url=t_url)
def _createTransportHosts(self, username, password, hosts):
"""Returns a list of oslo.messaging.TransportHost objects."""
transport_hosts = []
for host in hosts:
host = host.strip()
host_name, host_port = host.split(":")
if not host_port:
msg = "Invalid hosts value: %s. It should be"\
" in hostname:port format" % host
raise ValueError(msg)
try:
host_port = int(host_port)
except ValueError:
msg = "Invalid port value: %s. It should be an integer"
raise ValueError(msg % host_port)
transport_hosts.append(oslo_msg.TransportHost(
hostname=host_name,
port=host_port,
username=username,
password=password))
return transport_hosts
def getTarget(self, topic, exchange=None, namespace=None,
version=None, server=None):
return oslo_msg.Target(topic=topic,
exchange=exchange,
namespace=namespace,
version=version,
server=server)
def getRPCClient(self, target, version_cap=None, serializer=None):
assert self.TRANSPORT is not None
return oslo_msg.RPCClient(self.TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def getRPCServer(self, target, endpoints, serializer=None):
assert self.TRANSPORT is not None
return oslo_msg.get_rpc_server(self.TRANSPORT,
target,
endpoints,
executor="eventlet",
serializer=serializer)
def getNotificationListener(self, targets, endpoints):
assert self.TRANSPORT is not None
return oslo_msg.get_notification_listener(self.TRANSPORT,
targets,
endpoints,
allow_requeue=True,
executor="eventlet")

View File

@ -4,13 +4,13 @@ import hashlib
import hmac
import json
import logging
import oslo_messaging as oslo_msg
import requests
from common.block_device import BlockDeviceMapping
from common.compute import Compute
from common.flavor import Flavor
from common.hypervisor import Hypervisor
from common.messaging import AMQP
from common.quota import Quota
from common.request import Request
from common.server import Server
@ -42,91 +42,6 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class MessagingAPI(object):
def __init__(self, amqp_backend, amqp_user,
amqp_password, amqp_hosts, amqp_virt_host):
super(MessagingAPI, self).__init__()
oslo_msg.set_transport_defaults(control_exchange="nova")
transport_hosts = self._createTransportHosts(amqp_user,
amqp_password,
amqp_hosts)
transport_url = oslo_msg.TransportURL(CONF,
transport=amqp_backend,
virtual_host=amqp_virt_host,
hosts=transport_hosts,
aliases=None)
self.TRANSPORT = oslo_msg.get_transport(CONF, url=transport_url)
def _createTransportHosts(self, username, password, hosts):
"""Returns a list of oslo.messaging.TransportHost objects."""
transport_hosts = []
for host in hosts:
host = host.strip()
host_name, host_port = host.split(":")
if not host_port:
msg = "Invalid hosts value: %s. It should be"\
" in hostname:port format" % host
raise ValueError(msg)
try:
host_port = int(host_port)
except ValueError:
msg = "Invalid port value: %s. It should be an integer"
raise ValueError(msg % host_port)
transport_hosts.append(oslo_msg.TransportHost(
hostname=host_name,
port=host_port,
username=username,
password=password))
return transport_hosts
def getTarget(self, topic, exchange=None, namespace=None,
version=None, server=None):
return oslo_msg.Target(topic=topic,
exchange=exchange,
namespace=namespace,
version=version,
server=server)
def getRPCClient(self, target, version_cap=None, serializer=None):
assert self.TRANSPORT is not None
LOG.debug("creating RPC client with target %s" % target)
return oslo_msg.RPCClient(self.TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def getRPCServer(self, target, endpoints, serializer=None):
assert self.TRANSPORT is not None
LOG.debug("creating RPC server with target %s" % target)
return oslo_msg.get_rpc_server(self.TRANSPORT,
target,
endpoints,
executor="eventlet",
serializer=serializer)
def getNotificationListener(self, targets, endpoints):
assert self.TRANSPORT is not None
LOG.debug("creating notification listener with target %s endpoints %s"
% (targets, endpoints))
return oslo_msg.get_notification_listener(self.TRANSPORT,
targets,
endpoints,
allow_requeue=True,
executor="eventlet")
class NovaConductorComputeAPI(object):
def __init__(self, topic, scheduler_manager, keystone_manager, msg):
@ -215,10 +130,18 @@ class NovaManager(Manager):
super(NovaManager, self).__init__("NovaManager")
self.config_opts = [
cfg.StrOpt("amqp_url",
help="the amqp transport url",
default=None,
required=False),
cfg.StrOpt("amqp_exchange",
help="the amqp exchange",
default="nova",
required=False),
cfg.StrOpt("amqp_backend",
help="the amqp backend tpye (e.g. rabbit, qpid)",
default=None,
required=True),
required=False),
cfg.ListOpt("amqp_hosts",
help="AMQP HA cluster host:port pairs",
default=None,
@ -234,11 +157,11 @@ class NovaManager(Manager):
cfg.StrOpt("amqp_user",
help="the amqp user",
default=None,
required=True),
required=False),
cfg.StrOpt("amqp_password",
help="the amqp password",
default=None,
required=True),
required=False),
cfg.StrOpt("amqp_virt_host",
help="the amqp virtual host",
default="/",
@ -310,7 +233,9 @@ class NovaManager(Manager):
self.keystone_manager = self.getManager("KeystoneManager")
self.scheduler_manager = self.getManager("SchedulerManager")
amqp_backend = self.getParameter("amqp_backend", fallback=True)
amqp_url = self.getParameter("amqp_url")
amqp_backend = self.getParameter("amqp_backend")
amqp_hosts = self.getParameter("amqp_hosts")
@ -318,12 +243,14 @@ class NovaManager(Manager):
amqp_port = self.getParameter("amqp_port")
amqp_user = self.getParameter("amqp_user", fallback=True)
amqp_user = self.getParameter("amqp_user")
amqp_password = self.getParameter("amqp_password", fallback=True)
amqp_password = self.getParameter("amqp_password")
amqp_virt_host = self.getParameter("amqp_virt_host")
amqp_exchange = self.getParameter("amqp_exchange")
db_connection = self.getParameter("db_connection", fallback=True)
host = self.getParameter("host")
@ -342,19 +269,20 @@ class NovaManager(Manager):
self.db_engine = create_engine(db_connection, pool_recycle=30)
self.messagingAPI = MessagingAPI(amqp_backend, amqp_user,
amqp_password, amqp_hosts,
amqp_virt_host)
self.messaging = AMQP(url=amqp_url, backend=amqp_backend,
username=amqp_user, password=amqp_password,
hosts=amqp_hosts, virt_host=amqp_virt_host,
exchange=amqp_exchange)
self.novaConductorComputeAPI = NovaConductorComputeAPI(
conductor_topic,
self.scheduler_manager,
self.keystone_manager,
self.messagingAPI)
self.messaging)
self.conductor_rpc = self.messagingAPI.getRPCServer(
target=self.messagingAPI.getTarget(topic=synergy_topic,
server=host),
self.conductor_rpc = self.messaging.getRPCServer(
target=self.messaging.getTarget(topic=synergy_topic,
server=host),
endpoints=[self.novaConductorComputeAPI])
self.conductor_rpc.start()
@ -912,24 +840,24 @@ class NovaManager(Manager):
def getTarget(self, topic, exchange=None, namespace=None,
version=None, server=None):
return self.messagingAPI.getTarget(topic=topic,
namespace=namespace,
exchange=exchange,
version=version,
server=server)
return self.messaging.getTarget(topic=topic,
namespace=namespace,
exchange=exchange,
version=version,
server=server)
def getRPCClient(self, target, version_cap=None, serializer=None):
return self.messagingAPI.getRPCClient(target,
version_cap=version_cap,
serializer=serializer)
return self.messaging.getRPCClient(target,
version_cap=version_cap,
serializer=serializer)
def getRPCServer(self, target, endpoints, serializer=None):
return self.messagingAPI.getRPCServer(target,
endpoints,
serializer=serializer)
return self.messaging.getRPCServer(target,
endpoints,
serializer=serializer)
def getNotificationListener(self, targets, endpoints):
return self.messagingAPI.getNotificationListener(targets, endpoints)
return self.messaging.getNotificationListener(targets, endpoints)
def getProjectUsage(self, prj_id, from_date, to_date):
usage = {}
@ -1070,11 +998,11 @@ where instance_uuid='%(id)s' and deleted_at is NULL""" % {"id": server.getId()}
return servers
def selectComputes(self, request):
target = self.messagingAPI.getTarget(topic='scheduler',
exchange="nova",
version="4.0")
target = self.messaging.getTarget(topic='scheduler',
exchange="nova",
version="4.0")
client = self.messagingAPI.getRPCClient(target)
client = self.messaging.getRPCClient(target)
cctxt = client.prepare(version='4.0')
request_spec = {