From 3099bdbebf1c281c4a8063e6fa6312b993be0f4c Mon Sep 17 00:00:00 2001 From: gord chung Date: Tue, 18 Apr 2017 14:11:54 +0000 Subject: [PATCH] don't lock on delete we don't need to lock metric on delete but rather we only need to check if sack is being processed. either: 1) sack is locked, so there's a chance that metric is being processed. therefore we skip. 2) sack is unlocked, so there's no chance that a concurrent process will start processing metric as the indexer already says it's already deleted and no other process can see it as not-deleted. Change-Id: I8df135621dfabc3d17733a3577d0ea60b30e83e4 --- gnocchi/cli.py | 6 +----- gnocchi/storage/_carbonara.py | 14 ++++++++------ gnocchi/storage/incoming/_carbonara.py | 5 +++++ 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/gnocchi/cli.py b/gnocchi/cli.py index 7c3f6fd5..4599c917 100644 --- a/gnocchi/cli.py +++ b/gnocchi/cli.py @@ -216,10 +216,6 @@ class MetricProcessor(MetricProcessBase): finally: return self._tasks or self.fallback_tasks - def _sack_lock(self, sack): - lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii') - return self._coord.get_lock(lock_name) - def _run_job(self): m_count = 0 s_count = 0 @@ -227,7 +223,7 @@ class MetricProcessor(MetricProcessBase): for s in self._get_tasks(): # TODO(gordc): support delay release lock so we don't # process a sack right after another process - lock = self._sack_lock(s) + lock = in_store.get_sack_lock(self._coord, s) if not lock.acquire(blocking=False): continue try: diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index 6f7c0060..1a6eac6c 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -339,14 +339,16 @@ class CarbonaraBasedStorage(storage.StorageDriver): def delete_metric(self, metric, sync=False): LOG.debug("Deleting metric %s", metric) - lock = self._lock(metric.id) + lock = self.incoming.get_sack_lock( + self.coord, self.incoming.sack_for_metric(metric.id)) if not lock.acquire(blocking=sync): raise storage.LockedMetric(metric) - try: - self._delete_metric(metric) - self.incoming.delete_unprocessed_measures_for_metric_id(metric.id) - finally: - lock.release() + # NOTE(gordc): no need to hold lock because the metric has been already + # marked as "deleted" in the indexer so no measure worker + # is going to process it anymore. + lock.release() + self._delete_metric(metric) + self.incoming.delete_unprocessed_measures_for_metric_id(metric.id) @staticmethod def _delete_metric_measures(metric, timestamp_key, diff --git a/gnocchi/storage/incoming/_carbonara.py b/gnocchi/storage/incoming/_carbonara.py index 2fe9bbd5..22805ad0 100644 --- a/gnocchi/storage/incoming/_carbonara.py +++ b/gnocchi/storage/incoming/_carbonara.py @@ -71,6 +71,11 @@ class CarbonaraBasedStorage(incoming.StorageDriver): def remove_sack_group(num_sacks): raise NotImplementedError + @staticmethod + def get_sack_lock(coord, sack): + lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii') + return coord.get_lock(lock_name) + def _unserialize_measures(self, measure_id, data): nb_measures = len(data) // self._MEASURE_SERIAL_LEN try: