diff --git a/gnocchi/cli.py b/gnocchi/cli.py index 8ae29ca4..f7c68dcc 100644 --- a/gnocchi/cli.py +++ b/gnocchi/cli.py @@ -16,7 +16,6 @@ import multiprocessing import threading import time -import uuid import cotyledon from futurist import periodics @@ -143,16 +142,11 @@ class MetricScheduler(MetricProcessBase): TASKS_PER_WORKER = 16 BLOCK_SIZE = 4 - def _enable_coordination(self, conf): - self._coord = coordination.get_coordinator( - conf.storage.coordination_url, self._my_id) - self._coord.start(start_heart=True) - def __init__(self, worker_id, conf, queue): super(MetricScheduler, self).__init__( worker_id, conf, conf.storage.metric_processing_delay) - self._my_id = str(uuid.uuid4()) - self._enable_coordination(conf) + self._coord, self._my_id = utils.get_coordinator_and_start( + conf.storage.coordination_url) self.queue = queue self.previously_scheduled_metrics = set() self.workers = conf.metricd.workers diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index 55992877..ee6061a4 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -65,24 +65,14 @@ class CarbonaraBasedStorage(storage.StorageDriver): def __init__(self, conf): super(CarbonaraBasedStorage, self).__init__(conf) - self.coord = coordination.get_coordinator( - conf.coordination_url, - str(uuid.uuid4()).encode('ascii')) self.aggregation_workers_number = conf.aggregation_workers_number if self.aggregation_workers_number == 1: # NOTE(jd) Avoid using futures at all if we don't want any threads. self._map_in_thread = self._map_no_thread else: self._map_in_thread = self._map_in_futures_threads - self.start() - - @utils.retry - def start(self): - try: - self.coord.start(start_heart=True) - except Exception as e: - LOG.error("Unable to start coordinator: %s" % e) - raise utils.Retry(e) + self.coord, my_id = utils.get_coordinator_and_start( + conf.coordination_url) def stop(self): self.coord.stop() diff --git a/gnocchi/utils.py b/gnocchi/utils.py index a49b161d..d823aace 100644 --- a/gnocchi/utils.py +++ b/gnocchi/utils.py @@ -14,13 +14,19 @@ # License for the specific language governing permissions and limitations # under the License. import datetime +import uuid import iso8601 + +from oslo_log import log from oslo_utils import timeutils from pytimeparse import timeparse import retrying import six -import uuid +from tooz import coordination + + +LOG = log.getLogger(__name__) # uuid5 namespace for id transformation. # NOTE(chdent): This UUID must stay the same, forever, across all @@ -64,6 +70,23 @@ retry = retrying.retry(wait_exponential_multiplier=500, retry_on_exception=retry_if_retry_is_raised) +# TODO(jd) Move this to tooz? +@retry +def _enable_coordination(coord): + try: + coord.start(start_heart=True) + except Exception as e: + LOG.error("Unable to start coordinator: %s", e) + raise Retry(e) + + +def get_coordinator_and_start(url): + my_id = str(uuid.uuid4()) + coord = coordination.get_coordinator(url, my_id) + _enable_coordination(coord) + return coord, my_id + + def to_timestamp(v): if isinstance(v, datetime.datetime): return v