diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
index 8036f5f304e..25c7bb4ac91 100644
--- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
+++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
@@ -38,6 +38,7 @@ from neutron.agent import rpc as agent_rpc
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.common import config as logging_config
 from neutron.common import constants as q_const
+from neutron.common import exceptions
 from neutron.common import legacy
 from neutron.common import topics
 from neutron.common import utils as q_utils
@@ -55,6 +56,11 @@ LOG = logging.getLogger(__name__)
 DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
 
 
+class DeviceListRetrievalError(exceptions.NeutronException):
+    message = _("Unable to retrieve port details for devices: %(devices)s "
+                "because of error: %(error)s")
+
+
 # A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
 # attributes set).
 class LocalVLANMapping:
@@ -928,22 +934,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
 
     def treat_devices_added_or_updated(self, devices):
-        resync = False
+        skipped_devices = []
+        devices_details_list = []
         for device in devices:
-            LOG.debug(_("Processing port:%s"), device)
             try:
                 # TODO(salv-orlando): Provide bulk API for retrieving
                 # details for all devices in one call
-                details = self.plugin_rpc.get_device_details(self.context,
-                                                             device,
-                                                             self.agent_id)
+                devices_details_list.append(
+                    self.plugin_rpc.get_device_details(
+                        self.context, device, self.agent_id))
             except Exception as e:
                 LOG.debug(_("Unable to get port details for "
                             "%(device)s: %(e)s"),
                           {'device': device, 'e': e})
-                resync = True
+                raise DeviceListRetrievalError(devices=devices, error=e)
+
+        for details in devices_details_list:
+            device = details['device']
+            LOG.debug(_("Processing port %s"), device)
+            port = self.int_br.get_vif_port_by_id(device)
+            if not port:
+                # The port disappeared and cannot be processed
+                LOG.info(_("Port %s was not found on the integration bridge "
+                           "and will therefore not be processed"), device)
+                skipped_devices.append(device)
                 continue
-            port = self.int_br.get_vif_port_by_id(details['device'])
+
             if 'port_id' in details:
                 LOG.info(_("Port %(device)s updated. Details: %(details)s"),
                          {'device': device, 'details': details})
@@ -954,6 +970,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                     details['segmentation_id'],
                                     details['admin_state_up'])
                 # update plugin about port status
+                # FIXME(salv-orlando): Failures while updating device status
+                # must be handled appropriately. Otherwise this might prevent
+                # neutron server from sending network-vif-* events to the nova
+                # API server, thus possibly preventing instance spawn.
                 if details.get('admin_state_up'):
                     LOG.debug(_("Setting status for %s to UP"), device)
                     self.plugin_rpc.update_device_up(
@@ -966,28 +986,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 LOG.debug(_("Device %s not defined on plugin"), device)
                 if (port and int(port.ofport) != -1):
                     self.port_dead(port)
-        return resync
+        return skipped_devices
 
     def treat_ancillary_devices_added(self, devices):
-        resync = False
+        devices_details_list = []
         for device in devices:
-            LOG.info(_("Ancillary Port %s added"), device)
             try:
-                self.plugin_rpc.get_device_details(self.context, device,
-                                                   self.agent_id)
+                # TODO(salv-orlando): Provide bulk API for retrieving
+                # details for all devices in one call
+                devices_details_list.append(
+                    self.plugin_rpc.get_device_details(
+                        self.context, device, self.agent_id))
             except Exception as e:
                 LOG.debug(_("Unable to get port details for "
                             "%(device)s: %(e)s"),
                           {'device': device, 'e': e})
-                resync = True
-                continue
+                raise DeviceListRetrievalError(devices=devices, error=e)
+
+        for details in devices_details_list:
+            device = details['device']
+            LOG.info(_("Ancillary Port %s added"), device)
 
             # update plugin about port status
             self.plugin_rpc.update_device_up(self.context,
                                              device,
                                              self.agent_id,
                                              cfg.CONF.host)
-        return resync
 
     def treat_devices_removed(self, devices):
         resync = False
@@ -1051,8 +1075,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         devices_added_updated = (port_info.get('added', set()) |
                                  port_info.get('updated', set()))
         if devices_added_updated:
-            resync_a = self.treat_devices_added_or_updated(
-                devices_added_updated)
+            start = time.time()
+            try:
+                skipped_devices = self.treat_devices_added_or_updated(
+                    devices_added_updated)
+                LOG.debug(_("process_network_ports - "
+                            "treat_devices_added_or_updated completed. "
+                            "Skipped %(num_skipped)d devices of "
+                            "%(num_current)d devices currently available. "
+                            "Time elapsed: %(elapsed).3f"),
+                          {'num_skipped': len(skipped_devices),
+                           'num_current': len(port_info['current']),
+                           'elapsed': time.time() - start})
+                # Update the list of current ports storing only those which
+                # have been actually processed.
+                port_info['current'] = (port_info['current'] -
+                                        set(skipped_devices))
+            except DeviceListRetrievalError:
+                # Need to resync as there was an error with server
+                # communication.
+                LOG.exception(_("process_network_ports - "
+                                "failure while retrieving port details "
+                                "from server"))
+                resync_a = True
         if 'removed' in port_info:
             resync_b = self.treat_devices_removed(port_info['removed'])
         # If one of the above opertaions fails => resync with plugin
@@ -1062,7 +1107,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         resync_a = False
         resync_b = False
         if 'added' in port_info:
-            resync_a = self.treat_ancillary_devices_added(port_info['added'])
+            start = time.time()
+            try:
+                self.treat_ancillary_devices_added(port_info['added'])
+                LOG.debug(_("process_ancillary_network_ports - "
+                            "treat_ancillary_devices_added "
+                            "completed in %(elapsed).3f"),
+                          {'elapsed': time.time() - start})
+            except DeviceListRetrievalError:
+                # Need to resync as there was an error with server
+                # communication.
+                LOG.exception(_("process_ancillary_network_ports - "
+                                "failure while retrieving "
+                                "port details from server"))
+                resync_a = True
         if 'removed' in port_info:
             resync_b = self.treat_ancillary_devices_removed(
                 port_info['removed'])
diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
index a985a798ca2..0e7d2df4219 100644
--- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
+++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
@@ -233,10 +233,15 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
         self.assertEqual(expected, actual)
 
-    def test_treat_devices_added_returns_true_for_missing_device(self):
-        with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
-                               side_effect=Exception()):
-            self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
+    def test_treat_devices_added_returns_raises_for_missing_device(self):
+        with contextlib.nested(
+            mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+                              side_effect=Exception()),
+            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+                              return_value=mock.Mock())):
+            self.assertRaises(
+                ovs_neutron_agent.DeviceListRetrievalError,
+                self.agent.treat_devices_added_or_updated, [{}])
 
     def _mock_treat_devices_added_updated(self, details, port, func_name):
         """Mock treat devices added or updated.
@@ -255,7 +260,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
             mock.patch.object(self.agent, func_name)
         ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
-            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            skip_devs = self.agent.treat_devices_added_or_updated([{}])
+            # The function should not raise
+            self.assertFalse(skip_devs)
         return func.called
 
     def test_treat_devices_added_updated_ignores_invalid_ofport(self):
@@ -270,12 +277,42 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         self.assertTrue(self._mock_treat_devices_added_updated(
             mock.MagicMock(), port, 'port_dead'))
 
+    def test_treat_devices_added_does_not_process_missing_port(self):
+        with contextlib.nested(
+            mock.patch.object(self.agent.plugin_rpc, 'get_device_details'),
+            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+                              return_value=None)
+        ) as (get_dev_fn, get_vif_func):
+            self.assertFalse(get_dev_fn.called)
+
     def test_treat_devices_added_updated_updates_known_port(self):
         details = mock.MagicMock()
         details.__contains__.side_effect = lambda x: True
         self.assertTrue(self._mock_treat_devices_added_updated(
             details, mock.Mock(), 'treat_vif_port'))
 
+    def test_treat_devices_added_updated_skips_if_port_not_found(self):
+        dev_mock = mock.MagicMock()
+        dev_mock.__getitem__.return_value = 'the_skipped_one'
+        with contextlib.nested(
+            mock.patch.object(self.agent.plugin_rpc,
+                              'get_device_details',
+                              return_value=dev_mock),
+            mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+                              return_value=None),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+            mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+            mock.patch.object(self.agent, 'treat_vif_port')
+        ) as (get_dev_fn, get_vif_func, upd_dev_up,
+              upd_dev_down, treat_vif_port):
+            skip_devs = self.agent.treat_devices_added_or_updated([{}])
+            # The function should return False for resync and no device
+            # processed
+            self.assertEqual(['the_skipped_one'], skip_devs)
+            self.assertFalse(treat_vif_port.called)
+            self.assertFalse(upd_dev_down.called)
+            self.assertFalse(upd_dev_up.called)
+
     def test_treat_devices_added_updated_put_port_down(self):
         fake_details_dict = {'admin_state_up': False,
                              'port_id': 'xxx',
@@ -294,7 +331,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(self.agent, 'treat_vif_port')
         ) as (get_dev_fn, get_vif_func, upd_dev_up,
               upd_dev_down, treat_vif_port):
-            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            skip_devs = self.agent.treat_devices_added_or_updated([{}])
+            # The function should return False for resync
+            self.assertFalse(skip_devs)
             self.assertTrue(treat_vif_port.called)
             self.assertTrue(upd_dev_down.called)
 
@@ -321,7 +360,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         with contextlib.nested(
             mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
             mock.patch.object(self.agent, "treat_devices_added_or_updated",
-                              return_value=False),
+                              return_value=[]),
             mock.patch.object(self.agent, "treat_devices_removed",
                               return_value=False)
         ) as (setup_port_filters, device_added_updated, device_removed):
diff --git a/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py
index 83a97aea000..f163e660d67 100644
--- a/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py
+++ b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py
@@ -142,6 +142,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
                           lbaas_plugin.create_pool, self.adminContext, pool)
 
     def test_schedule_pool_with_down_agent(self):
+        self.skipTest("Skipping test until #1344086 is fixed.")
         lbaas_hosta = {
             'binary': 'neutron-loadbalancer-agent',
             'host': LBAAS_HOSTA,