don't lock on delete

we don't need to lock metric on delete but rather we only need to
check if sack is being processed. either:

1) sack is locked, so there's a chance that metric is being processed.
therefore we skip.
2) sack is unlocked, so there's no chance that a concurrent process
will start processing metric as the indexer already says it's already
deleted and no other process can see it as not-deleted.

Change-Id: I8df135621dfabc3d17733a3577d0ea60b30e83e4
This commit is contained in:
gord chung 2017-04-18 14:11:54 +00:00
parent 5b83fe64cc
commit 3099bdbebf
3 changed files with 14 additions and 11 deletions

View File

@ -216,10 +216,6 @@ class MetricProcessor(MetricProcessBase):
finally:
return self._tasks or self.fallback_tasks
def _sack_lock(self, sack):
lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
return self._coord.get_lock(lock_name)
def _run_job(self):
m_count = 0
s_count = 0
@ -227,7 +223,7 @@ class MetricProcessor(MetricProcessBase):
for s in self._get_tasks():
# TODO(gordc): support delay release lock so we don't
# process a sack right after another process
lock = self._sack_lock(s)
lock = in_store.get_sack_lock(self._coord, s)
if not lock.acquire(blocking=False):
continue
try:

View File

@ -339,14 +339,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
def delete_metric(self, metric, sync=False):
LOG.debug("Deleting metric %s", metric)
lock = self._lock(metric.id)
lock = self.incoming.get_sack_lock(
self.coord, self.incoming.sack_for_metric(metric.id))
if not lock.acquire(blocking=sync):
raise storage.LockedMetric(metric)
try:
self._delete_metric(metric)
self.incoming.delete_unprocessed_measures_for_metric_id(metric.id)
finally:
lock.release()
# NOTE(gordc): no need to hold lock because the metric has been already
# marked as "deleted" in the indexer so no measure worker
# is going to process it anymore.
lock.release()
self._delete_metric(metric)
self.incoming.delete_unprocessed_measures_for_metric_id(metric.id)
@staticmethod
def _delete_metric_measures(metric, timestamp_key,

View File

@ -71,6 +71,11 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
def remove_sack_group(num_sacks):
raise NotImplementedError
@staticmethod
def get_sack_lock(coord, sack):
lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
return coord.get_lock(lock_name)
def _unserialize_measures(self, measure_id, data):
nb_measures = len(data) // self._MEASURE_SERIAL_LEN
try: