From e82460117a97c520d410093644c03bbc22c5d5a2 Mon Sep 17 00:00:00 2001
From: gord chung
Date: Wed, 19 Apr 2017 13:46:47 +0000
Subject: [PATCH] implement hashring partitioning support

add hashring partitioning for users who want to reduce potential locking
load at the cost of some potential throughput. if the coordination backend
does not support partitioning, or the hashring assigns no jobs, we default
to the entire set of sacks. if partitioning fails to set up, we retry.

Change-Id: I1439fb3cdb171ce57ce7887857aa4789fe8f0d9c
---
 gnocchi/cli.py  | 49 ++++++++++++++++++++++++++++++++++++++++++++++++-
 gnocchi/opts.py |  7 +++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/gnocchi/cli.py b/gnocchi/cli.py
index 853153dd..8d3b9549 100644
--- a/gnocchi/cli.py
+++ b/gnocchi/cli.py
@@ -19,10 +19,13 @@ import time
 
 import cotyledon
 from cotyledon import oslo_config_glue
+from futurist import periodics
 from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import timeutils
 import six
+import tenacity
+import tooz
 
 from gnocchi import archive_policy
 from gnocchi import genconfig
@@ -139,12 +142,56 @@ class MetricReporting(MetricProcessBase):
 
 class MetricProcessor(MetricProcessBase):
     name = "processing"
+    GROUP_ID = "gnocchi-processing"
 
     def __init__(self, worker_id, conf):
         super(MetricProcessor, self).__init__(
             worker_id, conf, conf.metricd.metric_processing_delay)
         self._coord, self._my_id = utils.get_coordinator_and_start(
             conf.storage.coordination_url)
+        self._tasks = []
+        self.group_state = None
+
+    @utils.retry
+    def _configure(self):
+        super(MetricProcessor, self)._configure()
+        # create fallback in case partitioning fails or assigns no tasks
+        self.fallback_tasks = list(
+            six.moves.range(self.store.incoming.NUM_SACKS))
+        try:
+            self.partitioner = self._coord.join_partitioned_group(
+                self.GROUP_ID, partitions=200)
+            LOG.info('Joined coordination group: %s', self.GROUP_ID)
+
+            @periodics.periodic(spacing=self.conf.metricd.worker_sync_rate,
+                                run_immediately=True)
+            def run_watchers():
+                self._coord.run_watchers()
+
+            self.periodic = periodics.PeriodicWorker.create([])
+            self.periodic.add(run_watchers)
+            t = threading.Thread(target=self.periodic.start)
+            t.daemon = True
+            t.start()
+        except NotImplementedError:
+            LOG.warning('Coordinator does not support partitioning. Worker '
+                        'will battle against other workers for jobs.')
+        except tooz.ToozError as e:
+            LOG.error('Unexpected error configuring coordinator for '
+                      'partitioning. Retrying: %s', e)
+            raise tenacity.TryAgain(e)
+
+    def _get_tasks(self):
+        try:
+            if (not self._tasks or
+                    self.group_state != self.partitioner.ring.nodes):
+                self.group_state = self.partitioner.ring.nodes.copy()
+                # TODO(gordc): make replicas configurable
+                self._tasks = [
+                    i for i in six.moves.range(self.store.incoming.NUM_SACKS)
+                    if self.partitioner.belongs_to_self(i, replicas=3)]
+        finally:
+            return self._tasks or self.fallback_tasks
 
     def _sack_lock(self, sack):
         lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
@@ -154,7 +201,7 @@ class MetricProcessor(MetricProcessBase):
         m_count = 0
         s_count = 0
         in_store = self.store.incoming
-        for s in six.moves.range(in_store.NUM_SACKS):
+        for s in self._get_tasks():
             # TODO(gordc): support delay release lock so we don't
             # process a sack right after another process
             lock = self._sack_lock(s)
diff --git a/gnocchi/opts.py b/gnocchi/opts.py
index 604cebcc..ca3a7be4 100644
--- a/gnocchi/opts.py
+++ b/gnocchi/opts.py
@@ -85,6 +85,13 @@ def list_opts():
                    required=True,
                    help="How many seconds to wait between "
                         "cleaning of expired data"),
+        cfg.IntOpt('worker_sync_rate',
+                   default=30,
+                   help="Frequency to detect when metricd workers join or "
+                        "leave the system (in seconds). A shorter rate may "
+                        "improve rebalancing but creates more coordination "
+                        "load"),
+
     )),
     ("api", (
         cfg.StrOpt('paste_config',
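
For reference, below is a minimal standalone sketch of the tooz partitioning
pattern the patch builds on, under stated assumptions: the coordination URL,
member id, and NUM_SACKS value are illustrative and not taken from the patch,
while the group name, partitions=200, and replicas=3 mirror the code above.
A backend that cannot partition raises NotImplementedError, which is the same
case the patch handles by falling back to the full sack set.

    import uuid

    from tooz import coordination

    NUM_SACKS = 128  # hypothetical sack count; gnocchi reads this from storage

    # assumed backend URL for this sketch; any tooz driver with group
    # support behaves the same way
    coord = coordination.get_coordinator('memcached://localhost:11211',
                                         uuid.uuid4().bytes)
    coord.start(start_heart=True)

    try:
        # same group name and partition count as the patch
        partitioner = coord.join_partitioned_group(b'gnocchi-processing',
                                                   partitions=200)
        # the hash ring maps each sack to members; replicas=3 lets up to
        # three workers own a sack, so coverage survives a worker dying
        my_sacks = [s for s in range(NUM_SACKS)
                    if partitioner.belongs_to_self(s, replicas=3)]
    except NotImplementedError:
        # backend cannot partition: fall back to competing for every sack,
        # as the patch does
        my_sacks = list(range(NUM_SACKS))

    print('this worker processes sacks: %s' % my_sacks)
    coord.stop()

The patch additionally schedules coord.run_watchers() every worker_sync_rate
seconds so membership changes are observed and _get_tasks() recomputes sack
ownership from partitioner.ring.nodes; this one-shot sketch omits that loop.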