ovs agent resync may miss port remove event

In OVS Agent rpc_loop() resync mechanism clears the registered ports and
rescans them again, and it might result in missing some "port removed"
event and treat_devices_removed will not be called.

This fix rescans the newly updated ports when resync mechanism called,
without clearing the current registered ports.

The registered ports will be cleared only if there are too many
consecutive resyncs to avoid resycing forever because of the same
faulty port.

Closes-Bug: #1329223

Co-Authored-By: Andrey Epifanov <aepifanov@mirantis.com>
Co-Authored-By: Gandharva S <gandharva.s@hp.com>
Co-Authored-By: Romil Gupta <romilg@hp.com>
Co-Authored-By: Rossella Sblendido <rsblendido@gmail.com>

Change-Id: Ib0db9dcf889d9fd90b623857782c9a6b091e18f5
This commit is contained in:
Aman Kumar 2015-03-17 03:41:54 -07:00 committed by rossella
parent 375efc377f
commit d6d0853be3
4 changed files with 57 additions and 23 deletions

View File

@ -100,3 +100,5 @@ EXTENSION_DRIVER_TYPE = 'ovs'
# ovs datapath types
OVS_DATAPATH_SYSTEM = 'system'
OVS_DATAPATH_NETDEV = 'netdev'
MAX_DEVICE_RETRIES = 5

View File

@ -1157,22 +1157,26 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_moves.append(name)
return port_moves
def _get_port_info(self, registered_ports, cur_ports):
def _get_port_info(self, registered_ports, cur_ports,
readd_registered_ports):
port_info = {'current': cur_ports}
# FIXME(salv-orlando): It's not really necessary to return early
# if nothing has changed.
if cur_ports == registered_ports:
# No added or removed ports to set, just return here
if not readd_registered_ports and cur_ports == registered_ports:
return port_info
port_info['added'] = cur_ports - registered_ports
# Remove all the known ports not found on the integration bridge
if readd_registered_ports:
port_info['added'] = cur_ports
else:
port_info['added'] = cur_ports - registered_ports
# Update port_info with ports not found on the integration bridge
port_info['removed'] = registered_ports - cur_ports
return port_info
def scan_ports(self, registered_ports, updated_ports=None):
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
port_info = self._get_port_info(registered_ports, cur_ports)
port_info = self._get_port_info(registered_ports, cur_ports, sync)
if updated_ports is None:
updated_ports = set()
updated_ports.update(self.check_changed_vlans())
@ -1186,11 +1190,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info['updated'] = updated_ports
return port_info
def scan_ancillary_ports(self, registered_ports):
def scan_ancillary_ports(self, registered_ports, sync):
cur_ports = set()
for bridge in self.ancillary_brs:
cur_ports |= bridge.get_vif_port_set()
return self._get_port_info(registered_ports, cur_ports)
return self._get_port_info(registered_ports, cur_ports, sync)
def check_changed_vlans(self):
"""Return ports which have lost their vlan tag.
@ -1621,6 +1625,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ancillary_ports = set()
tunnel_sync = True
ovs_restarted = False
consecutive_resyncs = 0
while self._check_and_handle_signal():
port_info = {}
ancillary_port_info = {}
@ -1629,10 +1634,18 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.iter_num)
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
ports.clear()
ancillary_ports.clear()
sync = False
polling_manager.force_polling()
consecutive_resyncs = consecutive_resyncs + 1
if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
LOG.warn(_LW("Clearing cache of registered ports, retrials"
" to resync were > %s"),
constants.MAX_DEVICE_RETRIES)
ports.clear()
ancillary_ports.clear()
sync = False
consecutive_resyncs = 0
else:
consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
@ -1677,7 +1690,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
updated_ports_copy = self.updated_ports
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, updated_ports_copy)
port_info = self.scan_ports(reg_ports, sync,
updated_ports_copy)
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
@ -1688,16 +1702,16 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Treat ancillary devices if they exist
if self.ancillary_brs:
ancillary_port_info = self.scan_ancillary_ports(
ancillary_ports)
ancillary_ports, sync)
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ancillary port info retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or

View File

@ -283,7 +283,7 @@ class TestOvsNeutronAgent(object):
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
updated_ports=None, port_tags_dict=None):
updated_ports=None, port_tags_dict=None, sync=False):
if port_tags_dict is None: # Because empty dicts evaluate as False.
port_tags_dict = {}
with mock.patch.object(self.agent.int_br,
@ -292,7 +292,7 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent.int_br,
'get_port_tag_dict',
return_value=port_tags_dict):
return self.agent.scan_ports(registered_ports, updated_ports)
return self.agent.scan_ports(registered_ports, sync, updated_ports)
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
vif_port_set = set([1, 3])
@ -308,6 +308,15 @@ class TestOvsNeutronAgent(object):
actual = self.mock_scan_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_scan_ports_returns_port_changes_with_sync(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=vif_port_set,
removed=set([2]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
sync=True)
self.assertEqual(expected, actual)
def _test_scan_ports_with_updated_ports(self, updated_ports):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
@ -1315,8 +1324,8 @@ class TestOvsNeutronAgent(object):
pass
scan_ports.assert_has_calls([
mock.call(set(), set()),
mock.call(set(), set())
mock.call(set(), True, set()),
mock.call(set(), False, set())
])
process_network_ports.assert_has_calls([
mock.call(reply2, False),
@ -1556,7 +1565,7 @@ class AncillaryBridgesTest(object):
self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])
def mock_scan_ancillary_ports(self, vif_port_set=None,
registered_ports=None):
registered_ports=None, sync=False):
bridges = ['br-int', 'br-ex']
ancillary = ['br-ex']
@ -1574,7 +1583,7 @@ class AncillaryBridgesTest(object):
return_value=vif_port_set):
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
**self.kwargs)
return self.agent.scan_ancillary_ports(registered_ports)
return self.agent.scan_ancillary_ports(registered_ports, sync)
def test_scan_ancillary_ports_returns_cur_only_for_unchanged_ports(self):
vif_port_set = set([1, 2])
@ -1590,6 +1599,15 @@ class AncillaryBridgesTest(object):
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_scan_ancillary_ports_returns_port_changes_with_sync(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=vif_port_set,
removed=set([2]))
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports,
sync=True)
self.assertEqual(expected, actual)
class AncillaryBridgesTestOFCtl(AncillaryBridgesTest,
ovs_test_base.OVSOFCtlTestBase):

View File

@ -554,8 +554,8 @@ class TunnelTest(object):
log_exception.assert_called_once_with(
"Error while processing VIF ports")
scan_ports.assert_has_calls([
mock.call(set(), set()),
mock.call(set(['tap0']), set())
mock.call(set(), True, set()),
mock.call(set(['tap0']), False, set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),