Merge "Do not mark device as processed if it wasn't" into stable/havana
This commit is contained in:
commit
501e498c60
|
@ -38,6 +38,7 @@ from neutron.agent import rpc as agent_rpc
|
|||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.common import config as logging_config
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import legacy
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as q_utils
|
||||
|
@ -55,6 +56,11 @@ LOG = logging.getLogger(__name__)
|
|||
DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
|
||||
|
||||
|
||||
class DeviceListRetrievalError(exceptions.NeutronException):
|
||||
message = _("Unable to retrieve port details for devices: %(devices)s "
|
||||
"because of error: %(error)s")
|
||||
|
||||
|
||||
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
|
||||
# attributes set).
|
||||
class LocalVLANMapping:
|
||||
|
@ -928,22 +934,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
||||
|
||||
def treat_devices_added_or_updated(self, devices):
|
||||
resync = False
|
||||
skipped_devices = []
|
||||
devices_details_list = []
|
||||
for device in devices:
|
||||
LOG.debug(_("Processing port:%s"), device)
|
||||
try:
|
||||
# TODO(salv-orlando): Provide bulk API for retrieving
|
||||
# details for all devices in one call
|
||||
details = self.plugin_rpc.get_device_details(self.context,
|
||||
device,
|
||||
self.agent_id)
|
||||
devices_details_list.append(
|
||||
self.plugin_rpc.get_device_details(
|
||||
self.context, device, self.agent_id))
|
||||
except Exception as e:
|
||||
LOG.debug(_("Unable to get port details for "
|
||||
"%(device)s: %(e)s"),
|
||||
{'device': device, 'e': e})
|
||||
resync = True
|
||||
raise DeviceListRetrievalError(devices=devices, error=e)
|
||||
|
||||
for details in devices_details_list:
|
||||
device = details['device']
|
||||
LOG.debug(_("Processing port %s"), device)
|
||||
port = self.int_br.get_vif_port_by_id(device)
|
||||
if not port:
|
||||
# The port disappeared and cannot be processed
|
||||
LOG.info(_("Port %s was not found on the integration bridge "
|
||||
"and will therefore not be processed"), device)
|
||||
skipped_devices.append(device)
|
||||
continue
|
||||
port = self.int_br.get_vif_port_by_id(details['device'])
|
||||
|
||||
if 'port_id' in details:
|
||||
LOG.info(_("Port %(device)s updated. Details: %(details)s"),
|
||||
{'device': device, 'details': details})
|
||||
|
@ -954,6 +970,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
details['segmentation_id'],
|
||||
details['admin_state_up'])
|
||||
# update plugin about port status
|
||||
# FIXME(salv-orlando): Failures while updating device status
|
||||
# must be handled appropriately. Otherwise this might prevent
|
||||
# neutron server from sending network-vif-* events to the nova
|
||||
# API server, thus possibly preventing instance spawn.
|
||||
if details.get('admin_state_up'):
|
||||
LOG.debug(_("Setting status for %s to UP"), device)
|
||||
self.plugin_rpc.update_device_up(
|
||||
|
@ -966,28 +986,32 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
LOG.debug(_("Device %s not defined on plugin"), device)
|
||||
if (port and int(port.ofport) != -1):
|
||||
self.port_dead(port)
|
||||
return resync
|
||||
return skipped_devices
|
||||
|
||||
def treat_ancillary_devices_added(self, devices):
|
||||
resync = False
|
||||
devices_details_list = []
|
||||
for device in devices:
|
||||
LOG.info(_("Ancillary Port %s added"), device)
|
||||
try:
|
||||
self.plugin_rpc.get_device_details(self.context, device,
|
||||
self.agent_id)
|
||||
# TODO(salv-orlando): Provide bulk API for retrieving
|
||||
# details for all devices in one call
|
||||
devices_details_list.append(
|
||||
self.plugin_rpc.get_device_details(
|
||||
self.context, device, self.agent_id))
|
||||
except Exception as e:
|
||||
LOG.debug(_("Unable to get port details for "
|
||||
"%(device)s: %(e)s"),
|
||||
{'device': device, 'e': e})
|
||||
resync = True
|
||||
continue
|
||||
raise DeviceListRetrievalError(devices=devices, error=e)
|
||||
|
||||
for details in devices_details_list:
|
||||
device = details['device']
|
||||
LOG.info(_("Ancillary Port %s added"), device)
|
||||
|
||||
# update plugin about port status
|
||||
self.plugin_rpc.update_device_up(self.context,
|
||||
device,
|
||||
self.agent_id,
|
||||
cfg.CONF.host)
|
||||
return resync
|
||||
|
||||
def treat_devices_removed(self, devices):
|
||||
resync = False
|
||||
|
@ -1051,8 +1075,29 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
devices_added_updated = (port_info.get('added', set()) |
|
||||
port_info.get('updated', set()))
|
||||
if devices_added_updated:
|
||||
resync_a = self.treat_devices_added_or_updated(
|
||||
devices_added_updated)
|
||||
start = time.time()
|
||||
try:
|
||||
skipped_devices = self.treat_devices_added_or_updated(
|
||||
devices_added_updated)
|
||||
LOG.debug(_("process_network_ports - "
|
||||
"treat_devices_added_or_updated completed. "
|
||||
"Skipped %(num_skipped)d devices of "
|
||||
"%(num_current)d devices currently available. "
|
||||
"Time elapsed: %(elapsed).3f"),
|
||||
{'num_skipped': len(skipped_devices),
|
||||
'num_current': len(port_info['current']),
|
||||
'elapsed': time.time() - start})
|
||||
# Update the list of current ports storing only those which
|
||||
# have been actually processed.
|
||||
port_info['current'] = (port_info['current'] -
|
||||
set(skipped_devices))
|
||||
except DeviceListRetrievalError:
|
||||
# Need to resync as there was an error with server
|
||||
# communication.
|
||||
LOG.exception(_("process_network_ports - "
|
||||
"failure while retrieving port details "
|
||||
"from server"))
|
||||
resync_a = True
|
||||
if 'removed' in port_info:
|
||||
resync_b = self.treat_devices_removed(port_info['removed'])
|
||||
# If one of the above opertaions fails => resync with plugin
|
||||
|
@ -1062,7 +1107,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
resync_a = False
|
||||
resync_b = False
|
||||
if 'added' in port_info:
|
||||
resync_a = self.treat_ancillary_devices_added(port_info['added'])
|
||||
start = time.time()
|
||||
try:
|
||||
self.treat_ancillary_devices_added(port_info['added'])
|
||||
LOG.debug(_("process_ancillary_network_ports - "
|
||||
"treat_ancillary_devices_added "
|
||||
"completed in %(elapsed).3f"),
|
||||
{'elapsed': time.time() - start})
|
||||
except DeviceListRetrievalError:
|
||||
# Need to resync as there was an error with server
|
||||
# communication.
|
||||
LOG.exception(_("process_ancillary_network_ports - "
|
||||
"failure while retrieving "
|
||||
"port details from server"))
|
||||
resync_a = True
|
||||
if 'removed' in port_info:
|
||||
resync_b = self.treat_ancillary_devices_removed(
|
||||
port_info['removed'])
|
||||
|
|
|
@ -233,10 +233,15 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_treat_devices_added_returns_true_for_missing_device(self):
|
||||
with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
side_effect=Exception()):
|
||||
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
|
||||
def test_treat_devices_added_returns_raises_for_missing_device(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
||||
side_effect=Exception()),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=mock.Mock())):
|
||||
self.assertRaises(
|
||||
ovs_neutron_agent.DeviceListRetrievalError,
|
||||
self.agent.treat_devices_added_or_updated, [{}])
|
||||
|
||||
def _mock_treat_devices_added_updated(self, details, port, func_name):
|
||||
"""Mock treat devices added or updated.
|
||||
|
@ -255,7 +260,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||
mock.patch.object(self.agent, func_name)
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
|
||||
skip_devs = self.agent.treat_devices_added_or_updated([{}])
|
||||
# The function should not raise
|
||||
self.assertFalse(skip_devs)
|
||||
return func.called
|
||||
|
||||
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
|
||||
|
@ -270,12 +277,42 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||
self.assertTrue(self._mock_treat_devices_added_updated(
|
||||
mock.MagicMock(), port, 'port_dead'))
|
||||
|
||||
def test_treat_devices_added_does_not_process_missing_port(self):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc, 'get_device_details'),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=None)
|
||||
) as (get_dev_fn, get_vif_func):
|
||||
self.assertFalse(get_dev_fn.called)
|
||||
|
||||
def test_treat_devices_added_updated_updates_known_port(self):
|
||||
details = mock.MagicMock()
|
||||
details.__contains__.side_effect = lambda x: True
|
||||
self.assertTrue(self._mock_treat_devices_added_updated(
|
||||
details, mock.Mock(), 'treat_vif_port'))
|
||||
|
||||
def test_treat_devices_added_updated_skips_if_port_not_found(self):
|
||||
dev_mock = mock.MagicMock()
|
||||
dev_mock.__getitem__.return_value = 'the_skipped_one'
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'get_device_details',
|
||||
return_value=dev_mock),
|
||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||
return_value=None),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||
mock.patch.object(self.agent, 'treat_vif_port')
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
||||
upd_dev_down, treat_vif_port):
|
||||
skip_devs = self.agent.treat_devices_added_or_updated([{}])
|
||||
# The function should return False for resync and no device
|
||||
# processed
|
||||
self.assertEqual(['the_skipped_one'], skip_devs)
|
||||
self.assertFalse(treat_vif_port.called)
|
||||
self.assertFalse(upd_dev_down.called)
|
||||
self.assertFalse(upd_dev_up.called)
|
||||
|
||||
def test_treat_devices_added_updated_put_port_down(self):
|
||||
fake_details_dict = {'admin_state_up': False,
|
||||
'port_id': 'xxx',
|
||||
|
@ -294,7 +331,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||
mock.patch.object(self.agent, 'treat_vif_port')
|
||||
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
||||
upd_dev_down, treat_vif_port):
|
||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
|
||||
skip_devs = self.agent.treat_devices_added_or_updated([{}])
|
||||
# The function should return False for resync
|
||||
self.assertFalse(skip_devs)
|
||||
self.assertTrue(treat_vif_port.called)
|
||||
self.assertTrue(upd_dev_down.called)
|
||||
|
||||
|
@ -321,7 +360,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||
with contextlib.nested(
|
||||
mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
|
||||
mock.patch.object(self.agent, "treat_devices_added_or_updated",
|
||||
return_value=False),
|
||||
return_value=[]),
|
||||
mock.patch.object(self.agent, "treat_devices_removed",
|
||||
return_value=False)
|
||||
) as (setup_port_filters, device_added_updated, device_removed):
|
||||
|
|
|
@ -142,6 +142,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
|
|||
lbaas_plugin.create_pool, self.adminContext, pool)
|
||||
|
||||
def test_schedule_pool_with_down_agent(self):
|
||||
self.skipTest("Skipping test until #1344086 is fixed.")
|
||||
lbaas_hosta = {
|
||||
'binary': 'neutron-loadbalancer-agent',
|
||||
'host': LBAAS_HOSTA,
|
||||
|
|
Loading…
Reference in New Issue