[Event-engine] Make listener pool name configurable

* Now it is impossible to set the same
   pool name for queue listeners which use
   event engine. By default, it creates
   an unique pool named <hostname> so each
   event engine is in its own pool. Due to
   that and documentation of oslo.messaging,
   any message that comes to topic duplicates
   across all event engines.

 * But if they have the same pool name, the message
   will be delivered only to one of event-engines
   (by round-robin).

 * This patch adds a possibility to change listener pool
   name for each event-engine.

Change-Id: Iea83c461694a26d9cea810e6cc6169a0fe3f9f06
This commit is contained in:
Nikolay Mahotkin 2017-10-11 18:19:07 +03:00
parent f6b6f1d20b
commit e0eeca6706
2 changed files with 11 additions and 2 deletions

View File

@ -227,6 +227,13 @@ event_engine_opts = [
'identifier. It is not necessarily a hostname, ' 'identifier. It is not necessarily a hostname, '
'FQDN, or IP address.') 'FQDN, or IP address.')
), ),
cfg.HostAddressOpt(
'listener_pool_name',
default='events',
help=_('Name of the event engine\'s listener pool. This can be an'
' opaque identifier. It is used for identifying the group'
' of event engine listeners in oslo.messaging.')
),
cfg.StrOpt( cfg.StrOpt(
'topic', 'topic',
default='mistral_event_engine', default='mistral_event_engine',

View File

@ -18,8 +18,8 @@ AMQP messages based on olso.messaging framework.
""" """
import abc import abc
import socket
from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
from oslo_messaging.notify import dispatcher from oslo_messaging.notify import dispatcher
@ -30,6 +30,7 @@ from oslo_utils import timeutils
import six import six
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def handle_event(self, ctxt, publisher_id, event_type, payload, metadata): def handle_event(self, ctxt, publisher_id, event_type, payload, metadata):
@ -91,7 +92,8 @@ def get_pool_name(exchange):
:param exchange: exchange name :param exchange: exchange name
""" """
pool_name = 'mistral-%s-%s' % (exchange, socket.gethostname()) pool_host = CONF.event_engine.listener_pool_name
pool_name = 'mistral-%s-%s' % (exchange, pool_host)
LOG.debug("Listener pool name is %s", pool_name) LOG.debug("Listener pool name is %s", pool_name)