MLNX Agent: ensure removed ports get treated on resyncs

This patch ensures that removed ports will be cleaned up properly if exception
occurs in the daemon_loop.
It does the same for the mlnx-agent as was done for the lb-agent in
Ieada34ad315c0c29aa8462ebf041a448fde007b8.

Refactoring the daemon_loop and scan_ports in a way that -
If there's an exception, the ports that were flagged for cleaning will
be retreated in the following iteration,
unlike previous exception handling, where only existing ports
were treated in the following iteration.

Closes-Bug: 1287648
Change-Id: I3af505773aa3fdc0d17f2079ac1f1b3e93bb64a3
Signed-off-by: Roey Chen <roeyc@mellanox.com>
This commit is contained in:
Roey Chen 2014-05-20 17:10:17 +03:00 committed by Roey Chen
parent fa5eb301dc
commit 598e14cfde
2 changed files with 54 additions and 45 deletions

View File

@ -31,6 +31,7 @@ from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common.gettextutils import _LE, _LI
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.plugins.common import constants as p_const
@ -242,13 +243,24 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
def add_port_update(self, port):
self.updated_ports.add(port)
def scan_ports(self, registered_ports, updated_ports_copy=None):
def scan_ports(self, previous, sync):
cur_ports = self.eswitch.get_vnics_mac()
port_info = {'current': cur_ports}
# Shouldn't process updates for not existing ports
port_info['updated'] = updated_ports_copy & cur_ports
port_info['added'] = cur_ports - registered_ports
port_info['removed'] = registered_ports - cur_ports
updated_ports = self.updated_ports
self.updated_ports = set()
if sync:
# Either it's the first iteration or previous iteration had
# problems.
port_info['added'] = cur_ports
port_info['removed'] = ((previous['removed'] | previous['current'])
- cur_ports)
port_info['updated'] = ((previous['updated'] | updated_ports)
& cur_ports)
else:
# Shouldn't process updates for not existing ports
port_info['added'] = cur_ports - previous['current']
port_info['removed'] = previous['current'] - cur_ports
port_info['updated'] = updated_ports & cur_ports
return port_info
def process_network_ports(self, port_info):
@ -349,39 +361,30 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
port_info['updated'])
def daemon_loop(self):
sync = True
ports = set()
updated_ports_copy = set()
LOG.info(_("eSwitch Agent Started!"))
sync = True
port_info = {'current': set(),
'added': set(),
'removed': set(),
'updated': set()}
while True:
start = time.time()
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
try:
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
LOG.debug("Agent loop process devices!")
# If treat devices fails - must resync with plugin
ports = port_info['current']
if self._port_info_has_changes(port_info):
LOG.debug("Starting to process devices in:%s", port_info)
# sync with upper/lower layers about port deltas
sync = self.process_network_ports(port_info)
port_info = self.scan_ports(previous=port_info, sync=sync)
except exceptions.RequestTimeout:
LOG.exception(_("Request timeout in agent event loop "
"eSwitchD is not responding - exiting..."))
raise SystemExit(1)
except Exception:
LOG.exception(_("Error in agent event loop"))
sync = True
self.updated_ports |= updated_ports_copy
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
sync = False
if self._port_info_has_changes(port_info):
LOG.debug("Starting to process devices in:%s", port_info)
try:
sync = self.process_network_ports(port_info)
except Exception:
LOG.exception(_LE("Error in agent event loop"))
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self._polling_interval):

View File

@ -187,44 +187,50 @@ class TestEswitchAgent(base.BaseTestCase):
self.agent.add_port_update(mac_addr)
self.assertEqual(set([mac_addr]), self.agent.updated_ports)
def _mock_scan_ports(self, vif_port_set, registered_ports, updated_ports):
def _mock_scan_ports(self, vif_port_set, previous,
updated_ports, sync=False):
self.agent.updated_ports = updated_ports
with mock.patch.object(self.agent.eswitch, 'get_vnics_mac',
return_value=vif_port_set):
return self.agent.scan_ports(registered_ports, updated_ports)
return self.agent.scan_ports(previous, sync)
def test_scan_ports_return_current_for_unchanged_ports(self):
vif_port_set = set([1, 2])
registered_ports = set([1, 2])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set())
previous = dict(current=set([1, 2]), added=set(),
removed=set(), updated=set())
expected = dict(current=vif_port_set, added=set(),
removed=set(), updated=set())
actual = self._mock_scan_ports(vif_port_set,
previous, set())
self.assertEqual(expected, actual)
def test_scan_ports_return_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set())
previous = dict(current=set([1, 2]), added=set(),
removed=set(), updated=set())
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set())
actual = self._mock_scan_ports(vif_port_set,
previous, set())
self.assertEqual(expected, actual)
def test_scan_ports_with_updated_ports(self):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
actual = self._mock_scan_ports(vif_port_set,
registered_ports, set([4]))
previous = dict(current=set([1, 2, 4]), added=set(),
removed=set(), updated=set())
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
actual = self._mock_scan_ports(vif_port_set,
previous, set([4]))
self.assertEqual(expected, actual)
def test_scan_ports_with_unknown_updated_ports(self):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
actual = self._mock_scan_ports(vif_port_set,
registered_ports,
updated_ports=set([4, 5]))
previous = dict(current=set([1, 2, 4]), added=set(),
removed=set(), updated=set())
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
actual = self._mock_scan_ports(vif_port_set,
previous,
updated_ports=set([4, 5]))
self.assertEqual(expected, actual)