Merge "drop single bucket partitioning"

commit bcc91366e4
Author: Jenkins
Committed by: Gerrit Code Review
Date: 2017-04-21 06:18:29 +00:00
12 changed files with 32 additions and 99 deletions


@@ -145,10 +145,8 @@ class MetricReporting(MetricProcessBase):
class MetricScheduler(MetricProcessBase):
    name = "scheduler"
    MAX_OVERLAP = 0.3
    GROUP_ID = "gnocchi-scheduler"
    SYNC_RATE = 30
    TASKS_PER_WORKER = 16
    BLOCK_SIZE = 4

    def __init__(self, worker_id, conf, queue):
@@ -157,32 +155,9 @@ class MetricScheduler(MetricProcessBase):
        self._coord, self._my_id = utils.get_coordinator_and_start(
            conf.storage.coordination_url)
        self.queue = queue
        self.previously_scheduled_metrics = set()
        self.workers = conf.metricd.workers
        self.block_index = 0
        self.block_size_default = self.workers * self.TASKS_PER_WORKER
        self.block_size = self.block_size_default
        self.periodic = None

    def set_block(self, event):
        get_members_req = self._coord.get_members(self.GROUP_ID)
        try:
            members = sorted(get_members_req.get())
            self.block_index = members.index(self._my_id)
            reqs = list(self._coord.get_member_capabilities(self.GROUP_ID, m)
                        for m in members)
            for req in reqs:
                cap = msgpack.loads(req.get(), encoding='utf-8')
                max_workers = max(cap['workers'], self.workers)
                self.block_size = max_workers * self.TASKS_PER_WORKER
            LOG.info('New set of agents detected. Now working on block: %s, '
                     'with up to %s metrics', self.block_index,
                     self.block_size)
        except Exception:
            LOG.warning('Error getting block to work on, defaulting to first')
            self.block_index = 0
            self.block_size = self.block_size_default

    @utils.retry
    def _configure(self):
        super(MetricScheduler, self)._configure()
@@ -191,7 +166,6 @@ class MetricScheduler(MetricProcessBase):
            join_req = self._coord.join_group(self.GROUP_ID, cap)
            join_req.get()
            LOG.info('Joined coordination group: %s', self.GROUP_ID)
            self.set_block(None)

            @periodics.periodic(spacing=self.SYNC_RATE, run_immediately=True)
            def run_watchers():
@@ -203,8 +177,6 @@ class MetricScheduler(MetricProcessBase):
            t.daemon = True
            t.start()

            self._coord.watch_join_group(self.GROUP_ID, self.set_block)
            self._coord.watch_leave_group(self.GROUP_ID, self.set_block)
        except coordination.GroupNotCreated as e:
            create_group_req = self._coord.create_group(self.GROUP_ID)
            try:
@@ -221,23 +193,10 @@ class MetricScheduler(MetricProcessBase):
    def _run_job(self):
        try:
            metrics = set(
                self.store.incoming.list_metric_with_measures_to_process(
                    self.block_size, self.block_index))
            if metrics and not self.queue.empty():
                # NOTE(gordc): drop metrics we previously process to avoid
                # handling twice
                number_of_scheduled_metrics = len(metrics)
                metrics = metrics - self.previously_scheduled_metrics
                if (float(number_of_scheduled_metrics - len(metrics)) /
                        self.block_size > self.MAX_OVERLAP):
                    LOG.warning('Metric processing lagging scheduling rate. '
                                'It is recommended to increase the number of '
                                'workers or to lengthen processing interval.')
                metrics = list(metrics)
            metrics = list(
                self.store.incoming.list_metric_with_measures_to_process())
            for i in six.moves.range(0, len(metrics), self.BLOCK_SIZE):
                self.queue.put(metrics[i:i + self.BLOCK_SIZE])
            self.previously_scheduled_metrics = set(metrics)
            LOG.debug("%d metrics scheduled for processing.", len(metrics))
        except Exception:
            LOG.error("Unexpected error scheduling metrics for processing",
@@ -326,8 +285,8 @@ def metricd_tester(conf):
    index = indexer.get_driver(conf)
    index.connect()
    s = storage.get_driver(conf)
    metrics = s.incoming.list_metric_with_measures_to_process(
        conf.stop_after_processing_metrics, 0)
    metrics = s.incoming.list_metric_with_measures_to_process()[
        :conf.stop_after_processing_metrics]
    s.process_new_measures(index, metrics, True)
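
The partition bookkeeping above (block_index, block_size, MAX_OVERLAP) disappears: each scheduler now lists the whole backlog and slices it into fixed-size batches for its workers. A minimal sketch of that new pattern, with a hypothetical incoming driver and queue (the schedule() name is illustrative, not from this commit):

    import six

    BLOCK_SIZE = 4  # batch size, mirroring MetricScheduler.BLOCK_SIZE

    def schedule(incoming, queue):
        # One flat listing replaces the per-worker (size, part) bucket math.
        metrics = list(incoming.list_metric_with_measures_to_process())
        # Feed the processing queue in fixed-size chunks.
        for i in six.moves.range(0, len(metrics), BLOCK_SIZE):
            queue.put(metrics[i:i + BLOCK_SIZE])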


@@ -52,5 +52,5 @@ class StorageDriver(object):
        raise exceptions.NotImplementedError

    @staticmethod
    def list_metric_with_measures_to_process(size, part, full=False):
    def list_metric_with_measures_to_process():
        raise NotImplementedError
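
With the size/part/full parameters gone, the driver contract reduces to "return the ids of every metric that has unprocessed measures". A hypothetical in-memory implementation of the new signature, for illustration only:

    class InMemoryIncoming(object):
        def __init__(self):
            self.pending = {}  # metric id -> queued measures

        def list_metric_with_measures_to_process(self):
            # No partitioning arguments: callers get the whole backlog.
            return set(self.pending)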


@@ -67,10 +67,6 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
    def _build_report(details):
        raise NotImplementedError

    @staticmethod
    def list_metric_with_measures_to_process(size, part, full=False):
        raise NotImplementedError

    @staticmethod
    def delete_unprocessed_measures_for_metric_id(metric_id):
        raise NotImplementedError


@@ -16,7 +16,6 @@ import contextlib
import datetime
import errno
import functools
import itertools
import uuid
@@ -119,14 +118,18 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
            return ()
        return (k for k, v in omaps)

    def list_metric_with_measures_to_process(self, size, part, full=False):
        names = self._list_object_names_to_process(limit=-1 if full else
                                                   size * (part + 1))
        if full:
            objs_it = names
        else:
            objs_it = itertools.islice(names, size * part, size * (part + 1))
        return set([name.split("_")[1] for name in objs_it])

    def list_metric_with_measures_to_process(self):
        names = set()
        marker = ""
        while True:
            obj_names = list(self._list_object_names_to_process(
                marker=marker, limit=1000))
            names.update(name.split("_")[1] for name in obj_names)
            if len(obj_names) < 1000:
                break
            else:
                marker = obj_names[-1]
        return names

    def delete_unprocessed_measures_for_metric_id(self, metric_id):
        object_prefix = self.MEASURE_PREFIX + "_" + str(metric_id)
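
Because Ceph omap listings are capped per call, the rewritten method pages with a marker instead of the old size/part arithmetic. The same loop, generalized over a hypothetical list_page(marker, limit) callable:

    def iter_all(list_page, page_size=1000):
        # Marker-based pagination: each call resumes after the last name
        # seen; a short page signals the end of the listing.
        marker = ""
        while True:
            page = list(list_page(marker=marker, limit=page_size))
            for name in page:
                yield name
            if len(page) < page_size:
                break
            marker = page[-1]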


@@ -75,11 +75,8 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
        return (len(metric_details.keys()), sum(metric_details.values()),
                metric_details if details else None)

    def list_metric_with_measures_to_process(self, size, part, full=False):
        if full:
            return set(os.listdir(self.measure_path))
        return set(
            os.listdir(self.measure_path)[size * part:size * (part + 1)])

    def list_metric_with_measures_to_process(self):
        return set(os.listdir(self.measure_path))

    def _list_measures_container_for_metric_id(self, metric_id):
        try:


@@ -46,13 +46,11 @@ class RedisStorage(_carbonara.CarbonaraBasedStorage):
        return (len(metric_details.keys()), sum(metric_details.values()),
                metric_details if details else None)

    def list_metric_with_measures_to_process(self, size, part, full=False):
    def list_metric_with_measures_to_process(self):
        match = redis.SEP.join([self.STORAGE_PREFIX, "*"])
        keys = self._client.scan_iter(match=match, count=1000)
        measures = set([k.decode('utf8').split(redis.SEP)[1] for k in keys])
        if full:
            return measures
        return set(list(measures)[size * part:size * (part + 1)])
        return measures

    def delete_unprocessed_measures_for_metric_id(self, metric_id):
        self._client.delete(self._build_measure_path(metric_id))
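
The Redis driver already enumerated every key with SCAN, so dropping partitioning only removes the post-hoc slicing. A standalone sketch of that enumeration with redis-py (the prefix and separator here are placeholders, not the driver's real constants):

    import redis

    SEP = ":"            # placeholder for gnocchi's redis.SEP
    PREFIX = "incoming"  # placeholder for STORAGE_PREFIX

    client = redis.StrictRedis()
    # scan_iter wraps the SCAN command, so no single call blocks the
    # server; count=1000 is only a hint for the per-step batch size.
    keys = client.scan_iter(match=SEP.join([PREFIX, "*"]), count=1000)
    metric_ids = set(k.decode("utf8").split(SEP)[1] for k in keys)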


@@ -80,12 +80,8 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
        return (len(metric_details), sum(metric_details.values()),
                metric_details if details else None)

    def list_metric_with_measures_to_process(self, size, part, full=False):
        if full:
            limit = 1000  # 1000 is the default anyway
        else:
            limit = size * (part + 1)
    def list_metric_with_measures_to_process(self):
        limit = 1000  # 1000 is the default anyway
        metrics = set()
        response = {}
        # Handle pagination
@@ -103,11 +99,7 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
                **kwargs)
            for p in response.get('CommonPrefixes', ()):
                metrics.add(p['Prefix'].rstrip('/'))
        if full:
            return metrics
        return sorted(list(metrics))[size * part:]
        return metrics

    def _list_measure_files_for_metric_id(self, metric_id):
        files = set()
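
On S3 the listing stays paginated; only the early cut-off tied to size and part goes away. A self-contained version of a continuation-token loop with boto3 (the bucket name is a placeholder):

    import boto3

    client = boto3.client("s3")
    metrics = set()
    kwargs = {}
    while True:
        # Delimiter='/' groups keys by their first path segment; those
        # groups come back in CommonPrefixes, one per metric id.
        response = client.list_objects_v2(
            Bucket="gnocchi-measure", Delimiter="/", MaxKeys=1000, **kwargs)
        for p in response.get("CommonPrefixes", ()):
            metrics.add(p["Prefix"].rstrip("/"))
        if not response.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = response["NextContinuationToken"]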


@@ -58,16 +58,10 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
        measures = int(headers.get('x-container-object-count'))
        return nb_metrics, measures, metric_details if details else None

    def list_metric_with_measures_to_process(self, size, part, full=False):
        limit = None
        if not full:
            limit = size * (part + 1)
    def list_metric_with_measures_to_process(self):
        headers, files = self.swift.get_container(self.MEASURE_PREFIX,
                                                  delimiter='/',
                                                  full_listing=full,
                                                  limit=limit)
        if not full:
            files = files[size * part:]
                                                  full_listing=True)
        return set(f['subdir'][:-1] for f in files if 'subdir' in f)

    def _list_measure_files_for_metric_id(self, metric_id):
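
For Swift, passing full_listing=True lets python-swiftclient follow the container's paging internally instead of the driver computing limit and slice bounds. Roughly, assuming placeholder credentials and container name:

    import swiftclient.client

    conn = swiftclient.client.Connection(
        authurl="http://localhost:8080/auth/v1.0", user="test", key="secret")
    # full_listing=True keeps issuing marker requests until the whole
    # container has been returned, so no limit/marker handling is needed.
    headers, files = conn.get_container("measure", delimiter="/",
                                        full_listing=True)
    metric_ids = set(f["subdir"][:-1] for f in files if "subdir" in f)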


@@ -181,8 +181,7 @@ class MetricdThread(threading.Thread):
    def run(self):
        incoming = self.storage.incoming
        while self.flag:
            metrics = incoming.list_metric_with_measures_to_process(
                None, None, full=True)
            metrics = incoming.list_metric_with_measures_to_process()
            self.storage.process_background_tasks(self.index, metrics)
            time.sleep(0.1)


@@ -60,8 +60,7 @@ class TestAggregates(tests_base.TestCase):
                    for n, val in enumerate(data)]
        self.index.create_metric(metric.id, str(uuid.uuid4()), 'medium')
        self.storage.incoming.add_measures(metric, measures)
        metrics = self.storage.incoming.list_metric_with_measures_to_process(
            None, None, full=True)
        metrics = self.storage.incoming.list_metric_with_measures_to_process()
        self.storage.process_background_tasks(self.index, metrics, sync=True)
        return metric


@@ -122,8 +122,7 @@ class TestingApp(webtest.TestApp):
        req.headers['X-User-Id'] = self.USER_ID
        req.headers['X-Project-Id'] = self.PROJECT_ID
        response = super(TestingApp, self).do_request(req, *args, **kwargs)
        metrics = self.storage.incoming.list_metric_with_measures_to_process(
            None, None, full=True)
        metrics = self.storage.incoming.list_metric_with_measures_to_process()
        self.storage.process_background_tasks(self.indexer, metrics, sync=True)
        return response


@@ -97,18 +97,15 @@ class TestStorageDriver(tests_base.TestCase):
        self.assertIn((utils.datetime_utc(2014, 1, 1, 12), 300.0, 5.0), m)

    def test_list_metric_with_measures_to_process(self):
        metrics = self.storage.incoming.list_metric_with_measures_to_process(
            None, None, full=True)
        metrics = self.storage.incoming.list_metric_with_measures_to_process()
        self.assertEqual(set(), metrics)
        self.storage.incoming.add_measures(self.metric, [
            storage.Measure(utils.dt_to_unix_ns(2014, 1, 1, 12, 0, 1), 69),
        ])
        metrics = self.storage.incoming.list_metric_with_measures_to_process(
            None, None, full=True)
        metrics = self.storage.incoming.list_metric_with_measures_to_process()
        self.assertEqual(set([str(self.metric.id)]), metrics)
        self.trigger_processing()
        metrics = self.storage.incoming.list_metric_with_measures_to_process(
            None, None, full=True)
        metrics = self.storage.incoming.list_metric_with_measures_to_process()
        self.assertEqual(set([]), metrics)

    def test_delete_nonempty_metric(self):