Graceful ovs-agent restart

When agent is restarted it drops all existing flows. This
breaks all networking until the flows are re-created.

This change adds an ability to drop only old flows.
Agent_uuid_stamp is added for agents. This agent_uuid_stamp is set as
cookie for flows and then flows with stale cookies are deleted during
cleanup.

Co-Authored-By: Ann Kamyshnikova<akamyshnikova@mirantis.com>

Closes-bug: #1383674

DocImpact

Change-Id: I95070d8218859d4fff1d572c1792cdf6019dd7ea
This commit is contained in:
Eugene Nikanorov 2015-05-11 03:10:29 +04:00 committed by Ann Kamyshnikova
parent a74e7c7b13
commit 73673beacd
15 changed files with 344 additions and 69 deletions

View File

@ -171,8 +171,12 @@ class OVSBridge(BaseOVS):
self.set_db_attribute('Bridge', self.br_name, 'protocols', protocols,
check_error=True)
def create(self):
self.ovsdb.add_br(self.br_name).execute()
def create(self, secure_mode=False):
with self.ovsdb.transaction() as txn:
txn.add(self.ovsdb.add_br(self.br_name))
if secure_mode:
txn.add(self.ovsdb.set_fail_mode(self.br_name,
FAILMODE_SECURE))
# Don't return until vswitchd sets up the internal port
self.get_port_ofport(self.br_name)
@ -268,6 +272,10 @@ class OVSBridge(BaseOVS):
if 'NXST' not in item)
return retval
def dump_all_flows(self):
return [f for f in self.run_ofctl("dump-flows", []).splitlines()
if 'NXST' not in f]
def deferred(self, **kwargs):
return DeferredOVSBridge(self, **kwargs)

View File

@ -97,7 +97,10 @@ agent_opts = [
cfg.IntOpt('quitting_rpc_timeout', default=10,
help=_("Set new timeout in seconds for new rpc calls after "
"agent receives SIGTERM. If value is set to 0, rpc "
"timeout won't be changed"))
"timeout won't be changed")),
cfg.BoolOpt('drop_flows_on_start', default=False,
help=_("Reset flow table on start. Setting this to True will "
"cause brief traffic interruption."))
]

View File

@ -29,7 +29,6 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge):
"""openvswitch agent br-int specific logic."""
def setup_default_table(self):
self.delete_flows()
self.install_normal()
self.setup_canary_table()
self.install_drop(table_id=constants.ARP_SPOOF_TABLE)

View File

@ -98,7 +98,8 @@ class OVSTunnelBridge(ovs_bridge.OVSAgentBridge,
# to dynamically set-up flows in UCAST_TO_TUN corresponding to
# remote mac addresses (assumes that lvid has already been set by
# a previous flow)
learned_flow = ("table=%s,"
learned_flow = ("cookie=%(cookie)s,"
"table=%(table)s,"
"priority=1,"
"hard_timeout=300,"
"NXM_OF_VLAN_TCI[0..11],"
@ -106,7 +107,8 @@ class OVSTunnelBridge(ovs_bridge.OVSAgentBridge,
"load:0->NXM_OF_VLAN_TCI[],"
"load:NXM_NX_TUN_ID[]->NXM_NX_TUN_ID[],"
"output:NXM_OF_IN_PORT[]" %
constants.UCAST_TO_TUN)
{'cookie': self.agent_uuid_stamp,
'table': constants.UCAST_TO_TUN})
# Once remote mac addresses are learnt, output packet to patch_int
deferred_br.add_flow(table=constants.LEARN_FROM_TUN,
priority=1,

View File

@ -14,6 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from oslo_log import log as logging
from neutron.i18n import _LW
LOG = logging.getLogger(__name__)
# Field name mappings (from Ryu to ovs-ofctl)
_keywords = {
'eth_src': 'dl_src',
@ -26,6 +34,10 @@ _keywords = {
class OpenFlowSwitchMixin(object):
"""Mixin to provide common convenient routines for an openflow switch."""
agent_uuid_stamp = '0x0'
def set_agent_uuid_stamp(self, val):
self.agent_uuid_stamp = val
@staticmethod
def _conv_args(kwargs):
@ -37,6 +49,9 @@ class OpenFlowSwitchMixin(object):
def dump_flows(self, table_id):
return self.dump_flows_for_table(table_id)
def dump_flows_all_tables(self):
return self.dump_all_flows()
def install_goto_next(self, table_id):
self.install_goto(table_id=table_id, dest_table_id=table_id + 1)
@ -72,3 +87,36 @@ class OpenFlowSwitchMixin(object):
**self._conv_args(kwargs))
else:
super(OpenFlowSwitchMixin, self).remove_all_flows()
def add_flow(self, **kwargs):
kwargs['cookie'] = self.agent_uuid_stamp
super(OpenFlowSwitchMixin, self).add_flow(**self._conv_args(kwargs))
def mod_flow(self, **kwargs):
kwargs['cookie'] = self.agent_uuid_stamp
super(OpenFlowSwitchMixin, self).mod_flow(**self._conv_args(kwargs))
def _filter_flows(self, flows):
LOG.debug("Agent uuid stamp used to filter flows: %s",
self.agent_uuid_stamp)
cookie_re = re.compile('cookie=(0x[A-Fa-f0-9]*)')
table_re = re.compile('table=([0-9]*)')
for flow in flows:
fl_cookie = cookie_re.search(flow)
if not fl_cookie:
continue
fl_cookie = fl_cookie.group(1)
if int(fl_cookie, 16) != self.agent_uuid_stamp:
fl_table = table_re.search(flow)
if not fl_table:
continue
fl_table = fl_table.group(1)
yield flow, fl_cookie, fl_table
def cleanup_flows(self):
flows = self.dump_flows_all_tables()
for flow, cookie, table in self._filter_flows(flows):
# deleting a stale flow should be rare.
# it might deserve some attention
LOG.warning(_LW("Deleting flow %s"), flow)
self.delete_flows(cookie=cookie + '/-1', table=table)

View File

@ -17,6 +17,7 @@ import hashlib
import signal
import sys
import time
import uuid
import netaddr
from oslo_config import cfg
@ -57,6 +58,7 @@ cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
# A placeholder for dead vlans.
DEAD_VLAN_TAG = p_const.MAX_VLAN_TAG + 1
UINT64_BITMASK = (1 << 64) - 1
class _mac_mydialect(netaddr.mac_unix):
@ -216,6 +218,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# Keep track of int_br's device count for use by _report_state()
self.int_br_device_count = 0
self.agent_uuid_stamp = uuid.uuid4().int & UINT64_BITMASK
self.int_br = self.br_int_cls(integ_br)
self.setup_integration_br()
# Stores port update notifications for processing in main rpc loop
@ -244,8 +248,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.patch_tun_ofport = constants.OFPORT_INVALID
if self.enable_tunneling:
# The patch_int_ofport and patch_tun_ofport are updated
# here inside the call to reset_tunnel_br()
self.reset_tunnel_br(tun_br)
# here inside the call to setup_tunnel_br()
self.setup_tunnel_br(tun_br)
self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
self.context,
@ -269,7 +273,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
heartbeat.start(interval=report_interval)
if self.enable_tunneling:
self.setup_tunnel_br()
self.setup_tunnel_br_flows()
self.dvr_agent.setup_dvr_flows()
@ -872,8 +876,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def setup_integration_br(self):
'''Setup the integration bridge.
Delete patch ports and remove all existing flows.
'''
self.int_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
# Ensure the integration bridge is created.
# ovs_lib.OVSBridge.create() will run
# ovs-vsctl -- --may-exist add-br BRIDGE_NAME
@ -883,7 +887,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.int_br.setup_controllers(self.conf)
self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
if self.conf.AGENT.drop_flows_on_start:
self.int_br.delete_flows()
self.int_br.setup_default_table()
def setup_ancillary_bridges(self, integ_br, tun_br):
@ -912,7 +917,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ancillary_bridges.append(br)
return ancillary_bridges
def reset_tunnel_br(self, tun_br_name=None):
def setup_tunnel_br(self, tun_br_name=None):
'''(re)initialize the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
@ -922,15 +927,21 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
'''
if not self.tun_br:
self.tun_br = self.br_tun_cls(tun_br_name)
self.tun_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
self.tun_br.reset_bridge(secure_mode=True)
if not self.tun_br.bridge_exists('br-tun'):
self.tun_br.create(secure_mode=True)
self.tun_br.setup_controllers(self.conf)
self.patch_tun_ofport = self.int_br.add_patch_port(
self.conf.OVS.int_peer_patch_port,
self.conf.OVS.tun_peer_patch_port)
self.patch_int_ofport = self.tun_br.add_patch_port(
self.conf.OVS.tun_peer_patch_port,
self.conf.OVS.int_peer_patch_port)
if (not self.int_br.port_exists(self.conf.OVS.int_peer_patch_port) or
self.patch_tun_ofport == ovs_lib.INVALID_OFPORT):
self.patch_tun_ofport = self.int_br.add_patch_port(
self.conf.OVS.int_peer_patch_port,
self.conf.OVS.tun_peer_patch_port)
if (not self.tun_br.port_exists(self.conf.OVS.tun_peer_patch_port) or
self.patch_int_ofport == ovs_lib.INVALID_OFPORT):
self.patch_int_ofport = self.tun_br.add_patch_port(
self.conf.OVS.tun_peer_patch_port,
self.conf.OVS.int_peer_patch_port)
if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
self.patch_int_ofport):
LOG.error(_LE("Failed to create OVS patch port. Cannot have "
@ -938,9 +949,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"version of OVS does not support tunnels or patch "
"ports. Agent terminated!"))
exit(1)
self.tun_br.delete_flows()
if self.conf.AGENT.drop_flows_on_start:
self.tun_br.delete_flows()
def setup_tunnel_br(self):
def setup_tunnel_br_flows(self):
'''Setup the tunnel bridge.
Add all flows to the tunnel bridge.
@ -1008,9 +1020,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
bridge)
phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX,
bridge)
self.int_br.delete_port(int_if_name)
br.delete_port(phys_if_name)
# Interface type of port for physical and integration bridges must
# be same, so check only one of them.
int_type = self.int_br.db_get_val("Interface", int_if_name, "type")
if self.use_veth_interconnection:
# Drop ports if the interface types doesn't match the
# configuration value.
if int_type == 'patch':
self.int_br.delete_port(int_if_name)
br.delete_port(phys_if_name)
if ip_lib.device_exists(int_if_name):
ip_lib.IPDevice(int_if_name).link.delete()
# Give udev a chance to process its rules here, to avoid
@ -1022,6 +1040,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
int_ofport = self.int_br.add_port(int_veth)
phys_ofport = br.add_port(phys_veth)
else:
# Drop ports if the interface type doesn't match the
# configuration value
if int_type == 'veth':
self.int_br.delete_port(int_if_name)
br.delete_port(phys_if_name)
# Create patch ports without associating them in order to block
# untranslated traffic before association
int_ofport = self.int_br.add_patch_port(
@ -1515,6 +1538,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
'removed': len(ancillary_port_info.get('removed', []))}
return port_stats
def cleanup_stale_flows(self):
if self.iter_num == 0:
bridges = [self.int_br]
if self.enable_tunneling:
bridges.append(self.tun_br)
for bridge in bridges:
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
@ -1543,8 +1575,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.setup_integration_br()
self.setup_physical_bridges(self.bridge_mappings)
if self.enable_tunneling:
self.reset_tunnel_br()
self.setup_tunnel_br()
self.setup_tunnel_br_flows()
tunnel_sync = True
if self.enable_distributed_routing:
self.dvr_agent.reset_ovs_parameters(self.int_br,
@ -1613,6 +1645,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info,
ovs_restarted)
self.cleanup_stale_flows()
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,

View File

@ -14,6 +14,8 @@
#
import abc
from concurrent import futures
import contextlib
import functools
import os
import random
@ -86,6 +88,17 @@ def assert_ping(src_namespace, dst_ip, timeout=1, count=1):
dst_ip])
@contextlib.contextmanager
def async_ping(namespace, ips):
with futures.ThreadPoolExecutor(max_workers=len(ips)) as executor:
fs = [executor.submit(assert_ping, namespace, ip, count=10)
for ip in ips]
yield lambda: all(f.done() for f in fs)
futures.wait(fs)
for f in fs:
f.result()
def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1):
try:
assert_ping(src_namespace, dst_ip, timeout, count)

View File

@ -43,6 +43,7 @@ from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_tun
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
as ovs_agent
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
LOG = logging.getLogger(__name__)
@ -66,6 +67,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
self.ovs = ovs_lib.BaseOVS()
self.config = self._configure_agent()
self.driver = interface.OVSInterfaceDriver(self.config)
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
def _get_config_opts(self):
config = cfg.ConfigOpts()
@ -169,10 +171,11 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
self.driver.plug(
network.get('id'), port.get('id'), port.get('vif_name'),
port.get('mac_address'),
agent.int_br.br_name, namespace=None)
agent.int_br.br_name, namespace=self.namespace)
ip_cidrs = ["%s/%s" % (port.get('fixed_ips')[0][
'ip_address'], ip_len)]
self.driver.init_l3(port.get('vif_name'), ip_cidrs, namespace=None)
self.driver.init_l3(port.get('vif_name'), ip_cidrs,
namespace=self.namespace)
def _get_device_details(self, port, network):
dev = {'device': port['id'],
@ -276,8 +279,9 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
lambda: self._expected_plugin_rpc_call(
self.agent.plugin_rpc.update_device_list, port_ids, up))
def setup_agent_and_ports(self, port_dicts, trigger_resync=False):
self.agent = self.create_agent()
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.start_agent(self.agent)
self.network = self._create_test_network_dict()
self.ports = port_dicts

View File

@ -14,7 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.l2 import base
@ -54,3 +56,13 @@ class TestOVSAgent(base.OVSAgentTestFramework):
self.create_agent(create_tunnels=False)
self.assertTrue(self.ovs.bridge_exists(self.br_int))
self.assertFalse(self.ovs.bridge_exists(self.br_tun))
def test_assert_pings_during_br_int_setup_not_lost(self):
self.setup_agent_and_ports(port_dicts=self.create_test_ports(),
create_tunnels=False)
self.wait_until_ports_state(self.ports, up=True)
ips = [port['fixed_ips'][0]['ip_address'] for port in self.ports]
with net_helpers.async_ping(self.namespace, ips) as running:
while running():
self.agent.setup_integration_br()
time.sleep(0.25)

View File

@ -182,29 +182,36 @@ class OVS_Lib_Test(base.BaseTestCase):
cidr = '192.168.1.0/24'
flow_dict_1 = collections.OrderedDict([
('cookie', 1234),
('priority', 2),
('dl_src', 'ca:fe:de:ad:be:ef'),
('actions', 'strip_vlan,output:0')])
flow_dict_2 = collections.OrderedDict([
('cookie', 1254),
('priority', 1),
('actions', 'normal')])
flow_dict_3 = collections.OrderedDict([
('cookie', 1257),
('priority', 2),
('actions', 'drop')])
flow_dict_4 = collections.OrderedDict([
('cookie', 1274),
('priority', 2),
('in_port', ofport),
('actions', 'drop')])
flow_dict_5 = collections.OrderedDict([
('cookie', 1284),
('priority', 4),
('in_port', ofport),
('dl_vlan', vid),
('actions', "strip_vlan,set_tunnel:%s,normal" % (lsw_id))])
flow_dict_6 = collections.OrderedDict([
('cookie', 1754),
('priority', 3),
('tun_id', lsw_id),
('actions', "mod_vlan_vid:%s,output:%s" % (vid, ofport))])
flow_dict_7 = collections.OrderedDict([
('cookie', 1256),
('priority', 4),
('nw_src', cidr),
('proto', 'arp'),
@ -220,36 +227,39 @@ class OVS_Lib_Test(base.BaseTestCase):
expected_calls = [
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,"
"hard_timeout=0,idle_timeout=0,cookie=1234,"
"priority=2,dl_src=ca:fe:de:ad:be:ef,"
"actions=strip_vlan,output:0")),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,"
"hard_timeout=0,idle_timeout=0,cookie=1254,"
"priority=1,actions=normal")),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,"
"hard_timeout=0,idle_timeout=0,cookie=1257,"
"priority=2,actions=drop")),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,priority=2,"
"in_port=%s,actions=drop" % ofport)),
"hard_timeout=0,idle_timeout=0,cookie=1274,"
"priority=2,in_port=%s,actions=drop" % ofport
)),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,"
"hard_timeout=0,idle_timeout=0,cookie=1284,"
"priority=4,dl_vlan=%s,in_port=%s,"
"actions=strip_vlan,set_tunnel:%s,normal" %
(vid, ofport, lsw_id))),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,priority=3,"
"tun_id=%s,actions=mod_vlan_vid:%s,"
"output:%s" % (lsw_id, vid, ofport))),
"hard_timeout=0,idle_timeout=0,cookie=1754,"
"priority=3,"
"tun_id=%s,actions=mod_vlan_vid:%s,output:%s"
% (lsw_id, vid, ofport))),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
"hard_timeout=0,idle_timeout=0,priority=4,"
"nw_src=%s,arp,actions=drop" % cidr)),
"hard_timeout=0,idle_timeout=0,cookie=1256,"
"priority=4,nw_src=%s,arp,actions=drop"
% cidr)),
]
self.execute.assert_has_calls(expected_calls)
@ -269,6 +279,7 @@ class OVS_Lib_Test(base.BaseTestCase):
def test_add_flow_timeout_set(self):
flow_dict = collections.OrderedDict([
('cookie', 1234),
('priority', 1),
('hard_timeout', 1000),
('idle_timeout', 2000),
@ -277,17 +288,18 @@ class OVS_Lib_Test(base.BaseTestCase):
self.br.add_flow(**flow_dict)
self._verify_ofctl_mock(
"add-flows", self.BR_NAME, '-',
process_input="hard_timeout=1000,idle_timeout=2000,priority=1,"
"actions=normal")
process_input="hard_timeout=1000,idle_timeout=2000,"
"priority=1,cookie=1234,actions=normal")
def test_add_flow_default_priority(self):
flow_dict = collections.OrderedDict([('actions', 'normal')])
flow_dict = collections.OrderedDict([('actions', 'normal'),
('cookie', 1234)])
self.br.add_flow(**flow_dict)
self._verify_ofctl_mock(
"add-flows", self.BR_NAME, '-',
process_input="hard_timeout=0,idle_timeout=0,priority=1,"
"actions=normal")
"cookie=1234,actions=normal")
def _test_get_port_ofport(self, ofport, expected_result):
pname = "tap99"

View File

@ -80,6 +80,17 @@ class OVSBridgeTestBase(ovs_test_base.OVSOFCtlTestBase):
]
self.assertEqual(expected, self.mock.mock_calls)
def test_dump_flows_for_table(self):
table = 23
with mock.patch.object(self.br, 'run_ofctl') as run_ofctl:
self.br.dump_flows(table)
run_ofctl.assert_has_calls([mock.call("dump-flows", mock.ANY)])
def test_dump_all_flows(self):
with mock.patch.object(self.br, 'run_ofctl') as run_ofctl:
self.br.dump_flows_all_tables()
run_ofctl.assert_has_calls([mock.call("dump-flows", [])])
class OVSDVRProcessTestMixin(object):
def test_install_dvr_process_ipv4(self):

View File

@ -31,7 +31,6 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase):
def test_setup_default_table(self):
self.br.setup_default_table()
expected = [
call.delete_flows(),
call.add_flow(priority=0, table=0, actions='normal'),
call.add_flow(priority=0, table=23, actions='drop'),
call.add_flow(priority=0, table=24, actions='drop'),

View File

@ -54,7 +54,7 @@ class OVSTunnelBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase,
{'priority': 0, 'table': 3, 'actions': 'drop'},
{'priority': 0, 'table': 4, 'actions': 'drop'},
{'priority': 1, 'table': 10,
'actions': 'learn(table=20,priority=1,'
'actions': 'learn(cookie=0x0,table=20,priority=1,'
'hard_timeout=300,NXM_OF_VLAN_TCI[0..11],'
'NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[],'
'load:0->NXM_OF_VLAN_TCI[],'
@ -88,7 +88,7 @@ class OVSTunnelBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase,
{'priority': 0, 'table': 3, 'actions': 'drop'},
{'priority': 0, 'table': 4, 'actions': 'drop'},
{'priority': 1, 'table': 10,
'actions': 'learn(table=20,priority=1,'
'actions': 'learn(cookie=0x0,table=20,priority=1,'
'hard_timeout=300,NXM_OF_VLAN_TCI[0..11],'
'NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[],'
'load:0->NXM_OF_VLAN_TCI[],'

View File

@ -405,6 +405,9 @@ class TestOvsNeutronAgent(object):
'devices_down': details,
'failed_devices_up': [],
'failed_devices_down': []}),\
mock.patch.object(self.agent.int_br,
'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices = (
self.agent.treat_devices_added_or_updated([{}], False))
@ -469,6 +472,9 @@ class TestOvsNeutronAgent(object):
'get_devices_details_list_and_failed_devices',
return_value={'devices': [dev_mock],
'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={}),\
@ -500,6 +506,8 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={'xxx': mock.MagicMock()}),\
mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs, need_bound_devices = (
@ -655,8 +663,11 @@ class TestOvsNeutronAgent(object):
mock.call.phys_br_cls('br-eth'),
mock.call.phys_br.setup_controllers(mock.ANY),
mock.call.phys_br.setup_default_table(),
mock.call.int_br.delete_port('int-br-eth'),
mock.call.phys_br.delete_port('phy-br-eth'),
mock.call.int_br.db_get_val('Interface', 'int-br-eth',
'type'),
# Have to use __getattr__ here to avoid mock._Call.__eq__
# method being called
mock.call.int_br.db_get_val().__getattr__('__eq__')('veth'),
mock.call.int_br.add_patch_port('int-br-eth',
constants.NONEXISTENT_PEER),
mock.call.phys_br.add_patch_port('phy-br-eth',
@ -713,6 +724,46 @@ class TestOvsNeutronAgent(object):
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phys_veth_ofport")
def test_setup_physical_bridges_change_from_veth_to_patch_conf(self):
with mock.patch.object(sys, "exit"),\
mock.patch.object(utils, "execute"),\
mock.patch.object(self.agent, 'br_phys_cls') as phys_br_cls,\
mock.patch.object(self.agent, 'int_br') as int_br,\
mock.patch.object(self.agent.int_br, 'db_get_val',
return_value='veth'):
phys_br = phys_br_cls()
parent = mock.MagicMock()
parent.attach_mock(phys_br_cls, 'phys_br_cls')
parent.attach_mock(phys_br, 'phys_br')
parent.attach_mock(int_br, 'int_br')
phys_br.add_patch_port.return_value = "phy_ofport"
int_br.add_patch_port.return_value = "int_ofport"
self.agent.setup_physical_bridges({"physnet1": "br-eth"})
expected_calls = [
mock.call.phys_br_cls('br-eth'),
mock.call.phys_br.setup_controllers(mock.ANY),
mock.call.phys_br.setup_default_table(),
mock.call.int_br.delete_port('int-br-eth'),
mock.call.phys_br.delete_port('phy-br-eth'),
mock.call.int_br.add_patch_port('int-br-eth',
constants.NONEXISTENT_PEER),
mock.call.phys_br.add_patch_port('phy-br-eth',
constants.NONEXISTENT_PEER),
mock.call.int_br.drop_port(in_port='int_ofport'),
mock.call.phys_br.drop_port(in_port='phy_ofport'),
mock.call.int_br.set_db_attribute('Interface', 'int-br-eth',
'options:peer',
'phy-br-eth'),
mock.call.phys_br.set_db_attribute('Interface', 'phy-br-eth',
'options:peer',
'int-br-eth'),
]
parent.assert_has_calls(expected_calls)
self.assertEqual(self.agent.int_ofports["physnet1"],
"int_ofport")
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phy_ofport")
def test_get_peer_name(self):
bridge1 = "A_REALLY_LONG_BRIDGE_NAME1"
bridge2 = "A_REALLY_LONG_BRIDGE_NAME2"
@ -728,15 +779,49 @@ class TestOvsNeutronAgent(object):
self.tun_br = mock.Mock()
with mock.patch.object(self.agent.int_br,
"add_patch_port",
return_value=1) as intbr_patch_fn,\
mock.patch.object(self.agent,
'tun_br',
autospec=True) as tun_br,\
return_value=1) as int_patch_port,\
mock.patch.object(self.agent.tun_br,
"add_patch_port",
return_value=1) as tun_patch_port,\
mock.patch.object(self.agent.tun_br, 'bridge_exists',
return_value=False),\
mock.patch.object(self.agent.tun_br, 'create') as create_tun,\
mock.patch.object(self.agent.tun_br,
'setup_controllers') as setup_controllers,\
mock.patch.object(self.agent.tun_br, 'port_exists',
return_value=False),\
mock.patch.object(self.agent.int_br, 'port_exists',
return_value=False),\
mock.patch.object(sys, "exit"):
tun_br.add_patch_port.return_value = 2
self.agent.reset_tunnel_br(None)
self.agent.setup_tunnel_br(None)
self.agent.setup_tunnel_br()
self.assertTrue(intbr_patch_fn.called)
self.assertTrue(create_tun.called)
self.assertTrue(setup_controllers.called)
self.assertTrue(int_patch_port.called)
self.assertTrue(tun_patch_port.called)
def test_setup_tunnel_br_ports_exits_drop_flows(self):
cfg.CONF.set_override('drop_flows_on_start', True, 'AGENT')
with mock.patch.object(self.agent.tun_br, 'port_exists',
return_value=True),\
mock.patch.object(self.agent, 'tun_br'),\
mock.patch.object(self.agent.int_br, 'port_exists',
return_value=True),\
mock.patch.object(self.agent.tun_br, 'setup_controllers'),\
mock.patch.object(self.agent, 'patch_tun_ofport', new=2),\
mock.patch.object(self.agent, 'patch_int_ofport', new=2),\
mock.patch.object(self.agent.tun_br,
'delete_flows') as delete,\
mock.patch.object(self.agent.int_br,
"add_patch_port") as int_patch_port,\
mock.patch.object(self.agent.tun_br,
"add_patch_port") as tun_patch_port,\
mock.patch.object(sys, "exit"):
self.agent.setup_tunnel_br(None)
self.agent.setup_tunnel_br()
self.assertFalse(int_patch_port.called)
self.assertFalse(tun_patch_port.called)
self.assertTrue(delete.called)
def test_setup_tunnel_port(self):
self.agent.tun_br = mock.Mock()
@ -999,12 +1084,15 @@ class TestOvsNeutronAgent(object):
return_value=fake_tunnel_details),\
mock.patch.object(
self.agent,
'_setup_tunnel_port') as _setup_tunnel_port_fn:
'_setup_tunnel_port') as _setup_tunnel_port_fn,\
mock.patch.object(self.agent,
'cleanup_stale_flows') as cleanup:
self.agent.tunnel_types = ['vxlan']
self.agent.tunnel_sync()
expected_calls = [mock.call(self.agent.tun_br, 'vxlan-64651f0f',
'100.101.31.15', 'vxlan')]
_setup_tunnel_port_fn.assert_has_calls(expected_calls)
self.assertEqual([], cleanup.mock_calls)
def test_tunnel_sync_invalid_ip_address(self):
fake_tunnel_details = {'tunnels': [{'ip_address': '300.300.300.300'},
@ -1014,13 +1102,16 @@ class TestOvsNeutronAgent(object):
return_value=fake_tunnel_details),\
mock.patch.object(
self.agent,
'_setup_tunnel_port') as _setup_tunnel_port_fn:
'_setup_tunnel_port') as _setup_tunnel_port_fn,\
mock.patch.object(self.agent,
'cleanup_stale_flows') as cleanup:
self.agent.tunnel_types = ['vxlan']
self.agent.tunnel_sync()
_setup_tunnel_port_fn.assert_called_once_with(self.agent.tun_br,
'vxlan-64646464',
'100.100.100.100',
'vxlan')
self.assertEqual([], cleanup.mock_calls)
def test_tunnel_update(self):
kwargs = {'tunnel_ip': '10.10.10.10',
@ -1070,8 +1161,11 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'setup_physical_bridges') as setup_phys_br,\
mock.patch.object(time, 'sleep'),\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'update_stale_ofport_rules') as update_stale, \
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'update_stale_ofport_rules') as update_stale:
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
@ -1091,6 +1185,7 @@ class TestOvsNeutronAgent(object):
mock.call(reply2, False),
mock.call(reply3, True)
])
cleanup.assert_called_once_with()
self.assertTrue(update_stale.called)
# Verify the OVS restart we triggered in the loop
# re-setup the bridges
@ -1113,6 +1208,24 @@ class TestOvsNeutronAgent(object):
self.agent.state_rpc.client):
self.assertEqual(10, rpc_client.timeout)
def test_cleanup_stale_flows_iter_0(self):
with mock.patch.object(self.agent, 'agent_uuid_stamp', new=1234),\
mock.patch.object(self.agent.int_br,
'dump_flows_all_tables') as dump_flows,\
mock.patch.object(self.agent.int_br,
'delete_flows') as del_flow:
dump_flows.return_value = [
'cookie=0x4d2, duration=50.156s, table=0,actions=drop',
'cookie=0x4321, duration=54.143s, table=2, priority=0',
'cookie=0x2345, duration=50.125s, table=2, priority=0',
'cookie=0x4d2, duration=52.112s, table=3, actions=drop',
]
self.agent.cleanup_stale_flows()
del_flow.assert_has_calls([mock.call(cookie='0x4321/-1',
table='2'),
mock.call(cookie='0x2345/-1',
table='2')])
def test_set_rpc_timeout_no_value(self):
self.agent.quitting_rpc_timeout = None
with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
@ -2164,7 +2277,7 @@ class TestOvsDvrNeutronAgent(object):
# block RPC calls and bridge calls
self.agent.setup_physical_bridges = mock.Mock()
self.agent.setup_integration_br = mock.Mock()
self.agent.reset_tunnel_br = mock.Mock()
self.agent.setup_tunnel_br = mock.Mock()
self.agent.state_rpc = mock.Mock()
try:
self.agent.rpc_loop(polling_manager=mock.Mock())

View File

@ -167,6 +167,7 @@ class TunnelTest(object):
self.mock_int_bridge = self.ovs_bridges[self.INT_BRIDGE]
self.mock_int_bridge_expected = [
mock.call.set_agent_uuid_stamp(mock.ANY),
mock.call.create(),
mock.call.set_secure_mode(),
mock.call.setup_controllers(mock.ANY),
@ -177,11 +178,11 @@ class TunnelTest(object):
self.mock_map_tun_bridge_expected = [
mock.call.setup_controllers(mock.ANY),
mock.call.setup_default_table(),
mock.call.delete_port('phy-%s' % self.MAP_TUN_BRIDGE),
mock.call.add_patch_port('phy-%s' % self.MAP_TUN_BRIDGE,
constants.NONEXISTENT_PEER), ]
self.mock_int_bridge_expected += [
mock.call.delete_port('int-%s' % self.MAP_TUN_BRIDGE),
mock.call.db_get_val('Interface', 'int-%s' % self.MAP_TUN_BRIDGE,
'type'),
mock.call.add_patch_port('int-%s' % self.MAP_TUN_BRIDGE,
constants.NONEXISTENT_PEER),
]
@ -200,11 +201,17 @@ class TunnelTest(object):
]
self.mock_tun_bridge_expected = [
mock.call.reset_bridge(secure_mode=True),
mock.call.set_agent_uuid_stamp(mock.ANY),
mock.call.bridge_exists('br-tun'),
mock.call.bridge_exists().__nonzero__(),
mock.call.setup_controllers(mock.ANY),
mock.call.port_exists('patch-int'),
mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-int', 'patch-tun'),
]
self.mock_int_bridge_expected += [
mock.call.port_exists('patch-tun'),
mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-tun', 'patch-int'),
]
self.mock_int_bridge_expected += [
@ -214,7 +221,6 @@ class TunnelTest(object):
]
self.mock_tun_bridge_expected += [
mock.call.delete_flows(),
mock.call.setup_default_table(self.INT_OFPORT, arp_responder),
]
@ -510,8 +516,12 @@ class TunnelTest(object):
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'tunnel_sync'),\
mock.patch.object(time, 'sleep'),\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'update_stale_ofport_rules') as update_stale:
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'update_stale_ofport_rules') as update_stale,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
@ -545,6 +555,8 @@ class TunnelTest(object):
'removed': set(['tap0']),
'added': set([])}, False)
])
cleanup.assert_called_once_with()
self.assertTrue(update_stale.called)
self._verify_mock_calls()
@ -568,6 +580,7 @@ class TunnelTestUseVethInterco(TunnelTest):
]
self.mock_int_bridge_expected = [
mock.call.set_agent_uuid_stamp(mock.ANY),
mock.call.create(),
mock.call.set_secure_mode(),
mock.call.setup_controllers(mock.ANY),
@ -578,11 +591,11 @@ class TunnelTestUseVethInterco(TunnelTest):
self.mock_map_tun_bridge_expected = [
mock.call.setup_controllers(mock.ANY),
mock.call.setup_default_table(),
mock.call.delete_port('phy-%s' % self.MAP_TUN_BRIDGE),
mock.call.add_port(self.intb),
]
self.mock_int_bridge_expected += [
mock.call.delete_port('int-%s' % self.MAP_TUN_BRIDGE),
mock.call.db_get_val('Interface', 'int-%s' % self.MAP_TUN_BRIDGE,
'type'),
mock.call.add_port(self.inta)
]
@ -594,11 +607,17 @@ class TunnelTestUseVethInterco(TunnelTest):
]
self.mock_tun_bridge_expected = [
mock.call.reset_bridge(secure_mode=True),
mock.call.set_agent_uuid_stamp(mock.ANY),
mock.call.bridge_exists('br-tun'),
mock.call.bridge_exists().__nonzero__(),
mock.call.setup_controllers(mock.ANY),
mock.call.port_exists('patch-int'),
mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-int', 'patch-tun'),
]
self.mock_int_bridge_expected += [
mock.call.port_exists('patch-tun'),
mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-tun', 'patch-int')
]
self.mock_int_bridge_expected += [
@ -607,7 +626,6 @@ class TunnelTestUseVethInterco(TunnelTest):
'Port', columns=['name', 'other_config', 'tag'], ports=[])
]
self.mock_tun_bridge_expected += [
mock.call.delete_flows(),
mock.call.setup_default_table(self.INT_OFPORT, arp_responder),
]