diff --git a/gnocchi/cli.py b/gnocchi/cli.py
index 91992c67..0de7816d 100644
--- a/gnocchi/cli.py
+++ b/gnocchi/cli.py
@@ -19,10 +19,13 @@ import time
 
 import cotyledon
 from cotyledon import oslo_config_glue
+from futurist import periodics
 from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import timeutils
 import six
+import tenacity
+import tooz
 
 from gnocchi import archive_policy
 from gnocchi import genconfig
@@ -139,12 +142,56 @@ class MetricReporting(MetricProcessBase):
 
 class MetricProcessor(MetricProcessBase):
     name = "processing"
+    GROUP_ID = "gnocchi-processing"
 
     def __init__(self, worker_id, conf):
         super(MetricProcessor, self).__init__(
             worker_id, conf, conf.metricd.metric_processing_delay)
         self._coord, self._my_id = utils.get_coordinator_and_start(
             conf.storage.coordination_url)
+        self._tasks = []
+        self.group_state = None
+
+    @utils.retry
+    def _configure(self):
+        super(MetricProcessor, self)._configure()
+        # create fallback in case partitioning fails or assigns no tasks
+        self.fallback_tasks = list(
+            six.moves.range(self.store.incoming.NUM_SACKS))
+        try:
+            self.partitioner = self._coord.join_partitioned_group(
+                self.GROUP_ID, partitions=200)
+            LOG.info('Joined coordination group: %s', self.GROUP_ID)
+
+            @periodics.periodic(spacing=self.conf.metricd.worker_sync_rate,
+                                run_immediately=True)
+            def run_watchers():
+                self._coord.run_watchers()
+
+            self.periodic = periodics.PeriodicWorker.create([])
+            self.periodic.add(run_watchers)
+            t = threading.Thread(target=self.periodic.start)
+            t.daemon = True
+            t.start()
+        except NotImplementedError:
+            LOG.warning('Coordinator does not support partitioning. Worker '
+                        'will battle against other workers for jobs.')
+        except tooz.ToozError as e:
+            LOG.error('Unexpected error configuring coordinator for '
+                      'partitioning. Retrying: %s', e)
+            raise tenacity.TryAgain(e)
+
+    def _get_tasks(self):
+        try:
+            if (not self._tasks or
+                    self.group_state != self.partitioner.ring.nodes):
+                self.group_state = self.partitioner.ring.nodes.copy()
+                # TODO(gordc): make replicas configurable
+                self._tasks = [
+                    i for i in six.moves.range(self.store.incoming.NUM_SACKS)
+                    if self.partitioner.belongs_to_self(i, replicas=3)]
+        finally:
+            return self._tasks or self.fallback_tasks
 
     def _sack_lock(self, sack):
         lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
@@ -154,7 +201,7 @@ class MetricProcessor(MetricProcessBase):
         m_count = 0
         s_count = 0
         in_store = self.store.incoming
-        for s in six.moves.range(in_store.NUM_SACKS):
+        for s in self._get_tasks():
             # TODO(gordc): support delay release lock so we don't
             # process a sack right after another process
             lock = self._sack_lock(s)
diff --git a/gnocchi/opts.py b/gnocchi/opts.py
index 604cebcc..ca3a7be4 100644
--- a/gnocchi/opts.py
+++ b/gnocchi/opts.py
@@ -85,6 +85,13 @@ def list_opts():
                        required=True,
                        help="How many seconds to wait between "
                             "cleaning of expired data"),
+            cfg.IntOpt('worker_sync_rate',
+                       default=30,
+                       help="Frequency to detect when metricd workers join "
+                            "or leave the system (in seconds). A shorter "
+                            "rate may improve rebalancing but creates more "
+                            "coordination load"),
+
         )),
         ("api", (
             cfg.StrOpt('paste_config',
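
Not part of the patch: below is a minimal standalone sketch of the tooz
partitioning pattern that the new _configure()/_get_tasks() code relies on,
showing how sacks map onto workers. The memcached URL, sack count, member id
and replica count are illustrative assumptions, not values taken from Gnocchi.

# Illustrative sketch only -- assumes a running coordination backend that
# supports group membership and watchers (memcached is used here as an
# example); the URL, sack count and member id are made up.
import uuid

from tooz import coordination

NUM_SACKS = 128  # stands in for store.incoming.NUM_SACKS

coord = coordination.get_coordinator('memcached://localhost:11211',
                                     uuid.uuid4().hex.encode())
coord.start(start_heart=True)
try:
    # Every worker joins the same partitioned group; tooz maintains a
    # consistent hash ring over the live group members.
    partitioner = coord.join_partitioned_group(b'gnocchi-processing',
                                               partitions=200)

    # The patch calls this periodically (every worker_sync_rate seconds) so
    # the ring is rebuilt when workers join or leave.
    coord.run_watchers()

    # A worker only claims the sacks the ring maps to it; replicas=3 lets up
    # to three workers pick up each sack, as in _get_tasks().
    my_sacks = [s for s in range(NUM_SACKS)
                if partitioner.belongs_to_self(s, replicas=3)]
    print(my_sacks)
finally:
    coord.stop()

With a single live member every sack hashes to that worker, so a lone metricd
processor still covers all sacks; when the backend does not support
partitioning at all, the patch falls back to fallback_tasks and workers
contend for every sack through the per-sack locks.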