Handle add/remove subports events loss due to agent failures

Upon restart the agent reconciles the logical with the physical
state by removing/adding physical subports that are used to
be/are current present in the logical view.

This patch adds a functional test to demonstrate that there's
no need to handle the resync in the trunk driver, since the ovs
agent already takes care of it.

Change-Id: I164153c79313f2ae7a1fca0414736d5987656185
Partially-implements: blueprint vlan-aware-vms
Partial-bug: #1623708
(cherry picked from commit 08f2af18f9)
This commit is contained in:
rossella 2016-09-02 19:40:52 +02:00 committed by Armando Migliaccio
parent de71c79269
commit 766fb5f252
4 changed files with 114 additions and 15 deletions

View File

@ -67,6 +67,10 @@ def is_trunk_bridge(port_name):
return port_name.startswith(t_const.TRUNK_BR_PREFIX)
def is_subport(port_name):
return port_name.startswith(tman.SubPort.DEV_PREFIX)
def is_trunk_service_port(port_name):
"""True if the port is any of the ports used to realize a trunk."""
return is_trunk_bridge(port_name) or port_name[:2] in (
@ -74,9 +78,8 @@ def is_trunk_service_port(port_name):
tman.SubPort.DEV_PREFIX)
def bridge_has_instance_port(bridge):
"""True if there is an OVS port that doesn't have bridge or patch ports
prefix.
def bridge_has_port(bridge, is_port_predicate):
"""True if there is an OVS port for which is_port_predicate is True.
"""
try:
ifaces = bridge.get_iface_name_list()
@ -87,8 +90,21 @@ def bridge_has_instance_port(bridge):
'err': e})
return False
return any(iface for iface in ifaces
if not is_trunk_service_port(iface))
return any(iface for iface in ifaces if is_port_predicate(iface))
def bridge_has_instance_port(bridge):
"""True if there is an OVS port that doesn't have bridge or patch ports
prefix.
"""
is_instance_port = lambda p: not is_trunk_service_port(p)
return bridge_has_port(bridge, is_instance_port)
def bridge_has_service_port(bridge):
"""True if there is an OVS port that is used to implement a trunk.
"""
return bridge_has_port(bridge, is_trunk_service_port)
class OVSDBHandler(object):
@ -167,10 +183,13 @@ class OVSDBHandler(object):
bridge.destroy()
return
# Check if the trunk was provisioned in a previous run. This can happen
# at agent startup when existing trunks are notified as added events.
rewire = bridge_has_service_port(bridge)
# Once we get hold of the trunk parent port, we can provision
# the OVS dataplane for the trunk.
try:
self._wire_trunk(bridge, self._get_parent_port(bridge))
self._wire_trunk(bridge, self._get_parent_port(bridge), rewire)
except oslo_messaging.MessagingException as e:
LOG.error(_LE("Got messaging error while processing trunk bridge "
"%(bridge_name)s: %(err)s"),
@ -217,6 +236,23 @@ class OVSDBHandler(object):
bridge_name = utils.gen_trunk_br_name(trunk_id)
return ovs_lib.BaseOVS().bridge_exists(bridge_name)
def get_connected_subports_for_trunk(self, trunk_id):
"""Return the list of subports present on the trunk bridge."""
bridge = ovs_lib.OVSBridge(utils.gen_trunk_br_name(trunk_id))
if not bridge.bridge_exists(bridge.br_name):
return []
try:
ports = bridge.get_ports_attributes(
'Interface', columns=['name', 'external_ids'])
return [
self.trunk_manager.get_port_uuid_from_external_ids(port)
for port in ports if is_subport(port['name'])
]
except (RuntimeError, tman.TrunkManagerError) as e:
LOG.error(_LE("Failed to get subports for bridge %(bridge)s: "
"%(err)s"), {'bridge': bridge.br_name, 'err': e})
return []
def wire_subports_for_trunk(self, context, trunk_id, subports,
trunk_bridge=None, parent_port=None):
"""Create OVS ports associated to the logical subports."""
@ -302,14 +338,19 @@ class OVSDBHandler(object):
"Can't find parent port for trunk bridge %s" %
trunk_bridge.br_name)
def _wire_trunk(self, trunk_br, port):
def _wire_trunk(self, trunk_br, port, rewire=False):
"""Wire trunk bridge with integration bridge.
The method calls into trunk manager to create patch ports for trunk and
patch ports for all subports associated with this trunk.
patch ports for all subports associated with this trunk. If rewire is
True, a diff is performed between desired state (the one got from the
server) and actual state (the patch ports present on the trunk bridge)
and subports are wired/unwired accordingly.
:param trunk_br: OVSBridge object representing the trunk bridge.
:param port: Parent port dict.
:param rewire: True if local trunk state must be reconciled with
server's state.
"""
ctx = self.context
try:
@ -337,8 +378,19 @@ class OVSDBHandler(object):
self.report_trunk_status(ctx, trunk.id, constants.ERROR_STATUS)
return
# We need to remove stale subports
if rewire:
old_subport_ids = self.get_connected_subports_for_trunk(trunk.id)
subports = {p['port_id'] for p in trunk.sub_ports}
subports_to_delete = set(old_subport_ids) - subports
if subports_to_delete:
self.unwire_subports_for_trunk(trunk.id, subports_to_delete)
# NOTE(status_police): inform the server whether the operation
# was a partial or complete success. Do not inline status.
# NOTE: in case of rewiring we readd ports that are already present on
# the bridge because e.g. the segmentation ID might have changed (e.g.
# agent crashed, port was removed and readded with a different seg ID)
status = self.wire_subports_for_trunk(
ctx, trunk.id, trunk.sub_ports,
trunk_bridge=trunk_br, parent_port=port)

View File

@ -133,7 +133,8 @@ class TrunkParentPort(object):
# control over the wiring logic for trunk ports be required.
patch_int_attrs = get_patch_peer_attrs(
self.patch_port_trunk_name, self.port_mac, self.port_id)
patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name)
patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name,
self.port_mac, self.port_id)
with self.ovsdb_transaction() as txn:
txn.add(ovsdb.add_port(br_int.br_name,

View File

@ -145,6 +145,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
return {'added': filtered_ports, 'removed': events['removed']}
polling_manager.get_events = mock.Mock(side_effect=filter_events)
def stop_agent(self, agent, rpc_loop_thread):
agent.run_daemon_loop = False
rpc_loop_thread.wait()
def start_agent(self, agent, ports=None, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
@ -159,13 +163,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
polling_manager._monitor.is_active)
agent.check_ovs_status = mock.Mock(
return_value=constants.OVS_NORMAL)
t = eventlet.spawn(agent.rpc_loop, polling_manager)
self.agent_thread = eventlet.spawn(agent.rpc_loop,
polling_manager)
def stop_agent(agent, rpc_loop_thread):
agent.run_daemon_loop = False
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
self.addCleanup(self.stop_agent, agent, self.agent_thread)
return polling_manager
def _create_test_port_dict(self):

View File

@ -145,3 +145,48 @@ class OVSDBHandlerTestCase(base.OVSAgentTestFramework):
ports = self.create_test_ports(amount=1)
self.trunk_dict['port_id'] = ports[0]['id']
self._test_trunk_creation_helper(ports)
def test_resync(self):
ports = self.create_test_ports(amount=3)
self.trunk_dict['port_id'] = ports[0]['id']
self.trunk_dict['sub_ports'] = [trunk_obj.SubPort(
id=uuidutils.generate_uuid(),
port_id=ports[i]['id'],
mac_address=ports[i]['mac_address'],
segmentation_id=i,
trunk_id=self.trunk_dict['id'])
for i in range(1, 3)]
self.setup_agent_and_ports(port_dicts=ports)
self.wait_until_ports_state(self.ports, up=True)
self.agent.fullsync = True
self.wait_until_ports_state(self.ports, up=True)
def test_restart(self):
ports = self.create_test_ports(amount=3)
self.trunk_dict['port_id'] = ports[0]['id']
self.trunk_dict['sub_ports'] = [trunk_obj.SubPort(
id=uuidutils.generate_uuid(),
port_id=ports[i]['id'],
mac_address=ports[i]['mac_address'],
segmentation_id=i,
trunk_id=self.trunk_dict['id'])
for i in range(1, 3)]
self.setup_agent_and_ports(port_dicts=ports)
self.wait_until_ports_state(self.ports, up=True)
# restart and simulate a subport delete
deleted_port = self.ports[2]
deleted_sp = trunk_manager.SubPort(
self.trunk_dict['id'], deleted_port['id'])
self.stop_agent(self.agent, self.agent_thread)
self.polling_manager.stop()
self.trunk_dict['sub_ports'] = self.trunk_dict['sub_ports'][:1]
self.setup_agent_and_ports(port_dicts=ports[:2])
# NOTE: the port_dicts passed in setup_agent_and_ports is stored in
# self.ports so we are waiting here only for ports[:2]
self.wait_until_ports_state(self.ports, up=True)
common_utils.wait_until_true(
lambda: (deleted_sp.patch_port_trunk_name not in
self.trunk_br.get_port_name_list()))