better support notification coordination

when launching multiple agents at the same time, there is a chance that
an agent will miss the registration of another agent. this can happen
because there is a lot of overhead involved in starting up an agent,
specifically in initialising managers.

this change makes it so the agent only joins the group AFTER it has
done all the setup that does not require coordination. once it joins,
we immediately start listening for further changes to group membership.

additionally, this adds a lock around pipeline queue setup so that only
one event at a time can trigger a reconfiguration.
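in outline, the new ordering plus the lock look like this. a minimal
sketch with hypothetical names (Agent and the coordinator argument stand
in for ceilometer's NotificationService and PartitionCoordinator; this is
not the patched code itself):

import threading

class Agent(object):
    def __init__(self, coordinator, group_id):
        self.coordinator = coordinator
        self.group_id = group_id
        self.coord_lock = threading.Lock()

    def start(self):
        self._setup_managers()  # all setup that needs no coordination
        # join only after we are ready to serve membership callbacks
        self.coordinator.join_group(self.group_id)
        self.coordinator.watch_group(self.group_id, self._refresh_agent)
        self._configure_queues()

    def _refresh_agent(self, event):
        # fired on any membership change; recompute our queue subset
        self._configure_queues()

    def _setup_managers(self):
        pass  # build pipeline/event managers, transports, etc.

    def _configure_queues(self):
        # serialize reconfiguration so concurrent membership events
        # cannot interleave listener teardown and startup
        with self.coord_lock:
            pass  # recompute hash-ring subset, (re)start listeners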

Change-Id: I8100160a3aa83a190c4110e6e8be9b26aef8fd1c
Closes-Bug: #1533787
Author: gordon chung 2016-01-14 09:47:33 -05:00
parent 5a51a3c218
commit 67e47cda8e
3 changed files with 55 additions and 42 deletions

@@ -167,7 +167,7 @@ class PartitionCoordinator(object):
             self.join_group(group_id)
         try:
             members = self._get_members(group_id)
-            LOG.debug('Members of group: %s', members)
+            LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
             hr = utils.HashRing(members)
             filtered = [v for v in iterable
                         if hr.get_node(str(v)) == self._my_id]

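For context, extract_my_subset above keeps only the items whose hash-ring
bucket belongs to this agent: every member builds the same ring over the
member list, so the subsets partition the work with no extra messaging.
A rough illustration of the idea; this HashRing is a simplified stand-in
(plain modulo hashing, not ceilometer's actual utils.HashRing):

import hashlib

class HashRing(object):
    def __init__(self, members):
        self.members = sorted(members)

    def get_node(self, key):
        # deterministically map a key onto one member
        digest = hashlib.md5(key.encode('utf-8')).hexdigest()
        return self.members[int(digest, 16) % len(self.members)]

ring = HashRing(['agent-1', 'agent-2'])
# each agent runs the same computation, so the subsets are disjoint
mine = [q for q in range(4) if ring.get_node(str(q)) == 'agent-1']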
@@ -13,6 +13,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 import itertools
+import threading
 
 from oslo_config import cfg
 from oslo_context import context
@@ -91,6 +92,7 @@ class NotificationService(service_base.BaseService):
         super(NotificationService, self).__init__(*args, **kwargs)
         self.partition_coordinator = None
         self.listeners, self.pipeline_listeners = [], []
+        self.coord_lock = threading.Lock()
         self.group_id = None
 
     @classmethod
@@ -154,7 +156,6 @@ class NotificationService(service_base.BaseService):
             self.group_id = self.NOTIFICATION_NAMESPACE
             self.partition_coordinator = coordination.PartitionCoordinator()
             self.partition_coordinator.start()
-            self.partition_coordinator.join_group(self.group_id)
         else:
             # FIXME(sileht): endpoint uses the notification_topics option
             # and it should not because this is an oslo_messaging option
@@ -174,14 +175,16 @@ class NotificationService(service_base.BaseService):
                 self.event_pipe_manager)
 
         if cfg.CONF.notification.workload_partitioning:
-            self._configure_pipeline_listeners()
+            # join group after all manager set up is configured
+            self.partition_coordinator.join_group(self.group_id)
             self.partition_coordinator.watch_group(self.group_id,
                                                    self._refresh_agent)
             self.tg.add_timer(cfg.CONF.coordination.heartbeat,
                               self.partition_coordinator.heartbeat)
             self.tg.add_timer(cfg.CONF.coordination.check_watchers,
                               self.partition_coordinator.run_watchers)
+            # configure pipelines after all coordination is configured.
+            self._configure_pipeline_listeners()
 
         if not cfg.CONF.notification.disable_non_metric_meters:
             LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@@ -237,47 +240,48 @@ class NotificationService(service_base.BaseService):
             self._configure_pipeline_listeners(True)
 
     def _configure_pipeline_listeners(self, reuse_listeners=False):
-        ev_pipes = []
-        if cfg.CONF.notification.store_events:
-            ev_pipes = self.event_pipeline_manager.pipelines
-        pipelines = self.pipeline_manager.pipelines + ev_pipes
-        transport = messaging.get_transport()
-        partitioned = self.partition_coordinator.extract_my_subset(
-            self.group_id,
-            range(cfg.CONF.notification.pipeline_processing_queues))
+        with self.coord_lock:
+            ev_pipes = []
+            if cfg.CONF.notification.store_events:
+                ev_pipes = self.event_pipeline_manager.pipelines
+            pipelines = self.pipeline_manager.pipelines + ev_pipes
+            transport = messaging.get_transport()
+            partitioned = self.partition_coordinator.extract_my_subset(
+                self.group_id,
+                range(cfg.CONF.notification.pipeline_processing_queues))
 
-        queue_set = {}
-        for pipe_set, pipe in itertools.product(partitioned, pipelines):
-            queue_set['%s-%s-%s' %
-                      (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
+            queue_set = {}
+            for pipe_set, pipe in itertools.product(partitioned, pipelines):
+                queue_set['%s-%s-%s' %
+                          (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
 
-        if reuse_listeners:
-            topics = queue_set.keys()
-            kill_list = []
-            for listener in self.pipeline_listeners:
-                if listener.dispatcher.targets[0].topic in topics:
-                    queue_set.pop(listener.dispatcher.targets[0].topic)
-                else:
-                    kill_list.append(listener)
-            for listener in kill_list:
-                utils.kill_listeners([listener])
-                self.pipeline_listeners.remove(listener)
-        else:
-            utils.kill_listeners(self.pipeline_listeners)
-            self.pipeline_listeners = []
+            if reuse_listeners:
+                topics = queue_set.keys()
+                kill_list = []
+                for listener in self.pipeline_listeners:
+                    if listener.dispatcher.targets[0].topic in topics:
+                        queue_set.pop(listener.dispatcher.targets[0].topic)
+                    else:
+                        kill_list.append(listener)
+                for listener in kill_list:
+                    utils.kill_listeners([listener])
+                    self.pipeline_listeners.remove(listener)
+            else:
+                utils.kill_listeners(self.pipeline_listeners)
+                self.pipeline_listeners = []
 
-        for topic, pipe in queue_set.items():
-            LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
-                      pipe_set)
-            pipe_endpoint = (pipeline.EventPipelineEndpoint
-                             if isinstance(pipe, pipeline.EventPipeline)
-                             else pipeline.SamplePipelineEndpoint)
-            listener = messaging.get_notification_listener(
-                transport,
-                [oslo_messaging.Target(topic=topic)],
-                [pipe_endpoint(self.ctxt, pipe)])
-            listener.start()
-            self.pipeline_listeners.append(listener)
+            for topic, pipe in queue_set.items():
+                LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
+                          pipe_set)
+                pipe_endpoint = (pipeline.EventPipelineEndpoint
+                                 if isinstance(pipe, pipeline.EventPipeline)
+                                 else pipeline.SamplePipelineEndpoint)
+                listener = messaging.get_notification_listener(
+                    transport,
+                    [oslo_messaging.Target(topic=topic)],
+                    [pipe_endpoint(self.ctxt, pipe)])
+                listener.start()
+                self.pipeline_listeners.append(listener)
 
     def stop(self):
         if self.partition_coordinator:

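The reuse_listeners branch above amounts to reconciling the desired topic
set with the listeners that are already running: keep the ones whose topic
is still wanted, kill the rest, and create listeners only for new topics.
A rough sketch of that reconciliation step, with hypothetical names:

def reconcile(running, wanted):
    """running: dict mapping topic -> listener; wanted: set of topics.

    Returns (listeners to keep, listeners to kill, topics to create).
    """
    keep = {t: l for t, l in running.items() if t in wanted}
    kill = [l for t, l in running.items() if t not in wanted]
    create = wanted - set(keep)
    return keep, kill, create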
@@ -0,0 +1,9 @@
+---
+critical:
+  - >
+    [`bug 1533787 <https://bugs.launchpad.net/ceilometer/+bug/1533787>`_]
+    Fix an issue where agents are not properly getting registered to group
+    when multiple notification agents are deployed. This can result in
+    bad transformation as the agents are not coordinated. It is still
+    recommended to set heartbeat_timeout_threshold = 0 in
+    [oslo_messaging_rabbit] section when deploying multiple agents.