add pool name while register notification listener

oslo.messaging is exposing a new API allow creating listener by
supplying a pool name, then the listeners with that pool name will use
the queue name as the pool name, multiple listeners can be listening
with the same topic.

Change-Id: I220d50fad8cda6f6f50f31215a7a6b98523c35dc
Closes-Bug: #1388663
This commit is contained in:
Terry Yao 2014-11-06 16:39:36 +08:00 committed by terryyao
parent 368e8f71a1
commit c780569e8c
9 changed files with 83 additions and 101 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2013 IBM Corp.
# Copyright 2013, 2014 IBM Corp.
"""
All constants.
@ -24,6 +24,10 @@ LOCAL_PVC_PREFIX = 'pvc:'
# The composite PowerVC storage backend
POWERVC_VOLUME_BACKEND = 'powervc'
# AMQP constants
AMQP_EXCHANGE = 'cinder'
AMQP_TOPIC = 'notifications'
# PowerVC volume & volume type notification events that we listen for
EVENT_VOLUME_TYPE_CREATE = 'volume_type.create'
EVENT_VOLUME_TYPE_DELETE = 'volume_type.delete'

View File

@ -1,4 +1,4 @@
# Copyright 2013 IBM Corp.
# Copyright 2013, 2014 IBM Corp.
# from cinderclient.v1 import client
import cinder.db.sqlalchemy.models
@ -20,10 +20,6 @@ from powervc.common.client import delegate as ctx_delegate
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
CONF = config.CONF
LOG = log.getLogger(__name__)
@ -157,10 +153,6 @@ class PowerVCCinderManager(service.Service):
"""
LOG.debug("Enter _create_powervc_listeners method")
trans = transport.get_transport(config.AMQP_POWERVC_CONF)
targets = [
target.Target(exchange='cinder', topic='notifications')
]
endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx)
# Volume type creation
@ -217,12 +209,10 @@ class PowerVCCinderManager(service.Service):
]
LOG.debug("Starting to listen...... ")
pvc_cinder_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(pvc_cinder_listener)
messaging.start_listener(config.AMQP_POWERVC_CONF,
constants.AMQP_EXCHANGE,
constants.AMQP_TOPIC,
endpoints)
LOG.debug("Exit _create_powervc_listeners method")
def _periodic_volume_type_sync(self, context, vol_type_ids=None):

View File

@ -5,10 +5,18 @@ AMQP messages based on olso.messaging framework.
"""
import fnmatch
import logging
import inspect
import socket
import threading
import time
from oslo.messaging import target
from oslo.messaging import transport
from oslo.messaging.notify import dispatcher
from oslo.messaging.notify import listener
LOG = logging.getLogger(__name__)
class NotificationEndpoint(object):
@ -165,7 +173,7 @@ class NotificationEndpoint(object):
self._handler_map[et] = handler
def start_notification_listener(notification_listener):
def _start_notification_listener(notification_listener):
def _run():
notification_listener.start()
notification_listener.wait()
@ -176,3 +184,35 @@ def start_notification_listener(notification_listener):
"""
t = threading.Thread(target=_run)
t.start()
def _get_pool_name(exchange):
"""Get the pool name for the listener, it will be formated as
'powervdriver-exchange-hostname'
:param: exchange exchange name
"""
pool_name = 'powervcdriver-%s-%s' % (exchange, socket.gethostname())
LOG.info("Listener pool name is %s" % pool_name)
return pool_name
def start_listener(conf, exchange, topic, endpoints):
"""Start up notification listener
:param: conf configuration object for listener
:param: exchange exchange name
:param: topic topic name
:param: endpoints the listener endpoints
"""
trans = transport.get_transport(conf)
targets = [target.Target(exchange=exchange, topic=topic)]
create_listener = listener.get_notification_listener
if 'pool' in inspect.getargspec(create_listener).args:
pool_name = _get_pool_name(exchange)
mylistener = create_listener(trans, targets, endpoints,
allow_requeue=False, pool=pool_name)
else:
mylistener = create_listener(trans, targets, endpoints,
allow_requeue=False)
_start_notification_listener(mylistener)

View File

@ -32,10 +32,6 @@ from powervc.common import utils
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
CONF = glance_config.CONF
LOG = logging.getLogger(__name__)
@ -2388,11 +2384,6 @@ class PowerVCImageManager(service.Service):
LOG.debug("Enter _start_local_event_handler method")
trans = transport.get_transport(config.AMQP_OPENSTACK_CONF)
targets = [
target.Target(exchange=constants.IMAGE_EVENT_EXCHANGE,
topic=constants.IMAGE_EVENT_TOPIC)
]
endpoint = messaging.NotificationEndpoint(log=LOG)
endpoint.register_handler(constants.IMAGE_EVENT_TYPE_ALL,
@ -2403,12 +2394,10 @@ class PowerVCImageManager(service.Service):
]
LOG.debug("Starting to listen...... ")
local_glance_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(local_glance_listener)
messaging.start_listener(config.AMQP_OPENSTACK_CONF,
constants.IMAGE_EVENT_EXCHANGE,
constants.IMAGE_EVENT_TOPIC,
endpoints)
LOG.debug("Exit _start_local_event_handler method")
self.local_event_handler_running = True
@ -2427,11 +2416,6 @@ class PowerVCImageManager(service.Service):
LOG.debug("Enter _start_pvc_event_handler method")
trans = transport.get_transport(config.AMQP_POWERVC_CONF)
targets = [
target.Target(exchange=constants.IMAGE_EVENT_EXCHANGE,
topic=constants.IMAGE_EVENT_TOPIC)
]
endpoint = messaging.NotificationEndpoint(log=LOG)
endpoint.register_handler(constants.IMAGE_EVENT_TYPE_ALL,
@ -2442,12 +2426,10 @@ class PowerVCImageManager(service.Service):
]
LOG.debug("Starting to listen...... ")
pvc_glance_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(pvc_glance_listener)
messaging.start_listener(config.AMQP_POWERVC_CONF,
constants.IMAGE_EVENT_EXCHANGE,
constants.IMAGE_EVENT_TOPIC,
endpoints)
LOG.debug("Exit _start_pvc_event_handler method")
self.pvc_event_handler_running = True

View File

@ -24,10 +24,6 @@ from powervc.neutron.db import powervc_db_v2
from powervc.common import config as cfg
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
LOG = logging.getLogger(__name__)
@ -53,11 +49,6 @@ class Client(neutron_client_bindings.Client):
LOG.debug("Enter _create_amqp_listeners(local) method")
trans = transport.get_transport(cfg.AMQP_OPENSTACK_CONF)
targets = [
target.Target(exchange=constants.QPID_EXCHANGE,
topic=constants.QPID_TOPIC)
]
endpoint = messaging.NotificationEndpoint(log=LOG)
endpoint.register_handler(constants.EVENT_NETWORK_CREATE,
@ -84,12 +75,10 @@ class Client(neutron_client_bindings.Client):
]
LOG.debug("Starting to listen...... ")
local_neutron_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(local_neutron_listener)
messaging.start_listener(cfg.AMQP_OPENSTACK_CONF,
constants.AMQP_EXCHANGE,
constants.AMQP_TOPIC,
endpoints)
LOG.debug("Exit _create_amqp_listeners(local) method")
def _handle_network_create(self,

View File

@ -22,10 +22,6 @@ from powervc.neutron.db import powervc_db_v2
from powervc.common import config as cfg
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
LOG = logging.getLogger(__name__)
@ -46,11 +42,6 @@ class Client(neutron_client_bindings.Client):
LOG.debug("Entry _create_amqp_listeners(pvc) method")
trans = transport.get_transport(cfg.AMQP_POWERVC_CONF)
targets = [
target.Target(exchange=constants.QPID_EXCHANGE,
topic=constants.QPID_TOPIC)
]
endpoint = messaging.NotificationEndpoint(log=LOG)
endpoint.register_handler(constants.EVENT_NETWORK_CREATE,
@ -77,12 +68,10 @@ class Client(neutron_client_bindings.Client):
]
LOG.debug("Starting to listen...... ")
pvc_neutron_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(pvc_neutron_listener)
messaging.start_listener(cfg.AMQP_POWERVC_CONF,
constants.AMQP_EXCHANGE,
constants.AMQP_TOPIC,
endpoints)
LOG.debug("Exit _create_amqp_listeners(pvc) method")
def _handle_network_create(self,

View File

@ -50,9 +50,9 @@ PORT_CREATE_FIELDS = ['name',
'device_owner']
PORT_UPDATE_FIELDS = ['name']
# Qpid message handling
QPID_EXCHANGE = 'neutron'
QPID_TOPIC = 'notifications'
# amqp message handling
AMQP_EXCHANGE = 'neutron'
AMQP_TOPIC = 'notifications'
EVENT_END_THREAD = 'thread.end'
EVENT_FULL_SYNC = 'full.sync'

View File

@ -1,4 +1,4 @@
# Copyright 2013 IBM Corp.
# Copyright 2013, 2014 IBM Corp.
"""
All constants.
@ -24,6 +24,10 @@ POWERVC_SUPPORTED_INSTANCES = [('ppc64', 'powervm', 'hvm'),
# Suffix to append to sync event notifications
SYNC_EVENT_SUFFIX = 'sync'
# AMQP constants
AMQP_EXCHANGE = 'nova'
AMQP_TOPIC = 'notifications'
# PowerVC instance notification events that we listen for
EVENT_INSTANCE_UPDATE = 'compute.instance.update'
EVENT_INSTANCE_CREATE = 'compute.instance.create.end'

View File

@ -45,10 +45,6 @@ from powervc.common.client import delegate as ctx_delegate
from powervc.common import messaging
from oslo.messaging.notify import listener
from oslo.messaging import target
from oslo.messaging import transport
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -934,10 +930,6 @@ class PowerVCCloudManager(manager.Manager):
LOG.debug("Enter _create_local_listeners method")
trans = transport.get_transport(cfg.AMQP_OPENSTACK_CONF)
targets = [
target.Target(exchange='nova', topic='notifications')
]
endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx)
# Instance state changes
@ -955,12 +947,10 @@ class PowerVCCloudManager(manager.Manager):
]
LOG.debug("Starting to listen...... ")
local_nova_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(local_nova_listener)
messaging.start_listener(cfg.AMQP_OPENSTACK_CONF,
constants.AMQP_EXCHANGE,
constants.AMQP_TOPIC,
endpoints)
LOG.debug("Exit _create_local_listeners method")
def _create_powervc_listeners(self, ctx):
@ -973,10 +963,6 @@ class PowerVCCloudManager(manager.Manager):
LOG.debug("Enter _create_powervc_listeners method")
trans = transport.get_transport(cfg.AMQP_POWERVC_CONF)
targets = [
target.Target(exchange='nova', topic='notifications')
]
endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx)
# Instance creation
@ -1014,12 +1000,10 @@ class PowerVCCloudManager(manager.Manager):
]
LOG.debug("Starting to listen...... ")
pvc_nova_listener = listener.\
get_notification_listener(trans, targets, endpoints,
allow_requeue=False)
messaging.start_notification_listener(pvc_nova_listener)
messaging.start_listener(cfg.AMQP_POWERVC_CONF,
constants.AMQP_EXCHANGE,
constants.AMQP_TOPIC,
endpoints)
LOG.debug("Exit _create_powervc_listeners method")
def _handle_local_instance_create(self,