Ensure trunk status is acknowledged during OVS subport operations

Before this patch, a change to trunk status was reported only when
subports were being added, but not when they were being removed. This
patch tweaks things to make sure the same happens on subport removal.

This patch also improves unit coverage for the parts of the code
affected. In particular the import of oslo_context has been aliased
to catch errors where the `context` variable is shadowed by the
module of the same name.

Closes-bug: #1625135

Change-Id: I73127d7829c31ca66578eb8f7d76409b36c9dc6d
This commit is contained in:
Armando Migliaccio 2016-09-19 13:27:30 -07:00
parent 9b58b55c1d
commit 4371ae010a
4 changed files with 118 additions and 46 deletions

View File

@ -50,26 +50,27 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
trunk_id = subports[0].trunk_id
if self.ovsdb_handler.manages_this_trunk(trunk_id):
LOG.debug("Event %s for subports: %s", event_type, subports)
if event_type == events.CREATED:
ctx = self.ovsdb_handler.context
try:
self.ovsdb_handler.wire_subports_for_trunk(
ctx, trunk_id, subports)
except oslo_messaging.MessagingException as e:
LOG.error(_LE(
"Got an error from trunk rpc when wiring subports "
"%(subports)s for trunk %(trunk_id)s: %(err)s"),
{'subports': subports,
'trunk_id': trunk_id,
'err': e})
elif event_type == events.DELETED:
subport_ids = [subport.port_id for subport in subports]
# unwire doesn't use RPC communication
self.ovsdb_handler.unwire_subports_for_trunk(
trunk_id, subport_ids)
else:
LOG.error(_LE("Unknown event type %s"), event_type)
if event_type not in (events.CREATED, events.DELETED):
LOG.error(_LE("Unknown or unimplemented event %s"), event_type)
return
ctx = self.ovsdb_handler.context
try:
LOG.debug("Event %s for subports: %s", event_type, subports)
if event_type == events.CREATED:
status = self.ovsdb_handler.wire_subports_for_trunk(
ctx, trunk_id, subports)
elif event_type == events.DELETED:
subport_ids = [subport.port_id for subport in subports]
status = self.ovsdb_handler.unwire_subports_for_trunk(
trunk_id, subport_ids)
self.ovsdb_handler.report_trunk_status(ctx, trunk_id, status)
except oslo_messaging.MessagingException as e:
LOG.error(_LE(
"Error on event %(event)s for subports "
"%(subports)s: %(err)s"),
{'event': event_type, 'subports': subports,
'trunk_id': trunk_id, 'err': e})
def init_handler(resource, event, trigger, agent=None):

View File

@ -17,7 +17,7 @@ import functools
import eventlet
from oslo_concurrency import lockutils
from oslo_context import context
from oslo_context import context as o_context
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
@ -108,7 +108,7 @@ class OVSDBHandler(object):
@property
def context(self):
self._context.request_id = context.generate_request_id()
self._context.request_id = o_context.generate_request_id()
return self._context
def process_trunk_port_events(
@ -197,6 +197,10 @@ class OVSDBHandler(object):
try:
parent_port_id, trunk_id, subport_ids = self._get_trunk_metadata(
port)
# NOTE(status_police): we do not report changes in trunk status on
# removal to avoid potential races between agents in case the event
# is due to a live migration or reassociation of a trunk to a new
# VM.
self.unwire_subports_for_trunk(trunk_id, subport_ids)
self.trunk_manager.remove_trunk(trunk_id, parent_port_id)
except tman.TrunkManagerError as te:
@ -246,35 +250,31 @@ class OVSDBHandler(object):
# will cause troubles during deletion.
# TODO(jlibosva): Investigate how to proceed during removal of
# trunk bridge that doesn't have metadata stored and whether it's
# wise to set DEGRADED status in case we don't have metadata
# wise to report DEGRADED status in case we don't have metadata
# present on the bridge.
self.trunk_rpc.update_trunk_status(
context, trunk_id, constants.DEGRADED_STATUS)
return
# Set trunk status to DEGRADED if not all subports were created
successfully
status = (constants.ACTIVE_STATUS if len(subport_ids) == len(subports)
else constants.DEGRADED_STATUS)
# NOTE(status_police): Set trunk status to ACTIVE if all subports were
# added successfully. If some port wasn't added, trunk is set to
# DEGRADED.
self.trunk_rpc.update_trunk_status(
context, trunk_id, status)
return constants.DEGRADED_STATUS
LOG.debug("Added trunk: %s", trunk_id)
return self._get_current_status(subports, subport_ids)
def unwire_subports_for_trunk(self, trunk_id, subport_ids):
    """Destroy OVS ports associated to the logical subports.

    :param trunk_id: ID of the trunk whose subports are being removed.
    :param subport_ids: IDs of the subports to unplug.
    :returns: the trunk status derived from how many subports were
        actually removed (ACTIVE if all were, DEGRADED otherwise).
    """
    removed = []
    for subport_id in subport_ids:
        try:
            self.trunk_manager.remove_sub_port(trunk_id, subport_id)
        except tman.TrunkManagerError as te:
            LOG.error(_LE("Removing subport %(subport_id)s from trunk "
                          "%(trunk_id)s failed: %(err)s"),
                      {'subport_id': subport_id,
                       'trunk_id': trunk_id,
                       'err': te})
        else:
            # Only subports removed without error count as processed.
            removed.append(subport_id)
    return self._get_current_status(subport_ids, removed)
def report_trunk_status(self, context, trunk_id, status):
    """Report trunk status to the server.

    :param context: request context used for the RPC call.
    :param trunk_id: ID of the trunk whose status is being reported.
    :param status: trunk status constant (e.g. ACTIVE or DEGRADED)
        to push to the server via the trunk RPC stub.
    """
    self.trunk_rpc.update_trunk_status(context, trunk_id, status)
def _get_parent_port(self, trunk_bridge):
"""Return the OVS trunk parent port plugged on trunk_bridge."""
@ -320,13 +320,15 @@ class OVSDBHandler(object):
'err': te})
# NOTE(status_police): Trunk couldn't be created so it ends in
# ERROR status and resync can fix that later.
self.trunk_rpc.update_trunk_status(context, trunk.id,
constants.ERROR_STATUS)
self.report_trunk_status(ctx, trunk.id, constants.ERROR_STATUS)
return
self.wire_subports_for_trunk(
ctx, trunk.id, trunk.sub_ports, trunk_bridge=trunk_br,
parent_port=port)
# NOTE(status_police): inform the server whether the operation
# was a partial or complete success. Do not inline status.
status = self.wire_subports_for_trunk(
ctx, trunk.id, trunk.sub_ports,
trunk_bridge=trunk_br, parent_port=port)
self.report_trunk_status(ctx, trunk.id, status)
def _set_trunk_metadata(self, trunk_bridge, port, trunk_id, subport_ids):
"""Set trunk metadata in OVS port for trunk parent port."""
@ -351,3 +353,18 @@ class OVSDBHandler(object):
subport_ids = jsonutils.loads(port['external_ids']['subport_ids'])
return parent_port_id, trunk_id, subport_ids
def _get_current_status(self, expected_subports, actual_subports):
    """Return the current status of the trunk.

    The trunk is considered ACTIVE when the number of subports
    successfully processed matches the number of subports expected
    to be processed; otherwise it is DEGRADED.
    """
    # NOTE(status_police): a call to this method should be followed by
    # a trunk_update_status to report the latest trunk status, but there
    # can be exceptions (e.g. unwire_subports_for_trunk).
    all_processed = len(expected_subports) == len(actual_subports)
    return (constants.ACTIVE_STATUS if all_processed
            else constants.DEGRADED_STATUS)

View File

@ -91,9 +91,42 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
self.assertFalse(self.trunk_manager.wire_subports_for_trunk.called)
self.assertFalse(self.trunk_manager.unwire_subports_for_trunk.called)
def test_handle_subports_unknown_event(self):
    """Unknown events are ignored: no wiring, no unwiring, no updates."""
    handler = self.skeleton.ovsdb_handler
    rpc = handler.trunk_rpc
    with mock.patch.object(
            handler, 'wire_subports_for_trunk') as wire_mock, \
            mock.patch.object(
                handler, 'unwire_subports_for_trunk') as unwire_mock:
        self.skeleton.handle_subports(self.subports, events.UPDATED)
    self.assertFalse(wire_mock.called)
    self.assertFalse(unwire_mock.called)
    self.assertFalse(rpc.update_trunk_status.called)
def test_handle_subports_trunk_rpc_error(self):
    """An RPC failure while binding subports is caught, not raised."""
    rpc = self.skeleton.ovsdb_handler.trunk_rpc
    rpc.update_subport_bindings.side_effect = (
        oslo_messaging.MessagingException)
    self.skeleton.handle_subports(self.subports, events.CREATED)
    self.assertTrue(rpc.update_subport_bindings.called)
def _test_handle_subports_trunk_on_trunk_update(self, event):
    """Handle the given event and check the status reaches the server."""
    rpc = self.skeleton.ovsdb_handler.trunk_rpc
    self.skeleton.handle_subports(self.subports, event)
    # Make sure trunk state is reported to the server
    self.assertTrue(rpc.update_trunk_status.called)
def test_handle_subports_created_trunk_on_trunk_update(self):
    """A CREATED subport event must trigger a trunk status report."""
    handler = self.skeleton.ovsdb_handler
    with mock.patch.object(handler, 'wire_subports_for_trunk'):
        self._test_handle_subports_trunk_on_trunk_update(events.CREATED)
def test_handle_subports_deleted_trunk_on_trunk_update(self):
    """A DELETED subport event must trigger a trunk status report."""
    handler = self.skeleton.ovsdb_handler
    with mock.patch.object(handler, 'unwire_subports_for_trunk'):
        self._test_handle_subports_trunk_on_trunk_update(events.DELETED)

View File

@ -188,11 +188,10 @@ class TestOVSDBHandler(base.BaseTestCase):
self.trunk_manager.add_sub_port.side_effect = (
trunk_manager.TrunkManagerError(error='error'))
self.ovsdb_handler.wire_subports_for_trunk(
status = self.ovsdb_handler.wire_subports_for_trunk(
None, 'trunk_id', self.fake_subports)
trunk_rpc.update_trunk_status.assert_called_once_with(
mock.ANY, mock.ANY, constants.DEGRADED_STATUS)
self.assertEqual(constants.DEGRADED_STATUS, status)
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
def test_wire_subports_for_trunk_ovsdb_failure(self, br):
@ -200,13 +199,16 @@ class TestOVSDBHandler(base.BaseTestCase):
self.subport_bindings)
with mock.patch.object(self.ovsdb_handler, '_set_trunk_metadata',
side_effect=RuntimeError):
self.ovsdb_handler.wire_subports_for_trunk(
status = self.ovsdb_handler.wire_subports_for_trunk(
None, 'trunk_id', self.fake_subports)
self.assertEqual(constants.DEGRADED_STATUS, status)
def test_unwire_subports_for_trunk_trunk_manager_failure(self):
    """A removal failure from the trunk manager yields DEGRADED status."""
    failure = trunk_manager.TrunkManagerError(error='error')
    self.trunk_manager.remove_sub_port.side_effect = failure
    result = self.ovsdb_handler.unwire_subports_for_trunk(
        mock.ANY, ['subport_id'])
    self.assertEqual(constants.DEGRADED_STATUS, result)
def test__wire_trunk_get_trunk_details_failure(self):
self.trunk_manager.get_port_uuid_from_external_ids.side_effect = (
@ -218,6 +220,8 @@ class TestOVSDBHandler(base.BaseTestCase):
resources_rpc.ResourceNotFound(
resource_id='id', resource_type='type'))
self.ovsdb_handler._wire_trunk(mock.Mock(), self.fake_port)
trunk_rpc = self.ovsdb_handler.trunk_rpc
self.assertFalse(trunk_rpc.update_trunk_status.called)
def test__wire_trunk_create_trunk_failure(self):
self.trunk_manager.create_trunk.side_effect = (
@ -227,6 +231,14 @@ class TestOVSDBHandler(base.BaseTestCase):
trunk_rpc.update_trunk_status.assert_called_once_with(
mock.ANY, mock.ANY, constants.ERROR_STATUS)
def test__wire_trunk_report_trunk_called_on_wiring(self):
    """Wiring a trunk must report the resulting status to the server."""
    with mock.patch.object(self.trunk_manager, 'create_trunk'), \
            mock.patch.object(self.ovsdb_handler,
                              'wire_subports_for_trunk'):
        self.ovsdb_handler._wire_trunk(mock.Mock(), self.fake_port)
        rpc = self.ovsdb_handler.trunk_rpc
        self.assertTrue(rpc.update_trunk_status.called)
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
def test__set_trunk_metadata_with_None_params(self, br):
mock_br = br.return_value
@ -238,3 +250,12 @@ class TestOVSDBHandler(base.BaseTestCase):
'Interface', 'foo', 'external_ids',
{'bridge_name': mock.ANY, 'trunk_id': 'foo',
'subport_ids': '[]'})
def test__get_current_status_active(self):
    """Equal expected/actual subport counts map to ACTIVE."""
    status = self.ovsdb_handler._get_current_status([], [])
    self.assertEqual(constants.ACTIVE_STATUS, status)
def test__get_current_status_degraded(self):
    """A shortfall in processed subports maps to DEGRADED."""
    status = self.ovsdb_handler._get_current_status([mock.ANY], [])
    self.assertEqual(constants.DEGRADED_STATUS, status)