From 0451df0d1b1c11afc7d9103a4fb917a2c72a90fc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?=
Date: Wed, 12 Jul 2023 07:48:47 -0300
Subject: [PATCH] Fix a concurrency issue when locking reprocessing tasks

It was discovered that in some situations the same reprocessing task
might be processed simultaneously by different workers, which leads to
unnecessary processing. This happened because "current_reprocess_time"
was used in the lock name, so different workers could compute different
lock names for the same task. For instance, when a worker starts
processing a brand new reprocessing task, "current_reprocess_time" is
updated after each reprocessed time frame; when another worker later
reaches the locking step for the same scope ID, it computes a lock name
based on the updated "current_reprocess_time", acquires a lock that no
other worker holds, and reprocesses a scope that is already being
reprocessed.

To avoid this, "current_reprocess_time" is no longer part of the lock
name, which is now derived only from the scope ID and the start and end
of the reprocessing window.

Change-Id: I487d0eeb1cedc162d44f8c879a27f924b5c76206
---
 cloudkitty/orchestrator.py                    | 19 ++++++++++++-------
 cloudkitty/tests/test_orchestrator.py         |  5 ++---
 ...g-concurrency-issues-2a71f4d86a93c507.yaml |  4 ++++
 3 files changed, 18 insertions(+), 10 deletions(-)
 create mode 100644 releasenotes/notes/reprocessing-concurrency-issues-2a71f4d86a93c507.yaml

diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py
index 87391411..6b2e4328 100644
--- a/cloudkitty/orchestrator.py
+++ b/cloudkitty/orchestrator.py
@@ -614,13 +614,17 @@ class CloudKittyProcessor(cotyledon.Service):
         lock_name, lock = get_lock(
             self.coord, self.generate_lock_base_name(tenant_id))
 
-        LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}".'
-                  .format(w=self._worker_id, lock_name=lock_name))
+        LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}" for '
+                  'scope ID {scope_id}.'.format(w=self._worker_id,
+                                                lock_name=lock_name,
+                                                scope_id=tenant_id))
 
         lock_acquired = lock.acquire(blocking=False)
         if lock_acquired:
-            LOG.debug('[Worker: {w}] Acquired lock "{lock_name}".'.format(
-                w=self._worker_id, lock_name=lock_name))
+            LOG.debug('[Worker: {w}] Acquired lock "{lock_name}" for '
+                      'scope ID {scope_id}.'.format(w=self._worker_id,
+                                                    lock_name=lock_name,
+                                                    scope_id=tenant_id))
 
             try:
                 self.process_scope(tenant_id)
@@ -702,9 +706,10 @@ class CloudKittyReprocessor(CloudKittyProcessor):
                  self._worker_id, len(self.tenants))
 
     def generate_lock_base_name(self, scope):
-        return "%s-id=%s-start=%s-end=%s-current=%s" % (
-            self.worker_class, scope.identifier, scope.start_reprocess_time,
-            scope.end_reprocess_time, scope.current_reprocess_time)
+        return "%s-id=%s-start=%s-end=%s" % (self.worker_class,
+                                             scope.identifier,
+                                             scope.start_reprocess_time,
+                                             scope.end_reprocess_time)
 
 
 class CloudKittyServiceManager(cotyledon.ServiceManager):
diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py
index 6e28024c..ba46c06c 100644
--- a/cloudkitty/tests/test_orchestrator.py
+++ b/cloudkitty/tests/test_orchestrator.py
@@ -623,10 +623,9 @@ class CloudKittyReprocessorTest(tests.TestCase):
 
 
         expected_lock_name = "-id=scope_identifier-" \
-                             "start=%s-end=%s-current=%s" % (
+                             "start=%s-end=%s" % (
                                  scope_mock.start_reprocess_time,
-                                 scope_mock.end_reprocess_time,
-                                 scope_mock.current_reprocess_time)
+                                 scope_mock.end_reprocess_time)
 
         self.assertEqual(expected_lock_name, return_generate_lock_name)
 
diff --git a/releasenotes/notes/reprocessing-concurrency-issues-2a71f4d86a93c507.yaml b/releasenotes/notes/reprocessing-concurrency-issues-2a71f4d86a93c507.yaml
new file mode 100644
index 00000000..6c902fb8
--- /dev/null
+++ b/releasenotes/notes/reprocessing-concurrency-issues-2a71f4d86a93c507.yaml
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    Fixed a concurrency issue where the same reprocessing task could be executed simultaneously by multiple workers.
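For illustration, here is a minimal Python sketch (not part of the patch) of the race described in the commit message. The "Scope" class, the "reprocessor" name prefix, and the timestamps are hypothetical stand-ins; in the real code the name is built by CloudKittyReprocessor.generate_lock_base_name and the lock is acquired through the tooz coordinator via get_lock.

# Minimal sketch of the race this patch fixes; all names and values
# below are hypothetical stand-ins for the real CloudKitty objects.
from dataclasses import dataclass


@dataclass
class Scope:
    identifier: str
    start_reprocess_time: str
    end_reprocess_time: str
    current_reprocess_time: str


def old_lock_name(scope):
    # Before the fix: the mutable "current_reprocess_time" is part of
    # the lock name.
    return "reprocessor-id=%s-start=%s-end=%s-current=%s" % (
        scope.identifier, scope.start_reprocess_time,
        scope.end_reprocess_time, scope.current_reprocess_time)


def new_lock_name(scope):
    # After the fix: only attributes that never change while the task
    # runs, so every worker computes the same name for the same task.
    return "reprocessor-id=%s-start=%s-end=%s" % (
        scope.identifier, scope.start_reprocess_time,
        scope.end_reprocess_time)


# Worker A reads the task and starts reprocessing it.
task_seen_by_a = Scope("scope-1", "2023-01-01", "2023-02-01", "2023-01-01")

# A few time frames later, "current_reprocess_time" has advanced; worker B
# now reads the task and computes its lock name from the updated value.
task_seen_by_b = Scope("scope-1", "2023-01-01", "2023-02-01", "2023-01-05")

# Old scheme: the names differ, so worker B acquires a second "lock" and
# both workers reprocess the same scope concurrently.
assert old_lock_name(task_seen_by_a) != old_lock_name(task_seen_by_b)

# New scheme: the names are identical, so worker B's non-blocking acquire
# fails while worker A still holds the lock.
assert new_lock_name(task_seen_by_a) == new_lock_name(task_seen_by_b)

The design choice here is the general rule the patch applies: a distributed lock name must be a pure function of the identity of the work, never of its mutable progress state.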