diff --git a/devstack/override-defaults b/devstack/override-defaults index 654f80584..27afb493c 100644 --- a/devstack/override-defaults +++ b/devstack/override-defaults @@ -28,6 +28,7 @@ if is_service_enabled df-metadata ; then fi DRAGONFLOW_CONF=/etc/neutron/dragonflow.ini +DRAGONFLOW_DATAPATH=/etc/neutron/dragonflow_datapath_layout.yaml Q_PLUGIN_EXTRA_CONF_PATH=/etc/neutron Q_PLUGIN_EXTRA_CONF_FILES=(dragonflow.ini) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 74f64c772..80bb3a3ae 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -249,6 +249,7 @@ function configure_df_plugin { popd mkdir -p $Q_PLUGIN_EXTRA_CONF_PATH cp $DRAGONFLOW_DIR/etc/dragonflow.ini.sample $DRAGONFLOW_CONF + cp $DRAGONFLOW_DIR/etc/dragonflow_datapath_layout.yaml $DRAGONFLOW_DATAPATH if is_service_enabled q-svc ; then if is_service_enabled q-qos ; then diff --git a/dragonflow/conf/df_common_params.py b/dragonflow/conf/df_common_params.py index df2a2e0cb..7043329bd 100644 --- a/dragonflow/conf/df_common_params.py +++ b/dragonflow/conf/df_common_params.py @@ -177,6 +177,14 @@ df_opts = [ default=False, help=_("Automatically detect port-behind-port scenarios, " "e.g., amphora, or macvlan")), + cfg.StrOpt('datapath_layout_path', + help=_("Path to datapath layout configuration"), + default="/etc/neutron/dragonflow_datapath_layout.yaml"), + # FIXME (dimak) rename to something simpler once all tables are + # auto-allocated. + cfg.IntOpt('datapath_autoalloc_table_offset', + default=201, + help=_('Start offset for new datapath application tables')), ] diff --git a/dragonflow/controller/app_base.py b/dragonflow/controller/app_base.py new file mode 100644 index 000000000..5dda28c84 --- /dev/null +++ b/dragonflow/controller/app_base.py @@ -0,0 +1,114 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import collections + +from dragonflow.controller import df_base_app + + +# Specify the states, entrypoints, exitpoints, public mappings and private +# mappings of an application: +# States - the number of states the application has (Translates to number of +# OpenFlow tables) +# Entrypoints - Where do packets come in? +# Exitpoints - Where do packets come out? +# Public Mappings - Metadata that is passed between applications +# Private Mappings - Metadata that is private to this application (e.g. to save +# a state accross tables) +Specification = collections.namedtuple( + 'Specification', + ('states', 'entrypoints', 'exitpoints', 'public_mapping', + 'private_mapping'), +) + + +def define_specification(states, entrypoints, exitpoints, + public_mapping=None, private_mapping=None): + if public_mapping is None: + public_mapping = {} + + if private_mapping is None: + private_mapping = {} + + def decorator(cls): + cls._specification = Specification( + states=states, + entrypoints=entrypoints, + exitpoints=exitpoints, + public_mapping=public_mapping, + private_mapping=private_mapping, + ) + return cls + + return decorator + + +# Entrypoint: An entrypoint for packets - The application accepts packets here, +# and they should be routed to the given target (or OpenFlow table). +# consumes: Which metadata is consumbed by this entrypoint. +Entrypoint = collections.namedtuple( + 'Entrypoint', + ('name', 'target', 'consumes'), +) + + +# Exitpoint: An exitpoint for packets - The application sends (resubmits, or +# gotos) packets to this table (provided by the framework). +# provides: Which metadata is set on the packet +Exitpoint = collections.namedtuple( + 'Exitpoint', + ('name', 'provides'), +) + + +# The allocation of states (table numbers), entrypoints and exitpoints (tables +# for incoming and outgoing packets), and register mapping (where to place +# the metadata) +DpAlloc = collections.namedtuple( + 'DpAlloc', + ('states', 'exitpoints', 'entrypoints', 'full_mapping'), +) + + +class VariableMapping(dict): + pass + + +class AttributeDict(dict): + def __getattr__(self, name): + try: + return self[name] + except KeyError: + raise AttributeError(name) + + +class Base(df_base_app.DFlowApp): + def __init__(self, dp_alloc, *args, **kwargs): + super(Base, self).__init__(*args, **kwargs) + self._dp_alloc = dp_alloc + + def initialize(self): + pass + + @property + def states(self): + return self._dp_alloc.states + + @property + def exitpoints(self): + return self._dp_alloc.exitpoints + + @property + def entrypoints(self): + return self._dp_alloc.entrypoints + + +register_event = df_base_app.register_event diff --git a/dragonflow/controller/datapath.py b/dragonflow/controller/datapath.py new file mode 100644 index 000000000..b6b3a4714 --- /dev/null +++ b/dragonflow/controller/datapath.py @@ -0,0 +1,265 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log +import stevedore + +from dragonflow._i18n import _ +from dragonflow import conf as cfg +from dragonflow.controller import app_base + + +LOG = log.getLogger(__name__) + +REGS = frozenset(( + 'reg0', + 'reg1', + 'reg2', + 'reg3', + 'reg4', + 'reg5', + 'reg6', + 'reg7', + 'metadata', +)) + + +def _sequence_generator(offset): + while True: + yield offset + offset += 1 + + +class Datapath(object): + """ + Given the layout (e.g. from the config file), instantiate all the + applications in the datapath (vertices), and connect them (edges). + Instantiation includes allocating OpenFlow tables and registers. + Connection includes wiring and mapping the registers + """ + def __init__(self, layout): + self._layout = layout + self._dp_allocs = {} + self._public_variables = set() + self.apps = None + + def set_up(self, ryu_base, vswitch_api, nb_api, notifier): + """ + Instantiate the application classes. + Instantiate the applications (Including table and register allocation) + Wire the applications (including translating registers) + """ + self._dp = ryu_base.datapath + self._table_generator = _sequence_generator( + cfg.CONF.df.datapath_autoalloc_table_offset) + self._public_variables.clear() + + app_classes = {} + self.apps = {} + + for vertex in self._layout.vertices: + if vertex.type in app_classes: + continue + + app_class = self._get_app_class(vertex.type) + app_classes[vertex.type] = app_class + self._public_variables.update( + app_class._specification.public_mapping.keys(), + ) + + for vertex in self._layout.vertices: + app_class = app_classes[vertex.type] + dp_alloc = self._create_dp_alloc(app_class._specification) + self.log_datapath_allocation(vertex.name, dp_alloc) + self._dp_allocs[vertex.name] = dp_alloc + app = app_class(api=ryu_base, + vswitch_api=vswitch_api, + nb_api=nb_api, + neutron_server_notifier=notifier, + dp_alloc=dp_alloc, + **(vertex.params or {}) + ) + self.apps[vertex.name] = app + + for app in self.apps.values(): + app.initialize() + + for edge in self._layout.edges: + self._install_edge(edge) + + def _get_app_class(self, app_type): + """Get an application class (Python class) by app name""" + mgr = stevedore.NamedExtensionManager( + 'dragonflow.controller.apps', + [app_type], + invoke_on_load=False, + ) + for ext in mgr: + return ext.plugin + else: + raise RuntimeError(_('Failed to load app {0}').format(app_type)) + + def _create_dp_alloc(self, specification): + """ + Allocate the tables and registers for the given application (given + by its specification) + """ + public_mapping = specification.public_mapping.copy() + unmapped_vars = self._public_variables.difference(public_mapping) + + # Convert to set() so the result won't be a frozenset() + unmapped_regs = set(REGS).difference( + public_mapping.values(), + ).difference( + specification.private_mapping.values(), + ) + + while unmapped_vars and unmapped_regs: + public_mapping[unmapped_vars.pop()] = unmapped_regs.pop() + + if unmapped_vars: + raise RuntimeError( + _("Can't allocate enough registers for variables"), + ) + + states_dict = { + state: next(self._table_generator) + for state in specification.states + } + states = app_base.AttributeDict(**states_dict) + + exitpoints_dict = { + exit.name: next(self._table_generator) + for exit in specification.exitpoints + } + exitpoints = app_base.AttributeDict(**exitpoints_dict) + + entrypoints_dict = { + entry.name: states[entry.target] + for entry in specification.entrypoints + } + entrypoints = app_base.AttributeDict(**entrypoints_dict) + + return app_base.DpAlloc( + states=states, + exitpoints=exitpoints, + entrypoints=entrypoints, + full_mapping=public_mapping, + ) + + def _get_connector_config(self, connector): + return self._dp_allocs[connector.vertex] + + def _install_edge(self, edge): + """ + Wire two applications. Infer the translation of metadata fields, + and install the actions/instructions to pass a packet from one + application's exit point to another's entry point + """ + exitpoint = edge.exitpoint + exit_config = self._get_connector_config(exitpoint) + entrypoint = edge.entrypoint + entry_config = self._get_connector_config(entrypoint) + translations = [] + + for var in self._public_variables: + exit_reg = exit_config.full_mapping[var] + entry_reg = entry_config.full_mapping[var] + if exit_reg == entry_reg: + continue + + translations.append( + (exit_reg, entry_reg), + ) + + self._install_goto( + # Source + exit_config.exitpoints[exitpoint.name], + # Destination + entry_config.entrypoints[entrypoint.name], + translations, + ) + + def _install_goto(self, source, dest, translations): + """ + Install the actions/instructions to pass a packet from one + application's exit point to another's entry point, including + translating the metadata fields. + """ + ofproto = self._dp.ofproto + parser = self._dp.ofproto_parser + actions = [] + + try: + from_regs, to_regs = zip(*translations) + except ValueError: + from_regs, to_regs = ((), ()) + + # Push all register values + for reg in from_regs: + actions.append( + parser.NXActionStackPush(field=reg, start=0, end=32), + ) + + # Pop into target registers in reverse order + for reg in reversed(to_regs): + actions.append( + parser.NXActionStackPop(field=reg, start=0, end=32), + ) + + if source < dest: + instructions = [ + parser.OFPInstructionActions( + ofproto.OFPIT_APPLY_ACTIONS, + actions, + ), + parser.OFPInstructionGotoTable(dest), + ] + else: + actions.append(parser.NXActionResubmitTable(table_id=dest)) + + instructions = [ + parser.OFPInstructionActions( + ofproto.OFPIT_APPLY_ACTIONS, + actions, + ), + ] + + message = parser.OFPFlowMod( + self._dp, + table_id=source, + command=ofproto.OFPFC_ADD, + match=parser.OFPMatch(), + instructions=instructions, + ) + self._dp.send_msg(message) + + def log_datapath_allocation(self, name, dp_alloc): + """ + Log the dp_alloc object (The allocation of tables, registers, etc.) for + the given application + """ + LOG.debug("Application: %s", name) + LOG.debug("\tStates:") + for state, table_num in dp_alloc.states.items(): + LOG.debug("\t\t%s: %s", state, table_num) + + LOG.debug("\tEntrypoints:") + for entry_name, table_num in dp_alloc.entrypoints.items(): + LOG.debug("\t\t%s: %s", entry_name, table_num) + + LOG.debug("\tExitpoints:") + for exit_name, table_num in dp_alloc.exitpoints.items(): + LOG.debug("\t\t%s: %s", exit_name, table_num) + + LOG.debug("\tMapping:") + for var, reg in dp_alloc.full_mapping.items(): + LOG.debug("\t\t%s: %s", var, reg) diff --git a/dragonflow/controller/datapath_layout.py b/dragonflow/controller/datapath_layout.py new file mode 100644 index 000000000..7d6f1903c --- /dev/null +++ b/dragonflow/controller/datapath_layout.py @@ -0,0 +1,71 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import collections + +import yaml + +from dragonflow import conf as cfg + + +Vertex = collections.namedtuple( + 'Vertex', + ('name', 'type', 'params'), +) + + +Edge = collections.namedtuple( + 'Edge', + ('exitpoint', 'entrypoint'), +) + + +_ConnectorBase = collections.namedtuple( + 'Connector', + ('vertex', 'direction', 'name'), +) + + +class Connector(_ConnectorBase): + @classmethod + def from_string(cls, val): + return cls(*val.split('.')) + + +Layout = collections.namedtuple( + 'Layout', + ('vertices', 'edges'), +) + + +def get_datapath_layout(path=None): + if path is None: + path = cfg.CONF.df.datapath_layout_path + + with open(path) as f: + raw_layout = yaml.load(f) + + vertices = tuple( + Vertex( + name=key, + type=value['type'], + params=value.get('params'), + ) for key, value in raw_layout['vertices'].items() + ) + + edges = tuple( + Edge( + exitpoint=Connector.from_string(key), + entrypoint=Connector.from_string(value), + ) for key, value in raw_layout['edges'].items() + ) + + return Layout(vertices, edges) diff --git a/dragonflow/tests/unit/test_datapath.py b/dragonflow/tests/unit/test_datapath.py new file mode 100644 index 000000000..c8cf9e5ee --- /dev/null +++ b/dragonflow/tests/unit/test_datapath.py @@ -0,0 +1,225 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import functools + +import mock +import testscenarios + +from dragonflow.controller import app_base +from dragonflow.controller import datapath +from dragonflow.controller import datapath_layout +from dragonflow.tests import base as tests_base + +load_tests = testscenarios.load_tests_apply_scenarios + + +@app_base.define_specification( + states=('main',), + entrypoints=( + app_base.Entrypoint( + name='conn1', + target='main', + consumes={}, + ), + app_base.Entrypoint( + name='conn2', + target='main', + consumes={}, + ), + ), + exitpoints=( + app_base.Exitpoint( + name='conn1', + provides={}, + ), + app_base.Exitpoint( + name='conn2', + provides={}, + ), + ), + public_mapping={ + 'var1': 'reg0', + 'var2': 'reg1', + 'var3': 'reg2', + }, + private_mapping={ + 'priv1': 'reg3', + 'priv2': 'reg7', + } +) +class DummyApp(app_base.Base): + def __init__(self, *args, **kwargs): + # super(DummyApp, self).__init__(*args, **kwargs) + self.args = args + self.kwargs = kwargs + + +@app_base.define_specification( + states=('main',), + entrypoints=( + app_base.Entrypoint( + name='conn1', + target='main', + consumes={}, + ), + app_base.Entrypoint( + name='conn2', + target='main', + consumes={}, + ), + ), + exitpoints=( + app_base.Exitpoint( + name='conn1', + provides={}, + ), + app_base.Exitpoint( + name='conn2', + provides={}, + ), + ), + public_mapping={ + 'var1': 'reg0', + 'var3': 'reg1', + }, + private_mapping={ + 'priv1': 'reg2', + 'priv2': 'reg7', + } +) +class Dummy2App(DummyApp): + pass + + +class TestDatapath(tests_base.BaseTestCase): + scenarios = [ + ( + 'empty-config', + { + 'layout': datapath_layout.Layout( + vertices=(), + edges=(), + ), + 'raises': None, + }, + ), + ( + 'non-existent-vertex', + { + 'layout': datapath_layout.Layout( + vertices=(), + edges=( + datapath_layout.Edge( + exitpoint=datapath_layout.Connector( + 'app1', 'out', 'conn1', + ), + entrypoint=datapath_layout.Connector( + 'app2', 'out', 'conn1', + ), + ), + ), + ), + 'raises': KeyError, + }, + ), + ( + 'connected-vertices', + { + 'layout': datapath_layout.Layout( + vertices=( + datapath_layout.Vertex( + name='app1', + type='dummy', + params={'key1': 'val1'}, + ), + datapath_layout.Vertex( + name='app2', + type='dummy2', + params={'key2': 'val2'}, + ), + ), + edges=( + datapath_layout.Edge( + exitpoint=datapath_layout.Connector( + 'app1', 'out', 'conn1', + ), + entrypoint=datapath_layout.Connector( + 'app2', 'in', 'conn1', + ), + ), + ), + ), + 'raises': None, + }, + ), + ] + + def get_dummy_class(self, type): + if type == 'dummy': + return DummyApp + else: + return Dummy2App + + def setUp(self): + super(TestDatapath, self).setUp() + self.dp = datapath.Datapath(self.layout) + self.dp._get_app_class = mock.Mock(side_effect=self.get_dummy_class) + self.dp._install_goto = mock.Mock() + + def test_set_up(self): + if self.raises: + caller = functools.partial( + self.assertRaises, + self.raises, + ) + else: + def caller(func, *args): + func(*args) + + caller( + self.dp.set_up, + mock.Mock(), + mock.Mock(), + mock.Mock(), + mock.Mock(), + ) + + def test_app_initialization(self): + if self.raises is not None: + raise self.skipTest('Tests only positive flows') + + self.dp.set_up( + mock.Mock(), + mock.Mock(), + mock.Mock(), + mock.Mock(), + ) + self.assertEqual( + len(self.layout.vertices), + self.dp._get_app_class.call_count, + ) + + def test_installed_gotos(self): + if self.raises is not None: + raise self.skipTest('Tests only positive flows') + + self.dp.set_up( + mock.Mock(), + mock.Mock(), + mock.Mock(), + mock.Mock(), + ) + self.assertEqual( + len(self.layout.edges), + self.dp._install_goto.call_count, + ) + # FIXME add check for actual call parameters diff --git a/etc/dragonflow_datapath_layout.yaml b/etc/dragonflow_datapath_layout.yaml new file mode 100644 index 000000000..77ab7901d --- /dev/null +++ b/etc/dragonflow_datapath_layout.yaml @@ -0,0 +1,2 @@ +vertices: {} +edges: {} diff --git a/requirements.txt b/requirements.txt index 930409697..7476f9afb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,6 +28,7 @@ WebOb>=1.7.1 # MIT jsonmodels>=2.1.3 # BSD License (3 clause) skydive-client>=0.4.4 # Apache-2.0 cotyledon>=1.3.0 # Apache-2.0 +PyYAML>=3.10 # MIT # These repos are installed from git in OpenStack CI if the job # configures them as required-projects: diff --git a/setup.cfg b/setup.cfg index 5b96aa9ff..4fb93138e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,6 +22,9 @@ classifier = [files] packages = dragonflow +data_files = + etc/neutron/ = + etc/dragonflow_datapath_layout.yaml [build_sphinx] source-dir = doc/source