Merge "Do not mark device as processed if it wasn't" into stable/havana

This commit is contained in:
Jenkins 2014-09-18 05:50:59 +00:00 committed by Gerrit Code Review
commit 501e498c60
3 changed files with 123 additions and 25 deletions

View File

@ -38,6 +38,7 @@ from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as logging_config
from neutron.common import constants as q_const
from neutron.common import exceptions
from neutron.common import legacy
from neutron.common import topics
from neutron.common import utils as q_utils
@ -55,6 +56,11 @@ LOG = logging.getLogger(__name__)
DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
class DeviceListRetrievalError(exceptions.NeutronException):
message = _("Unable to retrieve port details for devices: %(devices)s "
"because of error: %(error)s")
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
# attributes set).
class LocalVLANMapping:
@ -928,22 +934,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def treat_devices_added_or_updated(self, devices):
resync = False
skipped_devices = []
devices_details_list = []
for device in devices:
LOG.debug(_("Processing port:%s"), device)
try:
# TODO(salv-orlando): Provide bulk API for retrieving
# details for all devices in one call
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
devices_details_list.append(
self.plugin_rpc.get_device_details(
self.context, device, self.agent_id))
except Exception as e:
LOG.debug(_("Unable to get port details for "
"%(device)s: %(e)s"),
{'device': device, 'e': e})
resync = True
raise DeviceListRetrievalError(devices=devices, error=e)
for details in devices_details_list:
device = details['device']
LOG.debug(_("Processing port %s"), device)
port = self.int_br.get_vif_port_by_id(device)
if not port:
# The port disappeared and cannot be processed
LOG.info(_("Port %s was not found on the integration bridge "
"and will therefore not be processed"), device)
skipped_devices.append(device)
continue
port = self.int_br.get_vif_port_by_id(details['device'])
if 'port_id' in details:
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': details})
@ -954,6 +970,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
details['segmentation_id'],
details['admin_state_up'])
# update plugin about port status
# FIXME(salv-orlando): Failures while updating device status
# must be handled appropriately. Otherwise this might prevent
# neutron server from sending network-vif-* events to the nova
# API server, thus possibly preventing instance spawn.
if details.get('admin_state_up'):
LOG.debug(_("Setting status for %s to UP"), device)
self.plugin_rpc.update_device_up(
@ -966,28 +986,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
self.port_dead(port)
return resync
return skipped_devices
def treat_ancillary_devices_added(self, devices):
resync = False
devices_details_list = []
for device in devices:
LOG.info(_("Ancillary Port %s added"), device)
try:
self.plugin_rpc.get_device_details(self.context, device,
self.agent_id)
# TODO(salv-orlando): Provide bulk API for retrieving
# details for all devices in one call
devices_details_list.append(
self.plugin_rpc.get_device_details(
self.context, device, self.agent_id))
except Exception as e:
LOG.debug(_("Unable to get port details for "
"%(device)s: %(e)s"),
{'device': device, 'e': e})
resync = True
continue
raise DeviceListRetrievalError(devices=devices, error=e)
for details in devices_details_list:
device = details['device']
LOG.info(_("Ancillary Port %s added"), device)
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
return resync
def treat_devices_removed(self, devices):
resync = False
@ -1051,8 +1075,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
devices_added_updated = (port_info.get('added', set()) |
port_info.get('updated', set()))
if devices_added_updated:
resync_a = self.treat_devices_added_or_updated(
devices_added_updated)
start = time.time()
try:
skipped_devices = self.treat_devices_added_or_updated(
devices_added_updated)
LOG.debug(_("process_network_ports - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "
"%(num_current)d devices currently available. "
"Time elapsed: %(elapsed).3f"),
{'num_skipped': len(skipped_devices),
'num_current': len(port_info['current']),
'elapsed': time.time() - start})
# Update the list of current ports storing only those which
# have been actually processed.
port_info['current'] = (port_info['current'] -
set(skipped_devices))
except DeviceListRetrievalError:
# Need to resync as there was an error with server
# communication.
LOG.exception(_("process_network_ports - "
"failure while retrieving port details "
"from server"))
resync_a = True
if 'removed' in port_info:
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
@ -1062,7 +1107,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
resync_a = False
resync_b = False
if 'added' in port_info:
resync_a = self.treat_ancillary_devices_added(port_info['added'])
start = time.time()
try:
self.treat_ancillary_devices_added(port_info['added'])
LOG.debug(_("process_ancillary_network_ports - "
"treat_ancillary_devices_added "
"completed in %(elapsed).3f"),
{'elapsed': time.time() - start})
except DeviceListRetrievalError:
# Need to resync as there was an error with server
# communication.
LOG.exception(_("process_ancillary_network_ports - "
"failure while retrieving "
"port details from server"))
resync_a = True
if 'removed' in port_info:
resync_b = self.treat_ancillary_devices_removed(
port_info['removed'])

View File

@ -233,10 +233,15 @@ class TestOvsNeutronAgent(base.BaseTestCase):
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()):
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
def test_treat_devices_added_returns_raises_for_missing_device(self):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.Mock())):
self.assertRaises(
ovs_neutron_agent.DeviceListRetrievalError,
self.agent.treat_devices_added_or_updated, [{}])
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
@ -255,7 +260,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
skip_devs = self.agent.treat_devices_added_or_updated([{}])
# The function should not raise
self.assertFalse(skip_devs)
return func.called
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
@ -270,12 +277,42 @@ class TestOvsNeutronAgent(base.BaseTestCase):
self.assertTrue(self._mock_treat_devices_added_updated(
mock.MagicMock(), port, 'port_dead'))
def test_treat_devices_added_does_not_process_missing_port(self):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc, 'get_device_details'),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=None)
) as (get_dev_fn, get_vif_func):
self.assertFalse(get_dev_fn.called)
def test_treat_devices_added_updated_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
self.assertTrue(self._mock_treat_devices_added_updated(
details, mock.Mock(), 'treat_vif_port'))
def test_treat_devices_added_updated_skips_if_port_not_found(self):
dev_mock = mock.MagicMock()
dev_mock.__getitem__.return_value = 'the_skipped_one'
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc,
'get_device_details',
return_value=dev_mock),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=None),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
skip_devs = self.agent.treat_devices_added_or_updated([{}])
# The function should return False for resync and no device
# processed
self.assertEqual(['the_skipped_one'], skip_devs)
self.assertFalse(treat_vif_port.called)
self.assertFalse(upd_dev_down.called)
self.assertFalse(upd_dev_up.called)
def test_treat_devices_added_updated_put_port_down(self):
fake_details_dict = {'admin_state_up': False,
'port_id': 'xxx',
@ -294,7 +331,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
skip_devs = self.agent.treat_devices_added_or_updated([{}])
# The function should return False for resync
self.assertFalse(skip_devs)
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
@ -321,7 +360,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
with contextlib.nested(
mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
mock.patch.object(self.agent, "treat_devices_added_or_updated",
return_value=False),
return_value=[]),
mock.patch.object(self.agent, "treat_devices_removed",
return_value=False)
) as (setup_port_filters, device_added_updated, device_removed):

View File

@ -142,6 +142,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
lbaas_plugin.create_pool, self.adminContext, pool)
def test_schedule_pool_with_down_agent(self):
self.skipTest("Skipping test until #1344086 is fixed.")
lbaas_hosta = {
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,