coordination: create coordinator at init time

This also switches the coordination to zake:// in test so the code actually
works like it would with a production-ready coordinator.

Change-Id: I38f6a3389f70bed6b45fa7526a13d0484bfc9c3f
This commit is contained in:
Julien Danjou 2017-03-08 17:48:07 +01:00
parent 9400ce4a54
commit 5e43dc7733
5 changed files with 29 additions and 32 deletions

View File

@ -1,6 +1,6 @@
#
# Copyright 2013 Julien Danjou
# Copyright 2014 Red Hat, Inc
# Copyright 2014-2017 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -18,6 +18,7 @@ import collections
import itertools
import logging
import random
import uuid
from concurrent import futures
import cotyledon
@ -276,8 +277,10 @@ class AgentManager(cotyledon.Service):
self.polling_periodics = None
if self.conf.coordination.backend_url:
# XXX uuid4().bytes ought to work, but it requires ascii for now
coordination_id = str(uuid.uuid4()).encode('ascii')
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf)
self.conf, coordination_id)
else:
self.partition_coordinator = None

View File

@ -1,5 +1,5 @@
#
# Copyright 2014 Red Hat, Inc.
# Copyright 2014-2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,7 +14,6 @@
# under the License.
import six
import uuid
from oslo_config import cfg
from oslo_log import log
@ -59,22 +58,18 @@ class PartitionCoordinator(object):
empty iterable in this case.
"""
def __init__(self, conf, my_id=None):
def __init__(self, conf, my_id):
self.conf = conf
self._coordinator = None
# XXX uuid4().bytes ought to work, but it requires ascii for now
self._my_id = my_id or str(uuid.uuid4()).encode('ascii')
self._my_id = my_id
self._coordinator = tooz.coordination.get_coordinator(
conf.coordination.backend_url, my_id)
def start(self):
backend_url = self.conf.coordination.backend_url
if backend_url:
try:
self._coordinator = tooz.coordination.get_coordinator(
backend_url, self._my_id)
self._coordinator.start(start_heart=True)
LOG.info(_LI('Coordination backend started successfully.'))
except tooz.coordination.ToozError:
LOG.exception(_LE('Error connecting to coordination backend.'))
try:
self._coordinator.start(start_heart=True)
LOG.info(_LI('Coordination backend started successfully.'))
except tooz.coordination.ToozError:
LOG.exception(_LE('Error connecting to coordination backend.'))
def stop(self):
if not self._coordinator:

View File

@ -16,6 +16,7 @@
import itertools
import threading
import time
import uuid
from ceilometer.agent import plugin_base
from concurrent import futures
@ -107,10 +108,13 @@ class NotificationService(cotyledon.Service):
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, worker_id, conf):
def __init__(self, worker_id, conf, coordination_id=None):
super(NotificationService, self).__init__(worker_id)
self.startup_delay = worker_id
self.conf = conf
# XXX uuid4().bytes ought to work, but it requires ascii for now
self.coordination_id = (coordination_id or
str(uuid.uuid4()).encode('ascii'))
@classmethod
def _get_notifications_manager(cls, pm):
@ -182,7 +186,7 @@ class NotificationService(cotyledon.Service):
if self.conf.notification.workload_partitioning:
self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf)
self.conf, self.coordination_id)
self.partition_coordinator.start()
else:
# FIXME(sileht): endpoint uses the notification_topics option

View File

@ -102,7 +102,7 @@ class TestNotification(tests_base.BaseTestCase):
conf = service.prepare_service([], [])
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override("backend_url", None, group="coordination")
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.CONF.set_override("workload_partitioning", True,
group='notification')
self.setup_messaging(self.CONF)
@ -124,7 +124,6 @@ class TestNotification(tests_base.BaseTestCase):
@mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint')
def _do_process_notification_manager_start(self,
fake_event_endpoint_class):
self.CONF([], project='ceilometer', validate_default_values=True)
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
@ -168,7 +167,6 @@ class TestNotification(tests_base.BaseTestCase):
[extension.Extension('test', None, None, plugin),
extension.Extension('test', None, None, plugin)])
self.CONF([], project='ceilometer', validate_default_values=True)
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = fake_get_notifications_manager_dup_targets
@ -232,8 +230,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
self.expected_samples = 2
self.CONF.set_override("backend_url", None, group="coordination")
ev_pipeline_cfg_file = self.setup_event_pipeline(
['compute.instance.*'])
self.expected_events = 1
@ -296,6 +292,7 @@ class TestRealNotificationHA(BaseRealNotification):
super(TestRealNotificationHA, self).setUp()
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.srv = notification.NotificationService(0, self.CONF)
@mock.patch('ceilometer.publisher.test.TestPublisher')
@ -459,7 +456,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("event_pipeline_cfg_file",
event_pipeline_cfg_file)
self.CONF.set_override("backend_url", None, group="coordination")
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override('pipeline_processing_queues', 2,
@ -470,15 +467,13 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
self.srv = notification.NotificationService(0, self.CONF)
self.srv2 = notification.NotificationService(0, self.CONF)
self.srv = notification.NotificationService(0, self.CONF, 'harry')
self.srv2 = notification.NotificationService(0, self.CONF, 'lloyd')
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'._get_members', return_value=['harry', 'lloyd']):
with mock.patch('uuid.uuid4', return_value='harry'):
self.srv.run()
self.srv.run()
self.addCleanup(self.srv.terminate)
with mock.patch('uuid.uuid4', return_value='lloyd'):
self.srv2.run()
self.srv2.run()
self.addCleanup(self.srv2.terminate)
notifier = messaging.get_notifier(self.transport,

View File

@ -44,7 +44,7 @@ SQLAlchemy<1.1.0,>=1.0.10 # MIT
sqlalchemy-migrate>=0.9.6 # Apache-2.0
stevedore>=1.9.0 # Apache-2.0
tenacity>=3.2.1 # Apache-2.0
tooz>=1.47.0 # Apache-2.0
tooz[zake]>=1.47.0 # Apache-2.0
WebOb>=1.5.0 # MIT
WSME>=0.8 # MIT
# NOTE(jd) We do not import it directly, but WSME datetime string parsing