MLNX Agent: ensure removed ports get treated on resyncs
This patch ensures that removed ports will be cleaned up properly if exception occurs in the daemon_loop. It does the same for the mlnx-agent as was done for the lb-agent in Ieada34ad315c0c29aa8462ebf041a448fde007b8. Refactoring the daemon_loop and scan_ports in a way that - If there's an exception, the ports that were flagged for cleaning will be retreated in the following iteration, unlike previous exception handling, where only existing ports were treated in the following iteration. Closes-Bug: 1287648 Change-Id: I3af505773aa3fdc0d17f2079ac1f1b3e93bb64a3 Signed-off-by: Roey Chen <roeyc@mellanox.com>
This commit is contained in:
parent
fa5eb301dc
commit
598e14cfde
|
@ -31,6 +31,7 @@ from neutron.common import rpc as n_rpc
|
|||
from neutron.common import topics
|
||||
from neutron.common import utils as q_utils
|
||||
from neutron import context
|
||||
from neutron.openstack.common.gettextutils import _LE, _LI
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.plugins.common import constants as p_const
|
||||
|
@ -242,13 +243,24 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
|
|||
def add_port_update(self, port):
|
||||
self.updated_ports.add(port)
|
||||
|
||||
def scan_ports(self, registered_ports, updated_ports_copy=None):
|
||||
def scan_ports(self, previous, sync):
|
||||
cur_ports = self.eswitch.get_vnics_mac()
|
||||
port_info = {'current': cur_ports}
|
||||
# Shouldn't process updates for not existing ports
|
||||
port_info['updated'] = updated_ports_copy & cur_ports
|
||||
port_info['added'] = cur_ports - registered_ports
|
||||
port_info['removed'] = registered_ports - cur_ports
|
||||
updated_ports = self.updated_ports
|
||||
self.updated_ports = set()
|
||||
if sync:
|
||||
# Either it's the first iteration or previous iteration had
|
||||
# problems.
|
||||
port_info['added'] = cur_ports
|
||||
port_info['removed'] = ((previous['removed'] | previous['current'])
|
||||
- cur_ports)
|
||||
port_info['updated'] = ((previous['updated'] | updated_ports)
|
||||
& cur_ports)
|
||||
else:
|
||||
# Shouldn't process updates for not existing ports
|
||||
port_info['added'] = cur_ports - previous['current']
|
||||
port_info['removed'] = previous['current'] - cur_ports
|
||||
port_info['updated'] = updated_ports & cur_ports
|
||||
return port_info
|
||||
|
||||
def process_network_ports(self, port_info):
|
||||
|
@ -349,39 +361,30 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
|
|||
port_info['updated'])
|
||||
|
||||
def daemon_loop(self):
|
||||
sync = True
|
||||
ports = set()
|
||||
updated_ports_copy = set()
|
||||
|
||||
LOG.info(_("eSwitch Agent Started!"))
|
||||
|
||||
sync = True
|
||||
port_info = {'current': set(),
|
||||
'added': set(),
|
||||
'removed': set(),
|
||||
'updated': set()}
|
||||
while True:
|
||||
start = time.time()
|
||||
if sync:
|
||||
LOG.info(_("Agent out of sync with plugin!"))
|
||||
ports.clear()
|
||||
sync = False
|
||||
|
||||
try:
|
||||
updated_ports_copy = self.updated_ports
|
||||
self.updated_ports = set()
|
||||
port_info = self.scan_ports(ports, updated_ports_copy)
|
||||
LOG.debug("Agent loop process devices!")
|
||||
# If treat devices fails - must resync with plugin
|
||||
ports = port_info['current']
|
||||
if self._port_info_has_changes(port_info):
|
||||
LOG.debug("Starting to process devices in:%s", port_info)
|
||||
# sync with upper/lower layers about port deltas
|
||||
sync = self.process_network_ports(port_info)
|
||||
|
||||
port_info = self.scan_ports(previous=port_info, sync=sync)
|
||||
except exceptions.RequestTimeout:
|
||||
LOG.exception(_("Request timeout in agent event loop "
|
||||
"eSwitchD is not responding - exiting..."))
|
||||
raise SystemExit(1)
|
||||
except Exception:
|
||||
LOG.exception(_("Error in agent event loop"))
|
||||
sync = True
|
||||
self.updated_ports |= updated_ports_copy
|
||||
if sync:
|
||||
LOG.info(_LI("Agent out of sync with plugin!"))
|
||||
sync = False
|
||||
if self._port_info_has_changes(port_info):
|
||||
LOG.debug("Starting to process devices in:%s", port_info)
|
||||
try:
|
||||
sync = self.process_network_ports(port_info)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Error in agent event loop"))
|
||||
sync = True
|
||||
# sleep till end of polling interval
|
||||
elapsed = (time.time() - start)
|
||||
if (elapsed < self._polling_interval):
|
||||
|
|
|
@ -187,44 +187,50 @@ class TestEswitchAgent(base.BaseTestCase):
|
|||
self.agent.add_port_update(mac_addr)
|
||||
self.assertEqual(set([mac_addr]), self.agent.updated_ports)
|
||||
|
||||
def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports):
|
||||
def _mock_scan_ports(self, vif_port_set, previous,
|
||||
updated_ports, sync=False):
|
||||
self.agent.updated_ports = updated_ports
|
||||
with mock.patch.object(self.agent.eswitch, 'get_vnics_mac',
|
||||
return_value=vif_port_set):
|
||||
return self.agent.scan_ports(registered_ports, updated_ports)
|
||||
return self.agent.scan_ports(previous, sync)
|
||||
|
||||
def test_scan_ports_return_current_for_unchanged_ports(self):
|
||||
vif_port_set = set([1, 2])
|
||||
registered_ports = set([1, 2])
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
registered_ports, set())
|
||||
previous = dict(current=set([1, 2]), added=set(),
|
||||
removed=set(), updated=set())
|
||||
expected = dict(current=vif_port_set, added=set(),
|
||||
removed=set(), updated=set())
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
previous, set())
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ports_return_port_changes(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
registered_ports, set())
|
||||
previous = dict(current=set([1, 2]), added=set(),
|
||||
removed=set(), updated=set())
|
||||
expected = dict(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]), updated=set())
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
previous, set())
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ports_with_updated_ports(self):
|
||||
vif_port_set = set([1, 3, 4])
|
||||
registered_ports = set([1, 2, 4])
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
registered_ports, set([4]))
|
||||
previous = dict(current=set([1, 2, 4]), added=set(),
|
||||
removed=set(), updated=set())
|
||||
expected = dict(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]), updated=set([4]))
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
previous, set([4]))
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ports_with_unknown_updated_ports(self):
|
||||
vif_port_set = set([1, 3, 4])
|
||||
registered_ports = set([1, 2, 4])
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
registered_ports,
|
||||
updated_ports=set([4, 5]))
|
||||
previous = dict(current=set([1, 2, 4]), added=set(),
|
||||
removed=set(), updated=set())
|
||||
expected = dict(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]), updated=set([4]))
|
||||
actual = self._mock_scan_ports(vif_port_set,
|
||||
previous,
|
||||
updated_ports=set([4, 5]))
|
||||
self.assertEqual(expected, actual)
|
||||
|
|
Loading…
Reference in New Issue