diff --git a/neutron/agent/linux/ip_conntrack.py b/neutron/agent/linux/ip_conntrack.py index 648ab332ad8..4a9e70fb9db 100644 --- a/neutron/agent/linux/ip_conntrack.py +++ b/neutron/agent/linux/ip_conntrack.py @@ -17,6 +17,7 @@ import eventlet import netaddr from oslo_concurrency import lockutils from oslo_log import log as logging +from six.moves import queue as Queue from neutron.agent.linux import utils as linux_utils from neutron.common import constants as n_const @@ -43,7 +44,7 @@ class IpConntrackUpdate(object): class IpConntrackProcessingQueue(object): """Manager of the queue of conntrack updates to process.""" def __init__(self): - self._queue = eventlet.queue.LightQueue() + self._queue = Queue.Queue() def add(self, update): self._queue.put(update) @@ -51,15 +52,8 @@ class IpConntrackProcessingQueue(object): def updates(self): """Grabs the next conntrack update from the queue and processes.""" while not self._queue.empty(): - try: - update = self._queue.get() - yield update - except IndexError: - # queue was empty, another worker stole our entry - continue - except Exception as e: - LOG.error("Failed to yield ip_conntrack process queue " - "entry: %s", e) + update = self._queue.get() + yield update @lockutils.synchronized('conntrack') @@ -89,11 +83,9 @@ class IpConntrackManager(object): self.zone_per_port = zone_per_port # zone per port vs per network self._populate_initial_zone_map() self._queue = IpConntrackProcessingQueue() - # Don't start the queue processing thread here, do later when - # the first entry is added to the queue. - self._process_queue_started = False + self.start_process_queue() - def _start_process_queue(self): + def start_process_queue(self): eventlet.spawn_n(self._process_queue_loop) def _process_queue_loop(self): @@ -113,9 +105,6 @@ class IpConntrackManager(object): update.rule) def _process(self, device_info_list, rule, remote_ips=None): - if not self._process_queue_started: - self._process_queue_started = True - self._start_process_queue() # queue the update to allow the caller to resume its work update = IpConntrackUpdate(device_info_list, rule, remote_ips) self._queue.add(update)