Merge "don't lock on delete"
This commit is contained in:
commit
38df0f67f0
|
@ -216,10 +216,6 @@ class MetricProcessor(MetricProcessBase):
|
|||
finally:
|
||||
return self._tasks or self.fallback_tasks
|
||||
|
||||
def _sack_lock(self, sack):
|
||||
lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
|
||||
return self._coord.get_lock(lock_name)
|
||||
|
||||
def _run_job(self):
|
||||
m_count = 0
|
||||
s_count = 0
|
||||
|
@ -227,7 +223,7 @@ class MetricProcessor(MetricProcessBase):
|
|||
for s in self._get_tasks():
|
||||
# TODO(gordc): support delay release lock so we don't
|
||||
# process a sack right after another process
|
||||
lock = self._sack_lock(s)
|
||||
lock = in_store.get_sack_lock(self._coord, s)
|
||||
if not lock.acquire(blocking=False):
|
||||
continue
|
||||
try:
|
||||
|
|
|
@ -339,14 +339,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
|||
|
||||
def delete_metric(self, metric, sync=False):
|
||||
LOG.debug("Deleting metric %s", metric)
|
||||
lock = self._lock(metric.id)
|
||||
lock = self.incoming.get_sack_lock(
|
||||
self.coord, self.incoming.sack_for_metric(metric.id))
|
||||
if not lock.acquire(blocking=sync):
|
||||
raise storage.LockedMetric(metric)
|
||||
try:
|
||||
self._delete_metric(metric)
|
||||
self.incoming.delete_unprocessed_measures_for_metric_id(metric.id)
|
||||
finally:
|
||||
lock.release()
|
||||
# NOTE(gordc): no need to hold lock because the metric has been already
|
||||
# marked as "deleted" in the indexer so no measure worker
|
||||
# is going to process it anymore.
|
||||
lock.release()
|
||||
self._delete_metric(metric)
|
||||
self.incoming.delete_unprocessed_measures_for_metric_id(metric.id)
|
||||
|
||||
@staticmethod
|
||||
def _delete_metric_measures(metric, timestamp_key,
|
||||
|
|
|
@ -71,6 +71,11 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
|
|||
def remove_sack_group(num_sacks):
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def get_sack_lock(coord, sack):
|
||||
lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
|
||||
return coord.get_lock(lock_name)
|
||||
|
||||
def _unserialize_measures(self, measure_id, data):
|
||||
nb_measures = len(data) // self._MEASURE_SERIAL_LEN
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue