Merge "messaging-publisher: fix threadsafe of flush()"
This commit is contained in:
commit
4bc3312bc4
|
@ -18,6 +18,7 @@
|
|||
import abc
|
||||
import itertools
|
||||
import operator
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
@ -84,6 +85,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
|
|||
'max_queue_length', [1024])[-1])
|
||||
self.max_retry = 0
|
||||
|
||||
self.queue_lock = threading.Lock()
|
||||
self.local_queue = []
|
||||
|
||||
if self.policy in ['default', 'queue', 'drop']:
|
||||
|
@ -123,17 +125,16 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
|
|||
self.flush()
|
||||
|
||||
def flush(self):
|
||||
# NOTE(sileht):
|
||||
# this is why the self.local_queue is emptied before processing the
|
||||
# queue and the remaining messages in the queue are added to
|
||||
# self.local_queue after in case of another call having already added
|
||||
# something in the self.local_queue
|
||||
queue = self.local_queue
|
||||
self.local_queue = []
|
||||
self.local_queue = (self._process_queue(queue, self.policy) +
|
||||
self.local_queue)
|
||||
if self.policy == 'queue':
|
||||
self._check_queue_length()
|
||||
with self.queue_lock:
|
||||
queue = self.local_queue
|
||||
self.local_queue = []
|
||||
|
||||
queue = self._process_queue(queue, self.policy)
|
||||
|
||||
with self.queue_lock:
|
||||
self.local_queue = (queue + self.local_queue)
|
||||
if self.policy == 'queue':
|
||||
self._check_queue_length()
|
||||
|
||||
def _check_queue_length(self):
|
||||
queue_length = len(self.local_queue)
|
||||
|
|
Loading…
Reference in New Issue