diff --git a/neutron/services/trunk/drivers/openvswitch/agent/ovsdb_handler.py b/neutron/services/trunk/drivers/openvswitch/agent/ovsdb_handler.py index 560cd1f756b..bb74f27840c 100644 --- a/neutron/services/trunk/drivers/openvswitch/agent/ovsdb_handler.py +++ b/neutron/services/trunk/drivers/openvswitch/agent/ovsdb_handler.py @@ -67,6 +67,10 @@ def is_trunk_bridge(port_name): return port_name.startswith(t_const.TRUNK_BR_PREFIX) +def is_subport(port_name): + return port_name.startswith(tman.SubPort.DEV_PREFIX) + + def is_trunk_service_port(port_name): """True if the port is any of the ports used to realize a trunk.""" return is_trunk_bridge(port_name) or port_name[:2] in ( @@ -74,9 +78,8 @@ def is_trunk_service_port(port_name): tman.SubPort.DEV_PREFIX) -def bridge_has_instance_port(bridge): - """True if there is an OVS port that doesn't have bridge or patch ports - prefix. +def bridge_has_port(bridge, is_port_predicate): + """True if there is an OVS port for which is_port_predicate is True. """ try: ifaces = bridge.get_iface_name_list() @@ -87,8 +90,21 @@ def bridge_has_instance_port(bridge): 'err': e}) return False - return any(iface for iface in ifaces - if not is_trunk_service_port(iface)) + return any(iface for iface in ifaces if is_port_predicate(iface)) + + +def bridge_has_instance_port(bridge): + """True if there is an OVS port that doesn't have bridge or patch ports + prefix. + """ + is_instance_port = lambda p: not is_trunk_service_port(p) + return bridge_has_port(bridge, is_instance_port) + + +def bridge_has_service_port(bridge): + """True if there is an OVS port that is used to implement a trunk. + """ + return bridge_has_port(bridge, is_trunk_service_port) class OVSDBHandler(object): @@ -167,10 +183,13 @@ class OVSDBHandler(object): bridge.destroy() return + # Check if the trunk was provisioned in a previous run. This can happen + # at agent startup when existing trunks are notified as added events. + rewire = bridge_has_service_port(bridge) # Once we get hold of the trunk parent port, we can provision # the OVS dataplane for the trunk. try: - self._wire_trunk(bridge, self._get_parent_port(bridge)) + self._wire_trunk(bridge, self._get_parent_port(bridge), rewire) except oslo_messaging.MessagingException as e: LOG.error(_LE("Got messaging error while processing trunk bridge " "%(bridge_name)s: %(err)s"), @@ -217,6 +236,23 @@ class OVSDBHandler(object): bridge_name = utils.gen_trunk_br_name(trunk_id) return ovs_lib.BaseOVS().bridge_exists(bridge_name) + def get_connected_subports_for_trunk(self, trunk_id): + """Return the list of subports present on the trunk bridge.""" + bridge = ovs_lib.OVSBridge(utils.gen_trunk_br_name(trunk_id)) + if not bridge.bridge_exists(bridge.br_name): + return [] + try: + ports = bridge.get_ports_attributes( + 'Interface', columns=['name', 'external_ids']) + return [ + self.trunk_manager.get_port_uuid_from_external_ids(port) + for port in ports if is_subport(port['name']) + ] + except (RuntimeError, tman.TrunkManagerError) as e: + LOG.error(_LE("Failed to get subports for bridge %(bridge)s: " + "%(err)s"), {'bridge': bridge.br_name, 'err': e}) + return [] + def wire_subports_for_trunk(self, context, trunk_id, subports, trunk_bridge=None, parent_port=None): """Create OVS ports associated to the logical subports.""" @@ -302,14 +338,19 @@ class OVSDBHandler(object): "Can't find parent port for trunk bridge %s" % trunk_bridge.br_name) - def _wire_trunk(self, trunk_br, port): + def _wire_trunk(self, trunk_br, port, rewire=False): """Wire trunk bridge with integration bridge. The method calls into trunk manager to create patch ports for trunk and - patch ports for all subports associated with this trunk. + patch ports for all subports associated with this trunk. If rewire is + True, a diff is performed between desired state (the one got from the + server) and actual state (the patch ports present on the trunk bridge) + and subports are wired/unwired accordingly. :param trunk_br: OVSBridge object representing the trunk bridge. :param port: Parent port dict. + :param rewire: True if local trunk state must be reconciled with + server's state. """ ctx = self.context try: @@ -337,8 +378,19 @@ class OVSDBHandler(object): self.report_trunk_status(ctx, trunk.id, constants.ERROR_STATUS) return + # We need to remove stale subports + if rewire: + old_subport_ids = self.get_connected_subports_for_trunk(trunk.id) + subports = {p['port_id'] for p in trunk.sub_ports} + subports_to_delete = set(old_subport_ids) - subports + if subports_to_delete: + self.unwire_subports_for_trunk(trunk.id, subports_to_delete) + # NOTE(status_police): inform the server whether the operation # was a partial or complete success. Do not inline status. + # NOTE: in case of rewiring we readd ports that are already present on + # the bridge because e.g. the segmentation ID might have changed (e.g. + # agent crashed, port was removed and readded with a different seg ID) status = self.wire_subports_for_trunk( ctx, trunk.id, trunk.sub_ports, trunk_bridge=trunk_br, parent_port=port) diff --git a/neutron/services/trunk/drivers/openvswitch/agent/trunk_manager.py b/neutron/services/trunk/drivers/openvswitch/agent/trunk_manager.py index b604165f44d..00145717e7e 100644 --- a/neutron/services/trunk/drivers/openvswitch/agent/trunk_manager.py +++ b/neutron/services/trunk/drivers/openvswitch/agent/trunk_manager.py @@ -133,7 +133,8 @@ class TrunkParentPort(object): # control over the wiring logic for trunk ports be required. patch_int_attrs = get_patch_peer_attrs( self.patch_port_trunk_name, self.port_mac, self.port_id) - patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name) + patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name, + self.port_mac, self.port_id) with self.ovsdb_transaction() as txn: txn.add(ovsdb.add_port(br_int.br_name, diff --git a/neutron/tests/functional/agent/l2/base.py b/neutron/tests/functional/agent/l2/base.py index a3fe5742411..fc8d9d87797 100644 --- a/neutron/tests/functional/agent/l2/base.py +++ b/neutron/tests/functional/agent/l2/base.py @@ -145,6 +145,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): return {'added': filtered_ports, 'removed': events['removed']} polling_manager.get_events = mock.Mock(side_effect=filter_events) + def stop_agent(self, agent, rpc_loop_thread): + agent.run_daemon_loop = False + rpc_loop_thread.wait() + def start_agent(self, agent, ports=None, unplug_ports=None): if unplug_ports is None: unplug_ports = [] @@ -159,13 +163,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): polling_manager._monitor.is_active) agent.check_ovs_status = mock.Mock( return_value=constants.OVS_NORMAL) - t = eventlet.spawn(agent.rpc_loop, polling_manager) + self.agent_thread = eventlet.spawn(agent.rpc_loop, + polling_manager) - def stop_agent(agent, rpc_loop_thread): - agent.run_daemon_loop = False - rpc_loop_thread.wait() - - self.addCleanup(stop_agent, agent, t) + self.addCleanup(self.stop_agent, agent, self.agent_thread) return polling_manager def _create_test_port_dict(self): diff --git a/neutron/tests/functional/services/trunk/drivers/openvswitch/agent/test_ovsdb_handler.py b/neutron/tests/functional/services/trunk/drivers/openvswitch/agent/test_ovsdb_handler.py index 2f92e10ff7f..ea5ea07903b 100644 --- a/neutron/tests/functional/services/trunk/drivers/openvswitch/agent/test_ovsdb_handler.py +++ b/neutron/tests/functional/services/trunk/drivers/openvswitch/agent/test_ovsdb_handler.py @@ -145,3 +145,48 @@ class OVSDBHandlerTestCase(base.OVSAgentTestFramework): ports = self.create_test_ports(amount=1) self.trunk_dict['port_id'] = ports[0]['id'] self._test_trunk_creation_helper(ports) + + def test_resync(self): + ports = self.create_test_ports(amount=3) + self.trunk_dict['port_id'] = ports[0]['id'] + self.trunk_dict['sub_ports'] = [trunk_obj.SubPort( + id=uuidutils.generate_uuid(), + port_id=ports[i]['id'], + mac_address=ports[i]['mac_address'], + segmentation_id=i, + trunk_id=self.trunk_dict['id']) + for i in range(1, 3)] + + self.setup_agent_and_ports(port_dicts=ports) + self.wait_until_ports_state(self.ports, up=True) + self.agent.fullsync = True + self.wait_until_ports_state(self.ports, up=True) + + def test_restart(self): + ports = self.create_test_ports(amount=3) + self.trunk_dict['port_id'] = ports[0]['id'] + self.trunk_dict['sub_ports'] = [trunk_obj.SubPort( + id=uuidutils.generate_uuid(), + port_id=ports[i]['id'], + mac_address=ports[i]['mac_address'], + segmentation_id=i, + trunk_id=self.trunk_dict['id']) + for i in range(1, 3)] + + self.setup_agent_and_ports(port_dicts=ports) + self.wait_until_ports_state(self.ports, up=True) + + # restart and simulate a subport delete + deleted_port = self.ports[2] + deleted_sp = trunk_manager.SubPort( + self.trunk_dict['id'], deleted_port['id']) + self.stop_agent(self.agent, self.agent_thread) + self.polling_manager.stop() + self.trunk_dict['sub_ports'] = self.trunk_dict['sub_ports'][:1] + self.setup_agent_and_ports(port_dicts=ports[:2]) + # NOTE: the port_dicts passed in setup_agent_and_ports is stored in + # self.ports so we are waiting here only for ports[:2] + self.wait_until_ports_state(self.ports, up=True) + common_utils.wait_until_true( + lambda: (deleted_sp.patch_port_trunk_name not in + self.trunk_br.get_port_name_list()))