diff --git a/gnocchi/opts.py b/gnocchi/opts.py index f1142fd46..023138da5 100644 --- a/gnocchi/opts.py +++ b/gnocchi/opts.py @@ -114,6 +114,10 @@ def list_opts(): required=True, help=('The maximum number of items returned in a ' 'single response from a collection resource')), + cfg.IntOpt('refresh_timeout', + default=10, min=0, + help='Number of seconds before timeout when attempting ' + 'to force refresh of metric.'), )), ("storage", (_STORAGE_OPTS + gnocchi.storage._carbonara.OPTS)), ("incoming", _INCOMING_OPTS), diff --git a/gnocchi/rest/__init__.py b/gnocchi/rest/__init__.py index cd857fa76..8a8ccf867 100644 --- a/gnocchi/rest/__init__.py +++ b/gnocchi/rest/__init__.py @@ -440,10 +440,14 @@ class MetricController(rest.RestController): except ValueError as e: abort(400, e) - if strtobool("refresh", refresh): - pecan.request.storage.process_new_measures( - pecan.request.indexer, [six.text_type(self.metric.id)], True) - + if (strtobool("refresh", refresh) and + pecan.request.storage.incoming.has_unprocessed(self.metric)): + try: + pecan.request.storage.refresh_metric( + pecan.request.indexer, self.metric, + pecan.request.conf.api.refresh_timeout) + except storage.SackLockTimeoutError as e: + abort(503, e) try: if aggregation in self.custom_agg: measures = self.custom_agg[aggregation].compute( @@ -1630,9 +1634,16 @@ class AggregationController(rest.RestController): try: if strtobool("refresh", refresh): - pecan.request.storage.process_new_measures( - pecan.request.indexer, - [six.text_type(m.id) for m in metrics], True) + store = pecan.request.storage + metrics_to_update = [ + m for m in metrics if store.incoming.has_unprocessed(m)] + for m in metrics_to_update: + try: + pecan.request.storage.refresh_metric( + pecan.request.indexer, m, + pecan.request.conf.api.refresh_timeout) + except storage.SackLockTimeoutError as e: + abort(503, e) if number_of_metrics == 1: # NOTE(sileht): don't do the aggregation if we only have one # metric diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index 1a6eac6c1..76f8b2590 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -54,6 +54,10 @@ class CorruptionError(ValueError): super(CorruptionError, self).__init__(message) +class SackLockTimeoutError(Exception): + pass + + class CarbonaraBasedStorage(storage.StorageDriver): def __init__(self, conf, incoming): @@ -70,10 +74,6 @@ class CarbonaraBasedStorage(storage.StorageDriver): def stop(self): self.coord.stop() - def _lock(self, metric_id): - lock_name = b"gnocchi-" + str(metric_id).encode('ascii') - return self.coord.get_lock(lock_name) - @staticmethod def _get_measures(metric, timestamp_key, aggregation, granularity, version=3): @@ -355,18 +355,25 @@ class CarbonaraBasedStorage(storage.StorageDriver): aggregation, granularity, version=3): raise NotImplementedError + def refresh_metric(self, indexer, metric, timeout): + s = self.incoming.sack_for_metric(metric.id) + lock = self.incoming.get_sack_lock(self.coord, s) + if not lock.acquire(blocking=timeout): + raise SackLockTimeoutError( + 'Unable to refresh metric: %s. Metric is locked. ' + 'Please try again.' % metric.id) + try: + self.process_new_measures(indexer, [six.text_type(metric.id)]) + finally: + lock.release() + def process_new_measures(self, indexer, metrics_to_process, sync=False): # process only active metrics. deleted metrics with unprocessed # measures will be skipped until cleaned by janitor. metrics = indexer.list_metrics(ids=metrics_to_process) for metric in metrics: - lock = self._lock(metric.id) - # Do not block if we cannot acquire the lock, that means some other - # worker is doing the job. We'll just ignore this metric and may - # get back later to it if needed. - if not lock.acquire(blocking=sync): - continue + # NOTE(gordc): must lock at sack level try: locksw = timeutils.StopWatch().start() LOG.debug("Processing measures for %s", metric) @@ -381,8 +388,6 @@ class CarbonaraBasedStorage(storage.StorageDriver): if sync: raise LOG.error("Error processing new measures", exc_info=True) - finally: - lock.release() def _compute_and_store_timeseries(self, metric, measures): # NOTE(mnaser): The metric could have been handled by diff --git a/gnocchi/storage/incoming/_carbonara.py b/gnocchi/storage/incoming/_carbonara.py index 22805ad03..e20720d6b 100644 --- a/gnocchi/storage/incoming/_carbonara.py +++ b/gnocchi/storage/incoming/_carbonara.py @@ -127,6 +127,10 @@ class CarbonaraBasedStorage(incoming.StorageDriver): def process_measure_for_metric(metric): raise NotImplementedError + @staticmethod + def has_unprocessed(metric): + raise NotImplementedError + def sack_for_metric(self, metric_id): return metric_id.int % self.NUM_SACKS diff --git a/gnocchi/storage/incoming/ceph.py b/gnocchi/storage/incoming/ceph.py index 0f6c970a7..677c52331 100644 --- a/gnocchi/storage/incoming/ceph.py +++ b/gnocchi/storage/incoming/ceph.py @@ -183,6 +183,11 @@ class CephStorage(_carbonara.CarbonaraBasedStorage): self.ioctx.operate_write_op(op, self.get_sack_name(sack), flags=self.OMAP_WRITE_FLAGS) + def has_unprocessed(self, metric): + sack = self.sack_for_metric(metric.id) + object_prefix = self.MEASURE_PREFIX + "_" + str(metric.id) + return bool(self._list_object_names_to_process(sack, object_prefix)) + @contextlib.contextmanager def process_measure_for_metric(self, metric): sack = self.sack_for_metric(metric.id) diff --git a/gnocchi/storage/incoming/file.py b/gnocchi/storage/incoming/file.py index 070094e71..781d3ec5c 100644 --- a/gnocchi/storage/incoming/file.py +++ b/gnocchi/storage/incoming/file.py @@ -148,6 +148,9 @@ class FileStorage(_carbonara.CarbonaraBasedStorage): files = self._list_measures_container_for_metric_id(metric_id) self._delete_measures_files_for_metric_id(metric_id, files) + def has_unprocessed(self, metric): + return os.path.isdir(self._build_measure_path(metric.id)) + @contextlib.contextmanager def process_measure_for_metric(self, metric): files = self._list_measures_container_for_metric_id(metric.id) diff --git a/gnocchi/storage/incoming/redis.py b/gnocchi/storage/incoming/redis.py index fa5f1e887..9e81327c8 100644 --- a/gnocchi/storage/incoming/redis.py +++ b/gnocchi/storage/incoming/redis.py @@ -65,6 +65,9 @@ class RedisStorage(_carbonara.CarbonaraBasedStorage): def delete_unprocessed_measures_for_metric_id(self, metric_id): self._client.delete(self._build_measure_path(metric_id)) + def has_unprocessed(self, metric): + return bool(self._client.exists(self._build_measure_path(metric.id))) + @contextlib.contextmanager def process_measure_for_metric(self, metric): key = self._build_measure_path(metric.id) diff --git a/gnocchi/storage/incoming/s3.py b/gnocchi/storage/incoming/s3.py index 52016d118..89de4192f 100644 --- a/gnocchi/storage/incoming/s3.py +++ b/gnocchi/storage/incoming/s3.py @@ -154,6 +154,10 @@ class S3Storage(_carbonara.CarbonaraBasedStorage): files = self._list_measure_files_for_metric_id(sack, metric_id) s3.bulk_delete(self.s3, self._bucket_name_measures, files) + def has_unprocessed(self, metric): + sack = self.sack_for_metric(metric.id) + return bool(self._list_measure_files_for_metric_id(sack, metric.id)) + @contextlib.contextmanager def process_measure_for_metric(self, metric): sack = self.sack_for_metric(metric.id) diff --git a/gnocchi/storage/incoming/swift.py b/gnocchi/storage/incoming/swift.py index b0549d0f5..304126f9a 100644 --- a/gnocchi/storage/incoming/swift.py +++ b/gnocchi/storage/incoming/swift.py @@ -93,6 +93,10 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage): files = self._list_measure_files_for_metric_id(sack, metric_id) swift.bulk_delete(self.swift, self.get_sack_name(sack), files) + def has_unprocessed(self, metric): + sack = self.sack_for_metric(metric.id) + return bool(self._list_measure_files_for_metric_id(sack, metric.id)) + @contextlib.contextmanager def process_measure_for_metric(self, metric): sack = self.sack_for_metric(metric.id)