diff --git a/neutron/agent/linux/ip_conntrack.py b/neutron/agent/linux/ip_conntrack.py index 4a9e70fb9db..799a8820337 100644 --- a/neutron/agent/linux/ip_conntrack.py +++ b/neutron/agent/linux/ip_conntrack.py @@ -17,7 +17,6 @@ import eventlet import netaddr from oslo_concurrency import lockutils from oslo_log import log as logging -from six.moves import queue as Queue from neutron.agent.linux import utils as linux_utils from neutron.common import constants as n_const @@ -28,6 +27,8 @@ CONTRACK_MGRS = {} MAX_CONNTRACK_ZONES = 65535 ZONE_START = 4097 +WORKERS = 8 + class IpConntrackUpdate(object): """Encapsulates a conntrack update @@ -40,20 +41,10 @@ class IpConntrackUpdate(object): self.rule = rule self.remote_ips = remote_ips - -class IpConntrackProcessingQueue(object): - """Manager of the queue of conntrack updates to process.""" - def __init__(self): - self._queue = Queue.Queue() - - def add(self, update): - self._queue.put(update) - - def updates(self): - """Grabs the next conntrack update from the queue and processes.""" - while not self._queue.empty(): - update = self._queue.get() - yield update + def __repr__(self): + return ('' % (self.device_info_list, self.rule, + self.remote_ips)) @lockutils.synchronized('conntrack') @@ -82,32 +73,43 @@ class IpConntrackManager(object): self.unfiltered_ports = unfiltered_ports self.zone_per_port = zone_per_port # zone per port vs per network self._populate_initial_zone_map() - self._queue = IpConntrackProcessingQueue() - self.start_process_queue() + self._queue = eventlet.queue.LightQueue() + self._start_process_queue() - def start_process_queue(self): - eventlet.spawn_n(self._process_queue_loop) + def _start_process_queue(self): + LOG.debug("Starting ip_conntrack _process_queue_worker() threads") + pool = eventlet.GreenPool(size=WORKERS) + for i in range(WORKERS): + pool.spawn_n(self._process_queue_worker) - def _process_queue_loop(self): - LOG.debug("Starting ipconntrack _process_queue_loop()") - pool = eventlet.GreenPool(size=8) + def _process_queue_worker(self): + # While it's technically not necessary to have this method, the + # 'while True' could just be in _process_queue(), the tests have + # to be able to drain the queue without blocking, so _process_queue() + # is made standalone. while True: - pool.spawn_n(self._process_queue) + self._process_queue() def _process_queue(self): - for update in self._queue.updates(): + update = None + try: + # this will block until an entry gets added to the queue + update = self._queue.get() if update.remote_ips: for remote_ip in update.remote_ips: self._delete_conntrack_state( update.device_info_list, update.rule, remote_ip) else: - self._delete_conntrack_state(update.device_info_list, - update.rule) + self._delete_conntrack_state( + update.device_info_list, update.rule) + except Exception: + LOG.exception("Failed to process ip_conntrack queue entry: %s", + update) def _process(self, device_info_list, rule, remote_ips=None): # queue the update to allow the caller to resume its work update = IpConntrackUpdate(device_info_list, rule, remote_ips) - self._queue.add(update) + self._queue.put(update) @staticmethod def _generate_conntrack_cmd_by_rule(rule, namespace): diff --git a/neutron/tests/unit/agent/linux/test_iptables_firewall.py b/neutron/tests/unit/agent/linux/test_iptables_firewall.py index 37bd31a9d52..765a28f26aa 100644 --- a/neutron/tests/unit/agent/linux/test_iptables_firewall.py +++ b/neutron/tests/unit/agent/linux/test_iptables_firewall.py @@ -1249,7 +1249,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase): self.assertFalse(self.utils_exec.called) return # process conntrack updates in the queue - self.firewall.ipconntrack._process_queue() + while not self.firewall.ipconntrack._queue.empty(): + self.firewall.ipconntrack._process_queue() cmd = ['conntrack', '-D'] if protocol: cmd.extend(['-p', protocol]) @@ -1339,7 +1340,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase): self.assertFalse(self.utils_exec.called) return # process conntrack updates in the queue - self.firewall.ipconntrack._process_queue() + while not self.firewall.ipconntrack._queue.empty(): + self.firewall.ipconntrack._process_queue() calls = self._get_expected_conntrack_calls( [('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone) self.utils_exec.assert_has_calls(calls) @@ -1404,7 +1406,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase): "ipv6": ['fe80::1', 'fe80::2']} calls = [] # process conntrack updates in the queue - self.firewall.ipconntrack._process_queue() + while not self.firewall.ipconntrack._queue.empty(): + self.firewall.ipconntrack._process_queue() for direction in ['ingress', 'egress']: direction = '-d' if direction == 'ingress' else '-s' remote_ip_direction = '-s' if direction == '-d' else '-d' @@ -1650,7 +1653,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase): self.assertFalse(self.utils_exec.called) return # process conntrack updates in the queue - self.firewall.ipconntrack._process_queue() + while not self.firewall.ipconntrack._queue.empty(): + self.firewall.ipconntrack._process_queue() calls = self._get_expected_conntrack_calls( [('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone) self.utils_exec.assert_has_calls(calls)