[03/xx] Introduce SFC flow classifier app

This patch introduces the flow classifier application for SFC; see the flow
classifier API at [1] for more information.

Fullstack tests for the app are provided in the SFC app patch [2].

[1] https://docs.openstack.org/developer/networking-sfc/api.html
[2] https://review.openstack.org/#/c/424146
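
For illustration, a flow classifier roughly as the app consumes it (field
names taken from the new sfc model and the unit tests below; the values are
hypothetical):

    fc = sfc.FlowClassifier(
        id='fc-web',
        topic='topic1',
        unique_key=42,
        source_port='lport1',   # classify traffic leaving lport1
        ether_type='IPv4',
        protocol='tcp',
    )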

Partially-implements: blueprint service-function-chaining
Change-Id: If046b84ed432c225e332f3d7daf4dafa3550c33d
Dima Kuznetsov 2017-02-06 13:25:19 +02:00
parent f01f6307c8
commit 9a3092ffce
6 changed files with 628 additions and 3 deletions


@@ -19,6 +19,10 @@ EXTERNAL_HOST_IP=${EXTERNAL_HOST_IP:-}
DEFAULT_TUNNEL_TYPES="vxlan,geneve,gre"
DEFAULT_APPS_LIST="l2,l3_proactive,dhcp,dnat,sg,portsec,portqos,classifier,tunneling,provider"
if [[ $ENABLE_DF_SFC == "True" ]]; then
DEFAULT_APPS_LIST="$DEFAULT_APPS_LIST,fc"
fi
if is_service_enabled df-metadata ; then
DEFAULT_APPS_LIST="$DEFAULT_APPS_LIST,metadata_service"
fi
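
For context, a minimal local.conf sketch for switching the app on, assuming
the standard devstack variable shown in the hunk above (the exact enable flow
depends on the dragonflow devstack plugin):

    ENABLE_DF_SFC=True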


@@ -0,0 +1,343 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from neutron_lib import constants as lib_constants
from oslo_log import log
from ryu.lib.packet import ether_types
from ryu.lib.packet import in_proto
from ryu.ofproto import nicira_ext
from dragonflow._i18n import _
from dragonflow.controller.common import constants
from dragonflow.controller.common import utils
from dragonflow.controller import df_base_app
from dragonflow.db.models import constants as model_const
from dragonflow.db.models import l2
from dragonflow.db.models import sfc
LOG = log.getLogger(__name__)
# We're using 2 least significant bits of reg3 to mark if we already classified
# the packet or not (to avoid infinite loops)
_SRC_NOT_DONE = 0
_SRC_DONE = 1
_SRC_BIT = 0
_SRC_MASK = (1 << _SRC_BIT)
_DST_NOT_DONE = 0
_DST_DONE = 2
_DST_BIT = 1
_DST_MASK = (1 << _DST_BIT)
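# Resulting reg3 values (only these two bits are used):
#   0b00 - not classified yet in either direction
#   0b01 - source-side classification already done
#   0b10 - dest-side classification already done
#   0b11 - both done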
def _is_lport_ref_eq(lport_ref, lport):
return lport_ref is not None and lport_ref.id == lport.id
class FcApp(df_base_app.DFlowApp):
def switch_features_handler(self, ev):
# Add SFC short-circuit in case SFC app is not loaded
self.add_flow_go_to_table(constants.SFC_ENCAP_TABLE,
constants.PRIORITY_DEFAULT,
constants.SFC_END_OF_CHAIN_TABLE)
def _flow_classifiers_by_lport(self, lport):
return itertools.chain(
self.db_store.get_all(
sfc.FlowClassifier(source_port=lport),
index=sfc.FlowClassifier.get_index('source_port'),
),
self.db_store.get_all(
sfc.FlowClassifier(dest_port=lport),
index=sfc.FlowClassifier.get_index('dest_port'),
),
)
@df_base_app.register_event(l2.LogicalPort, l2.EVENT_LOCAL_CREATED)
def _local_lport_added(self, lport):
for fc in self._flow_classifiers_by_lport(lport):
# Install classification/dispatch only if it wasn't installed by
# _install_flow_classifier
if _is_lport_ref_eq(fc.source_port, lport):
self._install_classification_flows(fc)
elif _is_lport_ref_eq(fc.dest_port, lport):
self._install_dispatch_flows(fc)
def _install_flow_classifier(self, flow_classifier):
# If the FC is on a source lport, install flows only when it's local.
# If the FC is on a dest lport, install everywhere; we can start the chain
# here and save a few hops.
if flow_classifier.is_classification_local:
self._install_classification_flows(flow_classifier)
# If FC is on a source lport, install flows everywhere. We won't go
# through the classifier app again so reg6 will not be set.
# If FC is on a dest lport, install only on local chassis, to avoid
# reclassification.
if flow_classifier.is_dispatch_local:
self._install_dispatch_flows(flow_classifier)
def _install_classification_flows(self, flow_classifier):
# FIXME (dimak) assume the port is of a type supported by the classifier app
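# ORIGIN TABLE => SFC ENCAP TABLE (mirrored by _uninstall_classification_flows)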
for match in self._create_matches(flow_classifier):
self.mod_flow(
table_id=self._get_flow_classifier_table(flow_classifier),
priority=constants.PRIORITY_VERY_HIGH,
match=match,
inst=[
self.parser.OFPInstructionActions(
self.ofproto.OFPIT_APPLY_ACTIONS,
[
self.parser.OFPActionSetField(
reg2=flow_classifier.unique_key),
],
),
self.parser.OFPInstructionGotoTable(
constants.SFC_ENCAP_TABLE,
),
],
)
def _install_dispatch_flows(self, flow_classifier):
lport = self._get_flow_classifier_lport(flow_classifier)
# End-of-chain
# 1) Restore network ID in metadata and zero reg6 + reg2
# 2) For dest-port classifiers, restore the dest port ID in reg7
# 3) Resubmit back to the classification table; the done bit set in reg3
#    prevents re-classification
lswitch = lport.lswitch
actions = [
self.parser.OFPActionSetField(metadata=lswitch.unique_key),
self.parser.OFPActionSetField(reg6=0),
self.parser.OFPActionSetField(reg2=0),
]
if flow_classifier.source_port == lport:
done_bit = _SRC_BIT
elif flow_classifier.dest_port == lport:
done_bit = _DST_BIT
# FIXME (dimak) maybe get it from L2 table
actions.append(
self.parser.OFPActionSetField(reg7=lport.unique_key)
)
actions += [
self.parser.NXActionRegLoad(
dst='reg3',
value=1,
ofs_nbits=nicira_ext.ofs_nbits(done_bit, done_bit),
),
self.parser.NXActionResubmitTable(
table_id=self._get_flow_classifier_table(flow_classifier))
]
self.mod_flow(
table_id=constants.SFC_END_OF_CHAIN_TABLE,
priority=constants.PRIORITY_MEDIUM,
match=self.parser.OFPMatch(reg2=flow_classifier.unique_key),
actions=actions,
)
@df_base_app.register_event(l2.LogicalPort, l2.EVENT_LOCAL_DELETED)
def _remove_local_port(self, lport):
for fc in self._flow_classifiers_by_lport(lport):
# Remove classification/dispatch only if they're no longer needed
if _is_lport_ref_eq(fc.source_port, lport):
self._uninstall_classification_flows(fc)
elif _is_lport_ref_eq(fc.dest_port, lport):
self._uninstall_dispatch_flows(fc)
def _uninstall_flow_classifier(self, flow_classifier):
if flow_classifier.is_classification_local:
self._uninstall_classification_flows(flow_classifier)
if flow_classifier.is_dispatch_local:
self._uninstall_dispatch_flows(flow_classifier)
def _uninstall_classification_flows(self, flow_classifier):
# ORIGIN TABLE => SFC ENCAP TABLE
for match in self._create_matches(flow_classifier):
self.mod_flow(
command=self.ofproto.OFPFC_DELETE_STRICT,
table_id=self._get_flow_classifier_table(flow_classifier),
priority=constants.PRIORITY_VERY_HIGH,
match=match,
)
def _uninstall_dispatch_flows(self, flow_classifier):
# SFC_END_OF_CHAIN_TABLE => NEXT TABLE
self.mod_flow(
command=self.ofproto.OFPFC_DELETE_STRICT,
table_id=constants.SFC_END_OF_CHAIN_TABLE,
priority=constants.PRIORITY_MEDIUM,
match=self.parser.OFPMatch(reg2=flow_classifier.unique_key),
)
def _get_flow_classifier_table(self, flow_classifier):
if flow_classifier.source_port is not None:
return constants.L2_LOOKUP_TABLE
elif flow_classifier.dest_port is not None:
return constants.EGRESS_TABLE
else:
raise ValueError(
_('Neither source nor destination port specified'))
def _get_flow_classifier_lport(self, flow_classifier):
return flow_classifier.source_port or flow_classifier.dest_port
@df_base_app.register_event(sfc.PortChain, model_const.EVENT_CREATED)
def _port_chain_created(self, port_chain):
for fc in port_chain.flow_classifiers:
self._install_flow_classifier(fc)
@df_base_app.register_event(sfc.PortChain, model_const.EVENT_DELETED)
def _port_chain_deleted(self, port_chain):
for fc in port_chain.flow_classifiers:
self._uninstall_flow_classifier(fc)
@df_base_app.register_event(sfc.PortChain, model_const.EVENT_UPDATED)
def _port_chain_updated(self, port_chain, old_port_chain):
old_fcs = set(fc.id for fc in old_port_chain.flow_classifiers)
new_fcs = set(fc.id for fc in port_chain.flow_classifiers)
added_fcs = new_fcs - old_fcs
deleted_fcs = old_fcs - new_fcs
for fc_id in deleted_fcs:
fc = old_port_chain.find_flow_classifier(fc_id)
self._uninstall_flow_classifier(fc)
for fc_id in added_fcs:
fc = port_chain.find_flow_classifier(fc_id)
self._install_flow_classifier(fc)
def _create_matches(self, flow_classifier):
params = {}
if flow_classifier.source_port is not None:
lport = flow_classifier.source_port
params['reg6'] = lport.unique_key
params['reg3'] = (_SRC_NOT_DONE, _SRC_MASK)
if flow_classifier.dest_port is not None:
lport = flow_classifier.dest_port
params['reg7'] = lport.unique_key
params['reg3'] = (_DST_NOT_DONE, _DST_MASK)
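# Matching only the "not done" reg3 bit keeps packets that already went
# through end-of-chain dispatch (which sets the bit) from being classified
# again.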
if flow_classifier.ether_type is not None:
if flow_classifier.ether_type == lib_constants.IPv4:
params.update(_create_ipv4_params(flow_classifier))
elif flow_classifier.ether_type == lib_constants.IPv6:
params.update(_create_ipv6_params(flow_classifier))
else:
raise RuntimeError(
_('Unsupported ethertype {0}').format(
flow_classifier.ether_type))
param_list = [params]
if flow_classifier.protocol is not None:
if flow_classifier.protocol == lib_constants.PROTO_NAME_TCP:
l4_params = _create_tcp_params(flow_classifier)
elif flow_classifier.protocol == lib_constants.PROTO_NAME_UDP:
l4_params = _create_udp_params(flow_classifier)
else:
raise RuntimeError(
_('Unsupported protocol {0}').format(
flow_classifier.protocol))
param_list = _multiply_params(param_list, l4_params)
return (self.parser.OFPMatch(**p) for p in param_list)
def _multiply_params(old_params, new_params):
'''Create combined dictionaries for all pairs of (old, new) params.
Example:
>>> _multiply_params([{a: 1}, {a: 2}], [{b: 1}, {b:2}])
[{a:1, b:1}, {a:1, b:2}, {a:2, b:1}, {a:2, b:2}]
'''
res = []
for base, new in itertools.product(old_params, new_params):
p = base.copy()
p.update(new)
res.append(p)
return res
def _create_ipv4_params(fc):
return _create_ip_params(
fc, ether_types.ETH_TYPE_IP, 'ipv4_src', 'ipv4_dst')
def _create_ipv6_params(fc):
return _create_ip_params(
fc, ether_types.ETH_TYPE_IPV6, 'ipv6_src', 'ipv6_dst')
def _create_ip_params(fc, eth_type, src_label, dst_label):
params = {'eth_type': eth_type}
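# Example (assuming source_cidr/dest_cidr are netaddr.IPNetwork objects):
# 10.0.0.0/24 becomes (0x0a000000, 0xffffff00), a (value, mask) pair that
# OFPMatch accepts for ipv4_src/ipv4_dst.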
if fc.source_cidr is not None:
params[src_label] = (
int(fc.source_cidr.network),
int(fc.source_cidr.netmask),
)
if fc.dest_cidr is not None:
params[dst_label] = (
int(fc.dest_cidr.network),
int(fc.dest_cidr.netmask),
)
return params
def _create_l4_port_params(fc, proto, src_label, dst_label):
params = [{'ip_proto': proto}]
if fc.source_transport_ports is not None:
source_port_params = utils.get_port_match_list_from_port_range(
fc.source_transport_ports.min,
fc.source_transport_ports.max,
)
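# get_port_match_list_from_port_range is expected to cover min..max with a
# list of port matches (e.g. masked (value, mask) entries); each entry
# becomes its own match combination via _multiply_params below.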
params = _multiply_params(
params,
[{src_label: s} for s in source_port_params],
)
if fc.dest_transport_ports is not None:
dest_port_params = utils.get_port_match_list_from_port_range(
fc.dest_transport_ports.min,
fc.dest_transport_ports.max,
)
params = _multiply_params(
params,
[{dst_label: d} for d in dest_port_params],
)
return params
def _create_tcp_params(fc):
return _create_l4_port_params(fc, in_proto.IPPROTO_TCP,
'tcp_src', 'tcp_dst')
def _create_udp_params(fc):
return _create_l4_port_params(fc, in_proto.IPPROTO_UDP,
'udp_src', 'udp_dst')


@@ -72,6 +72,11 @@ INGRESS_CONNTRACK_TABLE = 105
INGRESS_SECURITY_GROUP_TABLE = 110
# Send packets to target local ovs ports.
INGRESS_DISPATCH_TABLE = 115
# SFC tables
SFC_ENCAP_TABLE = 120
SFC_END_OF_CHAIN_TABLE = 125
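# Classification flows jump to SFC_ENCAP_TABLE; end-of-chain flows in
# SFC_END_OF_CHAIN_TABLE hand packets back to the regular pipeline.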
# Table used by aging app.
CANARY_TABLE = 200


@@ -59,8 +59,8 @@ class PortPairGroup(mf.ModelBase,
@mf.register_model
@mf.construct_nb_db_model(
indexes={
- 'source_port': 'source_port',
- 'dest_port': 'dest_port',
+ 'source_port': 'source_port.id',
+ 'dest_port': 'dest_port.id',
},
)
class FlowClassifier(mf.ModelBase,
@@ -140,7 +140,7 @@ class FlowClassifier(mf.ModelBase,
@mf.register_model
@mf.construct_nb_db_model(
indexes={
- 'flow_classifiers': 'flow_classifiers',
+ 'flow_classifiers': 'flow_classifiers.id',
'port_pair_groups': 'port_pair_groups',
},
)


@@ -0,0 +1,272 @@
# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from dragonflow.db.models import l2
from dragonflow.db.models import sfc
from dragonflow.tests.common import utils
from dragonflow.tests.unit import test_app_base
lswitch1 = l2.LogicalSwitch(
id='lswitch1',
topic='topic1',
version=10,
unique_key=22,
)
lport1 = l2.LogicalPort(
id='lport1',
topic='topic1',
version=10,
unique_key=22,
lswitch='lswitch1',
is_local=True,
)
lport2 = l2.LogicalPort(
id='lport2',
topic='topic1',
version=10,
unique_key=24,
lswitch='lswitch1',
is_local=True,
)
lport3 = l2.LogicalPort(
id='lport3',
topic='topic1',
version=10,
unique_key=29,
lswitch='lswitch1',
is_local=True,
)
fc1 = sfc.FlowClassifier(
id='fc1',
topic='topic1',
unique_key=22,
source_port='lport1',
)
fc2 = sfc.FlowClassifier(
id='fc2',
topic='topic1',
unique_key=12,
dest_port='lport2',
)
fc3 = sfc.FlowClassifier(
id='fc3',
topic='topic1',
unique_key=13,
source_port='lport3',
)
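# fc1/fc3 classify on source ports, fc2 on a dest port; the pc1* variants
# below exercise adding, removing and replacing flow classifiers on update.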
pc1 = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1', 'fc2'],
)
pc1add = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc1', 'fc3', 'fc2'],
)
pc1remove = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc2'],
)
pc1replace = sfc.PortChain(
id='pc1',
topic='topic1',
flow_classifiers=['fc3', 'fc2'],
)
fc10 = sfc.FlowClassifier(
id='fc10',
topic='topic1',
unique_key=10,
source_port='lport1',
)
fc11 = sfc.FlowClassifier(
id='fc11',
topic='topic1',
unique_key=11,
source_port='lport2',
)
fc12 = sfc.FlowClassifier(
id='fc12',
topic='topic1',
unique_key=12,
dest_port='lport1',
)
fc13 = sfc.FlowClassifier(
id='fc13',
topic='topic1',
unique_key=13,
dest_port='lport2',
)
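# fc10/fc11 classify on source ports, fc12/fc13 on dest ports; pc2 chains
# them to exercise which flows are installed locally vs. everywhere.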
pc2 = sfc.PortChain(
id='pc2',
topic='topic1',
flow_classifiers=['fc10', 'fc11', 'fc12', 'fc13'],
)
l2_objs = (lswitch1, lport1, lport2, lport3)
class TestFcApp(test_app_base.DFAppTestBase):
apps_list = ['fc']
def setUp(self):
super(TestFcApp, self).setUp()
self.app = self.open_flow_app.dispatcher.apps[0]
for attribute in ('_install_flow_classifier',
'_uninstall_flow_classifier',
'_install_classification_flows',
'_install_dispatch_flows',
'_uninstall_classification_flows',
'_uninstall_dispatch_flows'):
orig = getattr(self.app, attribute)
p = mock.patch.object(self.app, attribute, side_effect=orig)
self.addCleanup(p.stop)
p.start()
@utils.with_local_objects(fc1, fc2, fc3, pc1, *l2_objs)
def test_pc_created(self):
pc1.emit_created()
self.app._install_flow_classifier.assert_has_calls(
[
mock.call(pc1.flow_classifiers[0]),
mock.call(pc1.flow_classifiers[1]),
],
)
self.assertEqual(2, self.app._install_flow_classifier.call_count)
self.app._uninstall_flow_classifier.assert_not_called()
@utils.with_local_objects(fc1, fc2, fc3, pc1, *l2_objs)
def test_pc_deleted(self):
pc1.emit_deleted()
self.app._install_flow_classifier.assert_not_called()
self.app._uninstall_flow_classifier.assert_has_calls(
[
mock.call(pc1.flow_classifiers[0]),
mock.call(pc1.flow_classifiers[1]),
],
)
self.assertEqual(2, self.app._uninstall_flow_classifier.call_count)
@utils.with_local_objects(fc1, fc2, fc3, pc1, *l2_objs)
def test_pc_updated_add_fc(self):
pc1add.emit_updated(pc1)
self.app._install_flow_classifier.assert_called_once_with(
pc1add.flow_classifiers[1])
self.app._uninstall_flow_classifier.assert_not_called()
@utils.with_local_objects(fc1, fc2, fc3, pc1, *l2_objs)
def test_pc_updated_remove_fc(self):
pc1remove.emit_updated(pc1)
self.app._install_flow_classifier.assert_not_called()
self.app._uninstall_flow_classifier.assert_called_once_with(
pc1.flow_classifiers[0])
@utils.with_local_objects(fc1, fc2, fc3, pc1, *l2_objs)
def test_pc_updated_replace_fc(self):
pc1replace.emit_updated(pc1)
self.app._install_flow_classifier.assert_called_once_with(
pc1replace.flow_classifiers[0])
self.app._uninstall_flow_classifier.assert_called_once_with(
pc1.flow_classifiers[0])
@utils.with_local_objects(fc10, fc11, fc12, fc13, pc2, *l2_objs)
def test_install_flow_classifier(self):
pc2.emit_created()
# Installed only for dest-port and local source ports:
self.app._install_classification_flows.assert_has_calls(
[
mock.call(pc2.flow_classifiers[0]),
mock.call(pc2.flow_classifiers[2]),
mock.call(pc2.flow_classifiers[3]),
],
)
self.assertEqual(3, self.app._install_classification_flows.call_count)
# Installed only for source-port and local dest ports:
self.app._install_dispatch_flows.assert_has_calls(
[
mock.call(pc2.flow_classifiers[0]),
mock.call(pc2.flow_classifiers[1]),
mock.call(pc2.flow_classifiers[2]),
],
)
self.assertEqual(3, self.app._install_dispatch_flows.call_count)
@utils.with_local_objects(fc10, fc11, fc12, fc13, pc2, *l2_objs)
def test_uninstall_flow_classifier(self):
pc2.emit_deleted()
# Uninstalled only for dest-port and local source ports:
self.app._uninstall_classification_flows.assert_has_calls(
[
mock.call(pc2.flow_classifiers[0]),
mock.call(pc2.flow_classifiers[2]),
mock.call(pc2.flow_classifiers[3]),
],
)
self.assertEqual(
3, self.app._uninstall_classification_flows.call_count)
# Uninstalled only for source-port and local dest ports:
self.app._uninstall_dispatch_flows.assert_has_calls(
[
mock.call(pc2.flow_classifiers[0]),
mock.call(pc2.flow_classifiers[1]),
mock.call(pc2.flow_classifiers[2]),
],
)
self.assertEqual(3, self.app._uninstall_dispatch_flows.call_count)
@utils.with_local_objects(fc1, fc2, pc1, *l2_objs)
def test_src_local_port_added(self):
lport1.emit_local_created()
self.app._install_classification_flows.assert_called_once_with(fc1)
self.app._install_dispatch_flows.assert_not_called()
@utils.with_local_objects(fc1, fc2, pc1, *l2_objs)
def test_src_local_port_removed(self):
lport1.emit_local_deleted()
self.app._uninstall_classification_flows.assert_called_once_with(fc1)
self.app._uninstall_dispatch_flows.assert_not_called()
@utils.with_local_objects(fc1, fc2, pc1, *l2_objs)
def test_dest_local_port_added(self):
lport2.emit_local_created()
self.app._install_classification_flows.assert_not_called()
self.app._install_dispatch_flows.assert_called_once_with(fc2)
@utils.with_local_objects(fc1, fc2, pc1, *l2_objs)
def test_dest_local_port_removed(self):
lport2.emit_local_deleted()
self.app._uninstall_classification_flows.assert_not_called()
self.app._uninstall_dispatch_flows.assert_called_once_with(fc2)


@@ -86,6 +86,7 @@ dragonflow.controller.apps =
classifier = dragonflow.controller.apps.classifier:ClassifierApp
dhcp = dragonflow.controller.apps.dhcp:DHCPApp
dnat = dragonflow.controller.apps.dnat:DNATApp
fc = dragonflow.controller.apps.fc:FcApp
l2 = dragonflow.controller.apps.l2:L2App
legacy_snat = dragonflow.controller.apps.legacy_snat:LegacySNatApp
l3_proactive = dragonflow.controller.apps.l3_proactive:L3ProactiveApp