implement hashring partitioning

Support hashring partitioning so a user can reduce potential locking
load by sacrificing potential throughput. If the coordination backend
does not support partitioning, or the hashring assigns no jobs, we
default to the entire set of sacks. If partitioning fails to set up,
we try again.

Change-Id: I1439fb3cdb171ce57ce7887857aa4789fe8f0d9c
parent f8b18df786
commit e82460117a
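For context, here is a minimal, hypothetical sketch of the mechanism this
commit adopts, using the tooz partitioner directly. The backend URL, member
id, and NUM_SACKS value are illustrative stand-ins rather than gnocchi's real
configuration; the group name, partitions=200, and replicas=3 mirror the diff
below.

    # Hypothetical sketch: how tooz hashring partitioning spreads sacks
    # across workers. Backend URL, member id and NUM_SACKS are made up;
    # group name, partitions and replicas mirror the diff below.
    import uuid

    import six
    from tooz import coordination

    NUM_SACKS = 128  # stand-in for store.incoming.NUM_SACKS

    coord = coordination.get_coordinator(
        'memcached://localhost:11211', uuid.uuid4().hex.encode())
    coord.start(start_heart=True)

    # Joining a partitioned group places this member on a shared hashring;
    # each sack index then hashes to a small subset of the members.
    partitioner = coord.join_partitioned_group(b'gnocchi-processing',
                                               partitions=200)

    # Only process the sacks the ring assigns to us, so fewer workers
    # contend for the same sack locks (less locking load, at the cost of
    # less parallelism per sack -- the trade-off the commit message names).
    my_sacks = [s for s in six.moves.range(NUM_SACKS)
                if partitioner.belongs_to_self(s, replicas=3)]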
@@ -19,10 +19,13 @@ import time

import cotyledon
from cotyledon import oslo_config_glue
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import six
import tenacity
import tooz

from gnocchi import archive_policy
from gnocchi import genconfig
@@ -139,12 +142,56 @@ class MetricReporting(MetricProcessBase):

class MetricProcessor(MetricProcessBase):
    name = "processing"
    GROUP_ID = "gnocchi-processing"

    def __init__(self, worker_id, conf):
        super(MetricProcessor, self).__init__(
            worker_id, conf, conf.metricd.metric_processing_delay)
        self._coord, self._my_id = utils.get_coordinator_and_start(
            conf.storage.coordination_url)
        self._tasks = []
        self.group_state = None

    @utils.retry
    def _configure(self):
        super(MetricProcessor, self)._configure()
        # create fallback in case partitioning fails or assigns no tasks
        self.fallback_tasks = list(
            six.moves.range(self.store.incoming.NUM_SACKS))
        try:
            self.partitioner = self._coord.join_partitioned_group(
                self.GROUP_ID, partitions=200)
            LOG.info('Joined coordination group: %s', self.GROUP_ID)

            @periodics.periodic(spacing=self.conf.metricd.worker_sync_rate,
                                run_immediately=True)
            def run_watchers():
                self._coord.run_watchers()

            self.periodic = periodics.PeriodicWorker.create([])
            self.periodic.add(run_watchers)
            t = threading.Thread(target=self.periodic.start)
            t.daemon = True
            t.start()
        except NotImplementedError:
            LOG.warning('Coordinator does not support partitioning. Worker '
                        'will battle against other workers for jobs.')
        except tooz.ToozError as e:
            LOG.error('Unexpected error configuring coordinator for '
                      'partitioning. Retrying: %s', e)
            raise tenacity.TryAgain(e)

    def _get_tasks(self):
        try:
            if (not self._tasks or
                    self.group_state != self.partitioner.ring.nodes):
                self.group_state = self.partitioner.ring.nodes.copy()
                # TODO(gordc): make replicas configurable
                self._tasks = [
                    i for i in six.moves.range(self.store.incoming.NUM_SACKS)
                    if self.partitioner.belongs_to_self(i, replicas=3)]
        finally:
            return self._tasks or self.fallback_tasks

    def _sack_lock(self, sack):
        lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
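A note on _get_tasks above: because the try block ends with a bare
"finally: return", any exception raised while consulting the partitioner
(including the AttributeError from self.partitioner never having been created
when the backend lacks partitioning support) is discarded, and the worker
falls back to the full set of sacks. A minimal sketch of that Python
behavior, with hypothetical names:

    # Hypothetical demo of the fallback semantics: a return inside
    # `finally` discards any exception raised in the `try` body.
    def get_tasks(assigned, fallback):
        tasks = []
        try:
            tasks = assigned()  # may raise (e.g. AttributeError)
        finally:
            # the exception, if any, is swallowed by this return
            return tasks or fallback

    def broken():
        raise AttributeError("no partitioner was ever created")

    print(get_tasks(broken, [0, 1, 2]))       # -> [0, 1, 2]
    print(get_tasks(lambda: [5], [0, 1, 2]))  # -> [5]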
@@ -154,7 +201,7 @@ class MetricProcessor(MetricProcessBase):
        m_count = 0
        s_count = 0
        in_store = self.store.incoming
-       for s in six.moves.range(in_store.NUM_SACKS):
+       for s in self._get_tasks():
            # TODO(gordc): support delay release lock so we don't
            # process a sack right after another process
            lock = self._sack_lock(s)
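With the hunk above applied, the processing loop visits only the sacks that
_get_tasks returns: the hashring assignment when partitioning is active, or
every sack via the fallback list when it is not. As more workers join the
group, each one contends for fewer sack locks. The final hunk below adds the
worker_sync_rate option that controls how often the watcher loop checks group
membership.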
@@ -85,6 +85,13 @@ def list_opts():
               required=True,
               help="How many seconds to wait between "
                    "cleaning of expired data"),
    cfg.IntOpt('worker_sync_rate',
               default=30,
               help="Frequency to detect when metricd workers join or "
                    "leave system (in seconds). A shorter rate may "
                    "improve rebalancing but create more coordination "
                    "load"),
)),
("api", (
    cfg.StrOpt('paste_config',
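Operators can trade coordination load against rebalancing latency with the
new option. A hypothetical gnocchi.conf fragment (the 15-second value is only
an example, not a recommendation):

    [metricd]
    # Detect joining/leaving metricd workers every 15s instead of the
    # default 30s: faster rebalancing, more coordination traffic.
    worker_sync_rate = 15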