Merge "OVS agent reacts to events instead of polling"

This commit is contained in:
Jenkins 2015-11-09 20:55:30 +00:00 committed by Gerrit Code Review
commit a55ead3e32
5 changed files with 346 additions and 54 deletions

View File

@ -34,6 +34,7 @@ from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import ip_lib
from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
@ -1212,6 +1213,88 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info['removed'] = registered_ports - cur_ports
return port_info
def process_ports_events(self, events, registered_ports, ancillary_ports,
                         updated_ports=None):
    """Turn interface add/remove events into port-change sets.

    :param events: dict with 'added' and 'removed' lists of device dicts.
        Each device is read for 'name', 'ofport' and 'external_ids'.
        When the same device appears in both lists, one of the two
        entries is removed from ``events`` in place.
    :param registered_ports: set of port ids currently known for the
        integration bridge. It is copied, not modified.
    :param ancillary_ports: set of port ids currently known for the
        ancillary bridges, or None/empty. It is copied, not modified.
    :param updated_ports: optional set of port ids flagged as updated.
        When given it is modified in place: extended with the result of
        ``check_changed_vlans()`` and then restricted to current ports.
    :returns: tuple ``(port_info, ancillary_port_info)``. Both are dicts
        with 'added', 'removed' and 'current' sets; ``port_info`` also
        carries 'updated' when there is at least one updated port.
    :raises Exception: when a device that maps to a port id still has
        ``ovs_lib.UNASSIGNED_OFPORT`` as its ofport.
    """
    # Copy the incoming sets so the caller's objects are not changed as a
    # side effect of computing the new 'current' sets.
    port_info = {}
    port_info['added'] = set()
    port_info['removed'] = set()
    port_info['current'] = set(registered_ports)

    ancillary_port_info = {}
    ancillary_port_info['added'] = set()
    ancillary_port_info['removed'] = set()
    ancillary_port_info['current'] = (
        set(ancillary_ports) if ancillary_ports else set())

    # If a port was added and then removed, or vice versa, the agent
    # can't know the order of the operations: check whether the port
    # exists to decide if it was added or deleted.
    device_removed_or_added = [
        dev for dev in events['added'] if dev in events['removed']]
    for device in device_removed_or_added:
        if ovs_lib.BaseOVS().port_exists(device['name']):
            events['removed'].remove(device)
        else:
            events['added'].remove(device)

    # TODO(rossella_s): scanning the ancillary bridge won't be needed
    # anymore when https://review.openstack.org/#/c/203381 since the bridge
    # id stored in external_ids will be used to identify the bridge the
    # port belongs to
    cur_ancillary_ports = set()
    # ancillary_brs may be None or empty when no ancillary bridge exists.
    for bridge in self.ancillary_brs or ():
        cur_ancillary_ports |= bridge.get_vif_port_set()
    cur_ancillary_ports |= ancillary_port_info['current']

    def _process_device(device, devices, ancillary_devices):
        # Only devices carrying 'attached-mac' in external_ids and
        # resolving to a port id are ports the agent should care about.
        if 'attached-mac' not in device.get('external_ids', []):
            return
        iface_id = self.int_br.portid_from_external_ids(
            device['external_ids'])
        if not iface_id:
            return
        if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
            # TODO(rossella_s) it's extreme to trigger a full resync
            # if a port is not ready, resync only the device that
            # is not ready
            raise Exception(
                _("Port %s is not ready, resync needed") % device[
                    'name'])
        # Route the port id to the ancillary or the integration bucket.
        if iface_id in cur_ancillary_ports:
            ancillary_devices.add(iface_id)
        else:
            devices.add(iface_id)

    for device in events['added']:
        _process_device(device, port_info['added'],
                        ancillary_port_info['added'])
    for device in events['removed']:
        _process_device(device, port_info['removed'],
                        ancillary_port_info['removed'])

    if updated_ports is None:
        updated_ports = set()
    updated_ports.update(self.check_changed_vlans())

    # Disregard devices that were never noticed by the agent
    port_info['removed'] &= port_info['current']
    port_info['current'] |= port_info['added']
    port_info['current'] -= port_info['removed']

    ancillary_port_info['removed'] &= ancillary_port_info['current']
    ancillary_port_info['current'] |= ancillary_port_info['added']
    ancillary_port_info['current'] -= ancillary_port_info['removed']

    if updated_ports:
        # Some updated ports might have been removed in the
        # meanwhile, and therefore should not be processed.
        # In this case the updated port won't be found among
        # current ports.
        updated_ports &= port_info['current']
        port_info['updated'] = updated_ports
    return port_info, ancillary_port_info
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
@ -1656,12 +1739,64 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
def process_port_info(self, start, polling_manager, sync, ovs_restarted,
                      ports, ancillary_ports, updated_ports_copy,
                      consecutive_resyncs):
    """Collect port changes either from queued events or from a full scan.

    Events are used when no resync is requested and the polling manager
    offers ``get_events``; otherwise the bridges are scanned.

    :param start: timestamp used to compute the elapsed time logged.
    :param polling_manager: polling manager, with or without get_events.
    :param sync: True when a resync was requested.
    :param ovs_restarted: True to scan as if no port was registered.
    :param ports: set of registered ports; cleared when the number of
        consecutive resyncs reaches constants.MAX_DEVICE_RETRIES.
    :param ancillary_ports: set of registered ancillary ports; cleared
        together with ``ports``.
    :param updated_ports_copy: set of updated ports passed through.
    :param consecutive_resyncs: resync counter from the previous call.
    :returns: tuple (port_info, ancillary_port_info, consecutive_resyncs)
    """
    # There are polling managers that don't have get_events, e.g.
    # AlwaysPoll used by windows implementations
    # REVISIT (rossella_s) This needs to be reworked to hide implementation
    # details regarding polling in BasePollingManager subclasses
    event_driven = not sync and hasattr(polling_manager, 'get_events')
    if event_driven:
        queued_events = polling_manager.get_events()
        known_ancillary = ancillary_ports if self.ancillary_brs else None
        port_info, ancillary_port_info = self.process_ports_events(
            queued_events, ports, known_ancillary, updated_ports_copy)
        return port_info, ancillary_port_info, 0

    if not sync:
        # NOTE(rossella_s) don't empty the queue of events
        # calling polling_manager.get_events() since
        # the agent might miss some event (for example a port
        # deletion)
        consecutive_resyncs = 0
    else:
        LOG.info(_LI("Agent out of sync with plugin!"))
        consecutive_resyncs += 1
        if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
            LOG.warn(_LW(
                "Clearing cache of registered ports,"
                " retries to resync were > %s"),
                constants.MAX_DEVICE_RETRIES)
            ports.clear()
            ancillary_ports.clear()
            consecutive_resyncs = 0

    if ovs_restarted:
        reg_ports = set()
    else:
        reg_ports = ports
    port_info = self.scan_ports(reg_ports, sync, updated_ports_copy)

    # Treat ancillary devices if they exist
    ancillary_port_info = {}
    if self.ancillary_brs:
        ancillary_port_info = self.scan_ancillary_ports(
            ancillary_ports, sync)
        LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
                  " - ancillary port info retrieved. "
                  "Elapsed:%(elapsed).3f",
                  {'iter_num': self.iter_num,
                   'elapsed': time.time() - start})
    return port_info, ancillary_port_info, consecutive_resyncs
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
sync = True
sync = False
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
@ -1674,20 +1809,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
polling_manager.force_polling()
consecutive_resyncs = consecutive_resyncs + 1
if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
LOG.warn(_LW("Clearing cache of registered ports, retrials"
" to resync were > %s"),
constants.MAX_DEVICE_RETRIES)
ports.clear()
ancillary_ports.clear()
sync = False
consecutive_resyncs = 0
else:
consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
@ -1703,6 +1824,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows()
# restart the polling manager so that it will signal as added
# all the current ports
# REVISIT (rossella_s) Define a method "reset" in
# BasePollingManager that will be implemented by AlwaysPoll as
# no action and by InterfacePollingMinimizer as start/stop
if isinstance(
polling_manager, linux_polling.InterfacePollingMinimizer):
polling_manager.stop()
polling_manager.start()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
@ -1719,7 +1849,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
if self._agent_has_updates(polling_manager) or ovs_restarted:
if self._agent_has_updates(polling_manager) or sync:
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
@ -1731,9 +1861,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, sync,
updated_ports_copy)
port_info, ancillary_port_info, consecutive_resyncs = (
self.process_port_info(
start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs)
)
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
@ -1744,16 +1878,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Treat ancillary devices if they exist
if self.ancillary_brs:
ancillary_port_info = self.scan_ancillary_ports(
ancillary_ports, sync)
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ancillary port info retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or

View File

@ -118,13 +118,38 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
agent.ancillary_brs = None
return agent
def start_agent(self, agent, unplug_ports=None):
def _mock_get_events(self, agent, polling_manager, ports):
    # Wrap polling_manager.get_events so that 'added' events are limited
    # to the ports created by this test; 'removed' events pass through.
    real_get_events = polling_manager.get_events
    p_ids = [port['id'] for port in ports]

    def _created_by_test(dev):
        iface_id = agent.int_br.portid_from_external_ids(
            dev.get('external_ids', []))
        return iface_id in p_ids

    def filter_events():
        events = real_get_events()
        # If the event is not about a port that was created by this
        # test, we filter the event out. Since these tests are not run
        # in isolation, processing all the events might make some test
        # fail (e.g. the agent might keep resyncing because it keeps
        # finding not ready ports that are created by other tests).
        filtered_ports = [
            dev for dev in events['added'] if _created_by_test(dev)]
        return {'added': filtered_ports, 'removed': events['removed']}

    polling_manager.get_events = mock.Mock(side_effect=filter_events)
def start_agent(self, agent, ports=None, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
if ports is None:
ports = []
self.setup_agent_rpc_mocks(agent, unplug_ports)
polling_manager = polling.InterfacePollingMinimizer()
self._mock_get_events(agent, polling_manager, ports)
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
@ -138,6 +163,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
return polling_manager
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
@ -280,10 +306,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.start_agent(self.agent)
self.network = self._create_test_network_dict()
self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = self._create_test_network_dict()
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)

View File

@ -73,7 +73,6 @@ class TestOVSAgent(base.OVSAgentTestFramework):
port_dicts=self.create_test_ports())
self.wait_until_ports_state(self.ports, up=True)
self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
# OVS restarted, the agent should reprocess all the ports
self.agent.plugin_rpc.update_device_list.reset_mock()
self.wait_until_ports_state(self.ports, up=True)

View File

@ -401,6 +401,130 @@ class TestOvsNeutronAgent(object):
updated_ports)
self.assertEqual(expected, actual)
def test_process_ports_events_returns_current_for_unchanged_ports(self):
    # With no events, both port sets come back unchanged as 'current'
    # and nothing is reported as added or removed.
    registered_ports = {1, 3}
    ancillary_ports = {2, 5}
    no_events = dict(added=[], removed=[])
    expected = (
        dict(current=registered_ports, added=set(), removed=set()),
        dict(current=ancillary_ports, added=set(), removed=set()),
    )
    with mock.patch.object(self.agent, 'check_changed_vlans',
                           return_value=set()):
        actual = self.agent.process_ports_events(
            no_events, registered_ports, ancillary_ports)
        self.assertEqual(expected, actual)
def test_process_port_events_returns_port_changes(self):
    # One add and one remove on both the integration and the ancillary
    # bridge must show up in the matching port-info dict.
    def _device(name, ofport):
        return {'name': name, 'ofport': ofport,
                'external_ids': {'attached-mac': 'test-mac'}}

    events = {
        'added': [_device('port3', 3), _device('qg-port2', 5)],
        'removed': [_device('port2', 2), _device('qg-port1', 4)],
    }
    registered_ports = {1, 2}
    ancillary_ports = {4}
    expected = (
        {'current': {1, 3}, 'added': {3}, 'removed': {2}},
        {'current': {5}, 'added': {5}, 'removed': {4}},
    )
    ancillary_bridge = mock.Mock()
    ancillary_bridge.get_vif_port_set.return_value = {4, 5}
    self.agent.ancillary_brs = [ancillary_bridge]
    # Ids are listed in lookup order: added devices, then removed ones.
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[3, 5, 2, 4]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, ancillary_ports)
            self.assertEqual(expected, actual)
def _test_process_port_events_with_updated_ports(self, updated_ports):
    # Shared scenario: whatever is passed as updated_ports, only port 4
    # is expected to be reported as updated.
    def _device(name, ofport):
        return {'name': name, 'ofport': ofport,
                'external_ids': {'attached-mac': 'test-mac'}}

    events = {
        'added': [_device('port3', 3), _device('qg-port2', 6)],
        'removed': [_device('port2', 2), _device('qg-port1', 5)],
    }
    registered_ports = {1, 2, 4}
    ancillary_ports = {5, 8}
    expected = (
        {'current': {1, 3, 4}, 'added': {3}, 'removed': {2},
         'updated': {4}},
        {'current': {6, 8}, 'added': {6}, 'removed': {5}},
    )
    ancillary_bridge = mock.Mock()
    ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
    self.agent.ancillary_brs = [ancillary_bridge]
    # Ids are listed in lookup order: added devices, then removed ones.
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[3, 6, 2, 5]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, ancillary_ports, updated_ports)
            self.assertEqual(expected, actual)
def test_process_port_events_finds_known_updated_ports(self):
    # port 4 is a registered port, so it must be reported as updated
    known_updated = {4}
    self._test_process_port_events_with_updated_ports(known_updated)
def test_process_port_events_ignores_unknown_updated_ports(self):
    # Port 5 is not among the current ports: it has either never been
    # wired or has already been removed, so it should be ignored.
    known_and_unknown = {4, 5}
    self._test_process_port_events_with_updated_ports(known_and_unknown)
def test_process_port_events_ignores_updated_port_if_removed(self):
    # Port 2 is both updated and removed: only port 1 stays 'updated'.
    mac_ids = {'attached-mac': 'test-mac'}
    events = {
        'added': [{'name': 'port3', 'ofport': 3,
                   'external_ids': mac_ids}],
        'removed': [{'name': 'port2', 'ofport': 2,
                     'external_ids': mac_ids}],
    }
    registered_ports = {1, 2}
    updated_ports = {1, 2}
    expected = (
        {'current': {1, 3}, 'added': {3}, 'removed': {2},
         'updated': {1}},
        {'current': set(), 'added': set(), 'removed': set()},
    )
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[3, 2]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, None, updated_ports)
            self.assertEqual(expected, actual)
def test_process_port_events_no_vif_changes_return_updated_port_only(self):
    # No add/remove events: the only change reported is the updated port.
    registered_ports = {1, 2, 3}
    updated_ports = {2}
    no_events = dict(added=[], removed=[])
    expected = (
        {'current': registered_ports, 'updated': {2},
         'added': set(), 'removed': set()},
        {'current': set(), 'added': set(), 'removed': set()},
    )
    with mock.patch.object(self.agent, 'check_changed_vlans',
                           return_value=set()):
        actual = self.agent.process_ports_events(
            no_events, registered_ports, None, updated_ports)
        self.assertEqual(expected, actual)
def test_process_port_events_ignores_removed_port_if_never_added(self):
    # Port 2 was never registered, so its removal must not be reported.
    removed_device = {'name': 'port2', 'ofport': 2,
                      'external_ids': {'attached-mac': 'test-mac'}}
    events = dict(added=[], removed=[removed_device])
    registered_ports = {1}
    expected = (
        {'current': registered_ports, 'added': set(), 'removed': set()},
        {'current': set(), 'added': set(), 'removed': set()},
    )
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[2]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, None)
            self.assertEqual(expected, actual)
def test_update_ports_returns_changed_vlan(self):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
@ -1371,11 +1495,17 @@ class TestOvsNeutronAgent(object):
'added': set([]),
'removed': set(['tap0'])}
reply_ancillary = {'current': set([]),
'added': set([]),
'removed': set([])}
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
mock.patch.object(async_process.AsyncProcess, "start"),\
mock.patch.object(async_process.AsyncProcess, "stop"),\
mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'scan_ports') as scan_ports,\
'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
@ -1393,7 +1523,8 @@ class TestOvsNeutronAgent(object):
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
process_p_events.side_effect = [(reply2, reply_ancillary),
(reply3, reply_ancillary)]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
@ -1402,10 +1533,12 @@ class TestOvsNeutronAgent(object):
except Exception:
pass
scan_ports.assert_has_calls([
mock.call(set(), True, set()),
mock.call(set(), False, set())
process_p_events.assert_has_calls([
mock.call({'removed': [], 'added': []}, set(), None, set()),
mock.call({'removed': [], 'added': []}, set(['tap0']), None,
set())
])
process_network_ports.assert_has_calls([
mock.call(reply2, False),
mock.call(reply3, True)

View File

@ -495,14 +495,24 @@ class TunnelTest(object):
self._verify_mock_calls()
def test_daemon_loop(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),
'removed': set([])}
reply_ge_1 = {'added': set(['tap0']),
'removed': set([])}
reply3 = {'current': set(['tap2']),
'added': set([]),
reply_ge_2 = {'added': set([]),
'removed': set(['tap0'])}
reply_pe_1 = {'current': set(['tap0']),
'added': set(['tap0']),
'removed': set([])}
reply_pe_2 = {'current': set([]),
'added': set([]),
'removed': set(['tap0'])}
reply_ancillary = {'current': set([]),
'added': set([]),
'removed': set([])}
self.mock_int_bridge_expected += [
mock.call.check_canary_table(),
mock.call.check_canary_table()
@ -513,7 +523,7 @@ class TunnelTest(object):
with mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'scan_ports') as scan_ports,\
'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
@ -528,8 +538,11 @@ class TunnelTest(object):
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
update_stale.return_value = []
process_p_events.side_effect = [
(reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
interface_polling = mock.Mock()
interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
@ -539,7 +552,7 @@ class TunnelTest(object):
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
n_agent.daemon_loop()
n_agent.rpc_loop(interface_polling)
except Exception:
pass
@ -547,17 +560,14 @@ class TunnelTest(object):
# messages
log_exception.assert_called_once_with(
"Error while processing VIF ports")
scan_ports.assert_has_calls([
mock.call(set(), True, set()),
mock.call(set(['tap0']), False, set())
process_p_events.assert_has_calls([
mock.call(reply_ge_1, set(), None, set()),
mock.call(reply_ge_2, set(['tap0']), None, set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
'added': set(['tap2'])}, False),
mock.call({'current': set(['tap2']),
'removed': set(['tap0']),
'added': set([])}, False)
'added': set(['tap0'])}, False),
])
cleanup.assert_called_once_with()