Merge "configurable hashring replicas"
This commit is contained in:
commit
62e140317d
|
@ -186,10 +186,10 @@ class MetricProcessor(MetricProcessBase):
|
|||
if (not self._tasks or
|
||||
self.group_state != self.partitioner.ring.nodes):
|
||||
self.group_state = self.partitioner.ring.nodes.copy()
|
||||
# TODO(gordc): make replicas configurable
|
||||
self._tasks = [
|
||||
i for i in six.moves.range(self.store.incoming.NUM_SACKS)
|
||||
if self.partitioner.belongs_to_self(i, replicas=3)]
|
||||
if self.partitioner.belongs_to_self(
|
||||
i, replicas=self.conf.metricd.processing_replicas)]
|
||||
finally:
|
||||
return self._tasks or self.fallback_tasks
|
||||
|
||||
|
|
|
@ -91,7 +91,13 @@ def list_opts():
|
|||
"leave system (in seconds). A shorter rate, may "
|
||||
"improve rebalancing but create more coordination "
|
||||
"load"),
|
||||
|
||||
cfg.IntOpt('processing_replicas',
|
||||
default=3,
|
||||
min=1,
|
||||
help="Number of workers that share a task. A higher "
|
||||
"value may improve worker utilization but may also "
|
||||
"increase load on coordination backend. Value is "
|
||||
"capped by number of workers globally."),
|
||||
)),
|
||||
("api", (
|
||||
cfg.StrOpt('paste_config',
|
||||
|
|
Loading…
Reference in New Issue