Merge "Optimize CloudKitty reprocessing process"
This commit is contained in:
commit
c842ea6ab2
|
@ -517,13 +517,27 @@ class ReprocessingWorker(Worker):
|
|||
|
||||
end_of_this_processing = tzutils.local_to_utc(end_of_this_processing)
|
||||
|
||||
LOG.debug("Cleaning backend [%s] data for reprocessing scope [%s] "
|
||||
"for timeframe[start=%s, end=%s].",
|
||||
self._storage, self.scope, timestamp, end_of_this_processing)
|
||||
|
||||
self._storage.delete(
|
||||
begin=timestamp, end=end_of_this_processing,
|
||||
filters={self.scope_key: self._tenant_id})
|
||||
# If the start_reprocess_time of the reprocessing task equals to
|
||||
# the current reprocessing time, it means that we have just started
|
||||
# executing it. Therefore, we can clean/erase the old data in the
|
||||
# reprocessing task time frame.
|
||||
if tzutils.local_to_utc(self.scope.start_reprocess_time) == timestamp:
|
||||
LOG.info(
|
||||
"Cleaning backend [%s] data for reprocessing scope [%s] for "
|
||||
"timeframe[start=%s, end=%s].", self._storage, self.scope,
|
||||
self.scope.start_reprocess_time, self.scope.end_reprocess_time)
|
||||
self._storage.delete(
|
||||
begin=self.scope.start_reprocess_time,
|
||||
end=self.scope.end_reprocess_time,
|
||||
filters={self.scope_key: self._tenant_id})
|
||||
else:
|
||||
LOG.debug("No need to clean backend [%s] data for reprocessing "
|
||||
"scope [%s] for timeframe[start=%s, end=%s]. We are "
|
||||
"past the very first timestamp; therefore, the cleaning "
|
||||
"for the reprocessing task period has already been "
|
||||
"executed.", self._storage, self.scope,
|
||||
self.scope.start_reprocess_time,
|
||||
self.scope.end_reprocess_time)
|
||||
|
||||
LOG.debug("Executing the reprocessing of scope [%s] for "
|
||||
"timeframe[start=%s, end=%s].", self.scope, timestamp,
|
||||
|
|
|
@ -1034,15 +1034,16 @@ class ReprocessingWorkerTest(tests.TestCase):
|
|||
self, do_execute_scope_processing_mock_from_worker):
|
||||
|
||||
now_timestamp = tzutils.localized_now()
|
||||
self.reprocessing_worker.scope.start_reprocess_time = now_timestamp
|
||||
self.reprocessing_worker.do_execute_scope_processing(now_timestamp)
|
||||
|
||||
expected_end = tzutils.localized_now() + datetime.timedelta(
|
||||
seconds=self.reprocessing_worker._period)
|
||||
|
||||
self.storage_mock.delete.assert_has_calls([
|
||||
mock.call(begin=now_timestamp, end=expected_end,
|
||||
filters={self.reprocessing_worker.scope_key:
|
||||
self.reprocessing_worker._tenant_id})])
|
||||
mock.call(
|
||||
begin=self.reprocessing_worker.scope.start_reprocess_time,
|
||||
end=self.reprocessing_worker.scope.end_reprocess_time,
|
||||
filters={
|
||||
self.reprocessing_worker.scope_key:
|
||||
self.reprocessing_worker._tenant_id})])
|
||||
|
||||
do_execute_scope_processing_mock_from_worker.assert_has_calls([
|
||||
mock.call(now_timestamp)])
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Optimized the reprocessing workflow to execute batch cleaning
|
||||
of data in the storage backend of CloudKitty.
|
Loading…
Reference in New Issue