From e1c98e1b7ad35c1846f0af18d3e9c72ab11eab3e Mon Sep 17 00:00:00 2001 From: Mohankumar Date: Mon, 12 Dec 2016 17:16:10 +0530 Subject: [PATCH] Add SFC cli for OSC plugin Implements: blueprint openstackclient-cli-porting Change-Id: Ifeb62bad26ffeb0bb8b548c56d2d6a620a922f78 --- doc/source/cli/osc/v2/networking-sfc.rst | 36 ++ neutronclient/osc/v2/sfc/__init__.py | 0 .../osc/v2/sfc/sfc_flow_classifier.py | 320 ++++++++++ neutronclient/osc/v2/sfc/sfc_port_chain.py | 347 +++++++++++ neutronclient/osc/v2/sfc/sfc_port_pair.py | 215 +++++++ .../osc/v2/sfc/sfc_port_pair_group.py | 291 +++++++++ .../tests/unit/osc/v2/sfc/__init__.py | 0 neutronclient/tests/unit/osc/v2/sfc/fakes.py | 234 ++++++++ .../unit/osc/v2/sfc/test_flow_classifier.py | 378 ++++++++++++ .../tests/unit/osc/v2/sfc/test_port_chain.py | 556 ++++++++++++++++++ .../tests/unit/osc/v2/sfc/test_port_pair.py | 302 ++++++++++ .../unit/osc/v2/sfc/test_port_pair_group.py | 424 +++++++++++++ neutronclient/v2_0/client.py | 103 ++++ releasenotes/notes/add-sfc-commands.yaml | 5 + setup.cfg | 22 + 15 files changed, 3233 insertions(+) create mode 100644 doc/source/cli/osc/v2/networking-sfc.rst create mode 100644 neutronclient/osc/v2/sfc/__init__.py create mode 100755 neutronclient/osc/v2/sfc/sfc_flow_classifier.py create mode 100755 neutronclient/osc/v2/sfc/sfc_port_chain.py create mode 100755 neutronclient/osc/v2/sfc/sfc_port_pair.py create mode 100755 neutronclient/osc/v2/sfc/sfc_port_pair_group.py create mode 100644 neutronclient/tests/unit/osc/v2/sfc/__init__.py create mode 100755 neutronclient/tests/unit/osc/v2/sfc/fakes.py create mode 100755 neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py create mode 100755 neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py create mode 100755 neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py create mode 100755 neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py create mode 100644 releasenotes/notes/add-sfc-commands.yaml diff --git a/doc/source/cli/osc/v2/networking-sfc.rst b/doc/source/cli/osc/v2/networking-sfc.rst new file mode 100644 index 0000000..09d13d6 --- /dev/null +++ b/doc/source/cli/osc/v2/networking-sfc.rst @@ -0,0 +1,36 @@ +============== +networking sfc +============== + +**Service Function Chaining** is a mechanism for overriding the basic destination based forwarding +that is typical of IP networks. Service Function Chains consist of an ordered sequence of +Service Functions (SFs). SFs are virtual machines (or potentially physical devices) that perform a +network function such as firewall, content cache, packet inspection, or any other function that +requires processing of packets in a flow from point A to point B even though the SFs are not +literally between point A and B from a routing table perspective. + +Network v2 + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc flow classifier * + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port chain * + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port pair create + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port pair delete + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port pair list + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port pair set + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port pair show + +.. autoprogram-cliff:: openstack.neutronclient.v2 + :command: sfc port pair group * diff --git a/neutronclient/osc/v2/sfc/__init__.py b/neutronclient/osc/v2/sfc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/neutronclient/osc/v2/sfc/sfc_flow_classifier.py b/neutronclient/osc/v2/sfc/sfc_flow_classifier.py new file mode 100755 index 0000000..38a3cbb --- /dev/null +++ b/neutronclient/osc/v2/sfc/sfc_flow_classifier.py @@ -0,0 +1,320 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import logging + +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +from neutronclient._i18n import _ +from neutronclient.common import exceptions as nc_exc +from neutronclient.osc import utils as nc_osc_utils + +LOG = logging.getLogger(__name__) + +resource = 'flow_classifier' + +_attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('name', 'Name', nc_osc_utils.LIST_BOTH), + ('summary', 'Summary', nc_osc_utils.LIST_SHORT_ONLY), + ('protocol', 'Protocol', nc_osc_utils.LIST_LONG_ONLY), + ('ethertype', 'Ethertype', nc_osc_utils.LIST_LONG_ONLY), + ('source_ip_prefix', 'Source IP', + nc_osc_utils.LIST_LONG_ONLY), + ('destination_ip_prefix', 'Destination IP', + nc_osc_utils.LIST_LONG_ONLY), + ('logical_source_port', 'Logical Source Port', + nc_osc_utils.LIST_LONG_ONLY), + ('logical_destination_port', 'Logical Destination Port', + nc_osc_utils.LIST_LONG_ONLY), + ('source_port_range_min', 'Source Port Range Min', + nc_osc_utils.LIST_LONG_ONLY), + ('source_port_range_max', 'Source Port Range Max', + nc_osc_utils.LIST_LONG_ONLY), + ('destination_port_range_min', 'Destination Port Range Min', + nc_osc_utils.LIST_LONG_ONLY), + ('destination_port_range_max', 'Destination Port Range Max', + nc_osc_utils.LIST_LONG_ONLY), + ('l7_parameters', 'L7 Parameters', nc_osc_utils.LIST_LONG_ONLY), + ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY), + ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY), +) + + +class CreateSfcFlowClassifier(command.ShowOne): + _description = _("Create a flow classifier") + + def get_parser(self, prog_name): + parser = super(CreateSfcFlowClassifier, self).get_parser(prog_name) + parser.add_argument( + 'name', + metavar='', + help=_('Name of the flow classifier')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the flow classifier')) + parser.add_argument( + '--protocol', + metavar='', + help=_('IP protocol name. Protocol name should be as per ' + 'IANA standard.')) + parser.add_argument( + '--ethertype', + metavar='{IPv4,IPv6}', + default='IPv4', choices=['IPv4', 'IPv6'], + help=_('L2 ethertype, default is IPv4')) + parser.add_argument( + '--source-port', + metavar=':', + help=_('Source protocol port (allowed range [1,65535]. Must be ' + 'specified as a:b, where a=min-port and b=max-port) ' + 'in the allowed range.')) + parser.add_argument( + '--destination-port', + metavar=':', + help=_('Destination protocol port (allowed range [1,65535]. Must ' + 'be specified as a:b, where a=min-port and b=max-port) ' + 'in the allowed range.')) + parser.add_argument( + '--source-ip-prefix', + metavar='', + help=_('Source IP address in CIDR notation')) + parser.add_argument( + '--destination-ip-prefix', + metavar='', + help=_('Destination IP address in CIDR notation')) + parser.add_argument( + '--logical-source-port', + metavar='', + help=_('Neutron source port (name or ID)')) + parser.add_argument( + '--logical-destination-port', + metavar='', + help=_('Neutron destination port (name or ID)')) + parser.add_argument( + '--l7-parameters', + help=_('Dictionary of L7 parameters. Currently, no value is ' + 'supported for this option.')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + body = {resource: attrs} + obj = client.create_flow_classifier(body)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +class DeleteSfcFlowClassifier(command.Command): + _description = _("Delete a given flow classifier") + + def get_parser(self, prog_name): + parser = super(DeleteSfcFlowClassifier, self).get_parser(prog_name) + parser.add_argument( + 'flow_classifier', + metavar='', + help=_("Flow classifier to delete (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + # TODO(mohan): Add support for deleting multiple resources. + client = self.app.client_manager.neutronclient + fc_id = _get_id(client, parsed_args.flow_classifier, resource) + try: + client.delete_flow_classifier(fc_id) + except Exception as e: + msg = (_("Failed to delete flow classifier with name " + "or ID '%(fc)s': %(e)s") + % {'fc': parsed_args.flow_classifier, 'e': e}) + raise exceptions.CommandError(msg) + + +class ListSfcFlowClassifier(command.Lister): + _description = _("List flow classifiers") + + def get_parser(self, prog_name): + parser = super(ListSfcFlowClassifier, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + help=_("List additional fields in output") + ) + return parser + + def extend_list(self, data, parsed_args): + ext_data = data['flow_classifiers'] + for d in ext_data: + val = [] + protocol = d['protocol'].upper() if d['protocol'] else 'any' + val.append('protocol: ' + protocol) + val.append(self._get_protocol_port_details(d, 'source')) + val.append(self._get_protocol_port_details(d, 'destination')) + if 'logical_source_port' in d: + val.append('neutron_source_port: ' + + str(d['logical_source_port'])) + + if 'logical_destination_port' in d: + val.append('neutron_destination_port: ' + + str(d['logical_destination_port'])) + + if 'l7_parameters' in d: + l7_param = 'l7_parameters: {%s}' % ','.join(d['l7_parameters']) + val.append(l7_param) + d['summary'] = ',\n'.join(val) + return ext_data + + def _get_protocol_port_details(self, data, val): + type_ip_prefix = val + '_ip_prefix' + ip_prefix = data.get(type_ip_prefix) + if not ip_prefix: + ip_prefix = 'any' + min_port = data.get(val + '_port_range_min') + if min_port is None: + min_port = 'any' + max_port = data.get(val + '_port_range_max') + if max_port is None: + max_port = 'any' + return '%s[port]: %s[%s:%s]' % ( + type, ip_prefix, min_port, max_port) + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + obj = client.list_flow_classifier() + obj_extend = self.extend_list(obj, parsed_args) + headers, columns = nc_osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, (utils.get_dict_properties( + s, columns) for s in obj_extend)) + + +class SetSfcFlowClassifier(command.Command): + _description = _("Set flow classifier properties") + + def get_parser(self, prog_name): + parser = super(SetSfcFlowClassifier, self).get_parser(prog_name) + parser.add_argument( + '--name', + metavar='', + help=_('Name of the flow classifier')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the flow classifier')) + parser.add_argument( + 'flow_classifier', + metavar='', + help=_("Flow classifier to modify (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fc_id = _get_id(client, parsed_args.flow_classifier, resource) + attrs = _get_common_attrs(self.app.client_manager, parsed_args, + is_create=False) + body = {resource: attrs} + try: + client.update_flow_classifier(fc_id, body) + except Exception as e: + msg = (_("Failed to update flow classifier '%(fc)s': %(e)s") + % {'fc': parsed_args.flow_classifier, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowSfcFlowClassifier(command.ShowOne): + _description = _("Display flow classifier details") + + def get_parser(self, prog_name): + parser = super(ShowSfcFlowClassifier, self).get_parser(prog_name) + parser.add_argument( + 'flow_classifier', + metavar='', + help=_("Flow classifier to display (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + fc_id = _get_id(client, parsed_args.flow_classifier, resource) + obj = client.show_flow_classifier(fc_id)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + if parsed_args.name is not None: + attrs['name'] = parsed_args.name + if parsed_args.description is not None: + attrs['description'] = parsed_args.description + if is_create: + _get_attrs(client_manager, attrs, parsed_args) + return attrs + + +def _get_attrs(client_manager, attrs, parsed_args): + if parsed_args.protocol is not None: + attrs['protocol'] = parsed_args.protocol + if parsed_args.ethertype: + attrs['ethertype'] = parsed_args.ethertype + if parsed_args.source_ip_prefix is not None: + attrs['source_ip_prefix'] = parsed_args.source_ip_prefix + if parsed_args.destination_ip_prefix is not None: + attrs['destination_ip_prefix'] = parsed_args.destination_ip_prefix + if parsed_args.logical_source_port is not None: + attrs['logical_source_port'] = _get_id( + client_manager.neutronclient, parsed_args.logical_source_port, + 'port') + if parsed_args.logical_destination_port is not None: + attrs['logical_destination_port'] = _get_id( + client_manager.neutronclient, parsed_args.logical_destination_port, + 'port') + if parsed_args.source_port is not None: + _fill_protocol_port_info(attrs, 'source', + parsed_args.source_port) + if parsed_args.destination_port is not None: + _fill_protocol_port_info(attrs, 'destination', + parsed_args.destination_port) + if parsed_args.l7_parameters is not None: + attrs['l7_parameters'] = parsed_args.l7_parameters + + +def _fill_protocol_port_info(attrs, port_type, port_val): + min_port, sep, max_port = port_val.partition(":") + if not min_port: + msg = ("Invalid port value '%s', expected format is " + "min-port:max-port or min-port.") + raise argparse.ArgumentTypeError(msg % port_val) + if not max_port: + max_port = min_port + try: + attrs[port_type + '_port_range_min'] = int(min_port) + attrs[port_type + '_port_range_max'] = int(max_port) + except ValueError: + message = (_("Protocol port value %s must be an integer " + "or integer:integer.") % port_val) + raise nc_exc.CommandError(message=message) + + +def _get_id(client, id_or_name, resource): + return client.find_resource(resource, id_or_name)['id'] diff --git a/neutronclient/osc/v2/sfc/sfc_port_chain.py b/neutronclient/osc/v2/sfc/sfc_port_chain.py new file mode 100755 index 0000000..582a6d9 --- /dev/null +++ b/neutronclient/osc/v2/sfc/sfc_port_chain.py @@ -0,0 +1,347 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from osc_lib.cli import parseractions +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +from neutronclient._i18n import _ +from neutronclient.osc import utils as nc_osc_utils + +LOG = logging.getLogger(__name__) + +resource = 'port_chain' + +_attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('name', 'Name', nc_osc_utils.LIST_BOTH), + ('port_pair_groups', 'Port Pair Groups', nc_osc_utils.LIST_BOTH), + ('flow_classifiers', 'Flow Classifiers', + nc_osc_utils.LIST_BOTH), + ('chain_parameters', 'Chain Parameters', + nc_osc_utils.LIST_BOTH), + ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY), + ('chain_id', 'Chain ID', nc_osc_utils.LIST_BOTH), + ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY), +) + + +class CreateSfcPortChain(command.ShowOne): + _description = _("Create a port chain") + + def get_parser(self, prog_name): + parser = super(CreateSfcPortChain, self).get_parser(prog_name) + parser.add_argument( + 'name', + metavar='', + help=_('Name of the port chain')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the port chain')) + parser.add_argument( + '--flow-classifier', + default=[], + metavar='', + dest='flow_classifiers', + action='append', + help=_('Add flow classifier (name or ID). ' + 'This option can be repeated.')) + parser.add_argument( + '--chain-parameters', + metavar='correlation=,symmetric=', + action=parseractions.MultiKeyValueAction, + optional_keys=['correlation', 'symmetric'], + help=_('Dictionary of chain parameters. Supports ' + 'correlation=mpls and symmetric=true|false.')) + parser.add_argument( + '--port-pair-group', + metavar='', + dest='port_pair_groups', + required=True, + action='append', + help=_('Port pair group (name or ID). ' + 'This option can be repeated.')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + body = {resource: attrs} + obj = client.create_port_chain(body)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +class DeleteSfcPortChain(command.Command): + _description = _("Delete a given port chain") + + def get_parser(self, prog_name): + parser = super(DeleteSfcPortChain, self).get_parser(prog_name) + parser.add_argument( + 'port_chain', + metavar="", + help=_("Port chain to delete (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + # TODO(mohan): Add support for deleting multiple resources. + client = self.app.client_manager.neutronclient + pc_id = _get_id(client, parsed_args.port_chain, resource) + try: + client.delete_port_chain(pc_id) + except Exception as e: + msg = (_("Failed to delete port chain with name " + "or ID '%(pc)s': %(e)s") + % {'pc': parsed_args.port_chain, 'e': e}) + raise exceptions.CommandError(msg) + + +class ListSfcPortChain(command.Lister): + _description = _("List port chains") + + def get_parser(self, prog_name): + parser = super(ListSfcPortChain, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + default=False, + help=_("List additional fields in output") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + data = client.list_port_chain() + headers, columns = nc_osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, + (utils.get_dict_properties(s, columns) + for s in data['port_chains'])) + + +class SetSfcPortChain(command.Command): + _description = _("Set port chain properties") + + def get_parser(self, prog_name): + parser = super(SetSfcPortChain, self).get_parser(prog_name) + parser.add_argument( + '--name', + metavar='', + help=_('Name of the port chain')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the port chain')) + parser.add_argument( + '--flow-classifier', + metavar='', + dest='flow_classifiers', + action='append', + help=_('Add flow classifier (name or ID). ' + 'This option can be repeated.')) + parser.add_argument( + '--no-flow-classifier', + action='store_true', + help=_('Associate no flow classifier with the port chain')) + parser.add_argument( + '--port-pair-group', + metavar='', + dest='port_pair_groups', + action='append', + help=_('Add port pair group (name or ID). ' + 'This option can be repeated.')) + parser.add_argument( + '--no-port-pair-group', + action='store_true', + help=_('Remove associated port pair group from the port chain.' + 'At least one --port-pair-group must be specified ' + 'together.')) + parser.add_argument( + 'port_chain', + metavar='', + help=_("Port chain to modify (name or ID)")) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + pc_id = _get_id(client, parsed_args.port_chain, resource) + attrs = _get_common_attrs(self.app.client_manager, parsed_args, + is_create=False) + if parsed_args.no_flow_classifier: + attrs['flow_classifiers'] = [] + if parsed_args.flow_classifiers: + for fc in parsed_args.flow_classifiers: + added = [client.find_resource( + 'flow_classifier', fc, + cmd_resource='sfc_flow_classifier')['id']] + if parsed_args.no_flow_classifier: + existing = [] + else: + existing = [client.find_resource( + resource, parsed_args.port_chain, + cmd_resource='sfc_port_chain')['flow_classifiers']] + attrs['flow_classifiers'] = sorted(list( + set(existing) | set(added))) + if (parsed_args.no_port_pair_group and not + parsed_args.port_pair_groups): + message = _('At least one --port-pair-group must be specified.') + raise exceptions.CommandError(message) + if parsed_args.no_port_pair_group and parsed_args.port_pair_groups: + for ppg in parsed_args.port_pair_groups: + attrs['port_pair_groups'] = [client.find_resource( + 'port_pair_group', ppg, + cmd_resource='sfc_port_pair_group')['id']] + if (parsed_args.port_pair_groups and + not parsed_args.no_port_pair_group): + existing_ppg = [client.find_resource( + resource, parsed_args.port_chain, + cmd_resource='sfc_port_chain')['port_pair_groups']] + for ppg in parsed_args.port_pair_groups: + existing_ppg.append(client.find_resource( + 'port_pair_group', ppg, + cmd_resource='sfc_port_pair_group')['id']) + attrs['port_pair_groups'] = sorted(list(set(existing_ppg))) + body = {resource: attrs} + try: + client.update_port_chain(pc_id, body) + except Exception as e: + msg = (_("Failed to update port chain '%(pc)s': %(e)s") + % {'pc': parsed_args.port_chain, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowSfcPortChain(command.ShowOne): + _description = _("Display port chain details") + + def get_parser(self, prog_name): + parser = super(ShowSfcPortChain, self).get_parser(prog_name) + parser.add_argument( + 'port_chain', + metavar="", + help=_("Port chain to display (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + pc_id = _get_id(client, parsed_args.port_chain, resource) + obj = client.show_port_chain(pc_id)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +class UnsetSfcPortChain(command.Command): + _description = _("Unset port chain properties") + + def get_parser(self, prog_name): + parser = super(UnsetSfcPortChain, self).get_parser(prog_name) + parser.add_argument( + 'port_chain', + metavar='', + help=_("Port chain to unset (name or ID)")) + port_chain = parser.add_mutually_exclusive_group() + port_chain.add_argument( + '--flow-classifier', + action='append', + metavar='', + dest='flow_classifiers', + help=_('Remove flow classifier(s) from the port chain ' + '(name or ID). This option can be repeated.')) + port_chain.add_argument( + '--all-flow-classifier', + action='store_true', + help=_('Remove all flow classifiers from the port chain')) + parser.add_argument( + '--port-pair-group', + metavar='', + dest='port_pair_groups', + action='append', + help=_('Remove port pair group(s) from the port chain ' + '(name or ID). This option can be repeated.')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + pc_id = _get_id(client, parsed_args.port_chain, resource) + attrs = {} + if parsed_args.flow_classifiers: + existing = [client.find_resource( + resource, parsed_args.port_chain, + cmd_resource='sfc_port_chain')['flow_classifiers']] + for fc in parsed_args.flow_classifiers: + removed = [client.find_resource( + 'flow_classifier', fc, + cmd_resource='sfc_flow_classifier')['id']] + attrs['flow_classifiers'] = list(set(existing) - set(removed)) + if parsed_args.all_flow_classifier: + attrs['flow_classifiers'] = [] + if parsed_args.port_pair_groups: + existing_ppg = [client.find_resource( + resource, parsed_args.port_chain, + cmd_resource='sfc_port_chain')['port_pair_groups']] + for ppg in parsed_args.port_pair_groups: + removed_ppg = [client.find_resource( + 'port_pair_group', ppg, + cmd_resource='sfc_port_pair_group')['id']] + attrs['port_pair_groups'] = list(set(existing_ppg) - + set(removed_ppg)) + if attrs['port_pair_groups'] == []: + message = _('At least one --port-pair-group must be' + ' specified.') + raise exceptions.CommandError(message) + body = {resource: attrs} + try: + client.update_port_chain(pc_id, body) + except Exception as e: + msg = (_("Failed to unset port chain '%(pc)s': %(e)s") + % {'pc': parsed_args.port_chain, 'e': e}) + raise exceptions.CommandError(msg) + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + if parsed_args.name is not None: + attrs['name'] = parsed_args.name + if parsed_args.description is not None: + attrs['description'] = parsed_args.description + if ('port_pair_groups' in parsed_args and + parsed_args.port_pair_groups is not None): + attrs['port_pair_groups'] = [(_get_id(client_manager.neutronclient, + ppg, 'port_pair_group')) + for ppg in parsed_args.port_pair_groups] + if ('flow_classifiers' in parsed_args and + parsed_args.flow_classifiers is not None): + attrs['flow_classifiers'] = [(_get_id(client_manager.neutronclient, fc, + 'flow_classifier')) + for fc in parsed_args.flow_classifiers] + if is_create is True: + _get_attrs(attrs, parsed_args) + return attrs + + +def _get_attrs(attrs, parsed_args): + if 'chain_parameters' in parsed_args: + attrs['chain_parameters'] = parsed_args.chain_parameters + + +def _get_id(client, id_or_name, resource): + return client.find_resource(resource, id_or_name)['id'] diff --git a/neutronclient/osc/v2/sfc/sfc_port_pair.py b/neutronclient/osc/v2/sfc/sfc_port_pair.py new file mode 100755 index 0000000..26f369c --- /dev/null +++ b/neutronclient/osc/v2/sfc/sfc_port_pair.py @@ -0,0 +1,215 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from osc_lib.cli import parseractions +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +from neutronclient._i18n import _ +from neutronclient.osc import utils as nc_osc_utils + +LOG = logging.getLogger(__name__) + +resource = 'port_pair' + +_attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('name', 'Name', nc_osc_utils.LIST_BOTH), + ('ingress', 'Ingress Logical Port', nc_osc_utils.LIST_BOTH), + ('egress', 'Egress Logical Port', nc_osc_utils.LIST_BOTH), + ('service_function_parameters', 'Service Function Parameters', + nc_osc_utils.LIST_LONG_ONLY), + ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY), + ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY), +) + + +class CreateSfcPortPair(command.ShowOne): + _description = _("Create a port pair") + + def get_parser(self, prog_name): + parser = super(CreateSfcPortPair, self).get_parser(prog_name) + parser.add_argument( + 'name', + metavar='', + help=_('Name of the port pair')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the port pair')) + parser.add_argument( + '--service-function-parameters', + metavar='correlation=,weight=', + action=parseractions.MultiKeyValueAction, + optional_keys=['correlation', 'weight'], + help=_('Dictionary of service function parameters. ' + 'Currently, only correlation=None and weight ' + 'is supported. Weight is an integer that influences ' + 'the selection of a port pair within a port pair group ' + 'for a flow. The higher the weight, the more flows will ' + 'hash to the port pair. The default weight is 1.')) + parser.add_argument( + '--ingress', + metavar='', + required=True, + help=_('Ingress neutron port (name or ID)')) + parser.add_argument( + '--egress', + metavar='', + required=True, + help=_('Egress neutron port (name or ID)')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + body = {resource: attrs} + obj = client.create_port_pair(body)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +class DeleteSfcPortPair(command.Command): + _description = _("Delete a given port pair") + + def get_parser(self, prog_name): + parser = super(DeleteSfcPortPair, self).get_parser(prog_name) + parser.add_argument( + 'port_pair', + metavar="", + help=_("Port pair to delete (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + # TODO(mohan): Add support for deleting multiple resources. + client = self.app.client_manager.neutronclient + port_pair_id = _get_id(client, parsed_args.port_pair, resource) + try: + client.delete_port_pair(port_pair_id) + except Exception as e: + msg = (_("Failed to delete port pair with name " + "or ID '%(port_pair)s': %(e)s") + % {'port_pair': parsed_args.port_pair, 'e': e}) + raise exceptions.CommandError(msg) + + +class ListSfcPortPair(command.Lister): + _description = _("List port pairs") + + def get_parser(self, prog_name): + parser = super(ListSfcPortPair, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + help=_("List additional fields in output") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + data = client.list_port_pair() + headers, columns = nc_osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, + (utils.get_dict_properties( + s, columns, + ) for s in data['port_pairs'])) + + +class SetSfcPortPair(command.Command): + _description = _("Set port pair properties") + + def get_parser(self, prog_name): + parser = super(SetSfcPortPair, self).get_parser(prog_name) + parser.add_argument( + '--name', + metavar='', + help=_('Name of the port pair')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the port pair')) + parser.add_argument( + 'port_pair', + metavar='', + help=_("Port pair to modify (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + port_pair_id = _get_id(client, parsed_args.port_pair, resource) + attrs = _get_common_attrs(self.app.client_manager, parsed_args, + is_create=False) + body = {resource: attrs} + try: + client.update_port_pair(port_pair_id, body) + except Exception as e: + msg = (_("Failed to update port pair '%(port_pair)s': %(e)s") + % {'port_pair': parsed_args.port_pair, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowSfcPortPair(command.ShowOne): + _description = _("Display port pair details") + + def get_parser(self, prog_name): + parser = super(ShowSfcPortPair, self).get_parser(prog_name) + parser.add_argument( + 'port_pair', + metavar='', + help=_("Port pair to display (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + port_pair_id = _get_id(client, parsed_args.port_pair, resource) + obj = client.show_port_pair(port_pair_id)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + if parsed_args.name is not None: + attrs['name'] = parsed_args.name + if parsed_args.description is not None: + attrs['description'] = parsed_args.description + if is_create: + _get_attrs(client_manager, attrs, parsed_args) + return attrs + + +def _get_attrs(client_manager, attrs, parsed_args): + if parsed_args.ingress is not None: + attrs['ingress'] = _get_id(client_manager.neutronclient, + parsed_args.ingress, 'port') + if parsed_args.egress is not None: + attrs['egress'] = _get_id(client_manager.neutronclient, + parsed_args.egress, 'port') + if 'service_function_parameters' in parsed_args: + attrs['service_function_parameters'] = ( + parsed_args.service_function_parameters) + + +def _get_id(client, id_or_name, resource): + return client.find_resource(resource, id_or_name)['id'] diff --git a/neutronclient/osc/v2/sfc/sfc_port_pair_group.py b/neutronclient/osc/v2/sfc/sfc_port_pair_group.py new file mode 100755 index 0000000..367dab1 --- /dev/null +++ b/neutronclient/osc/v2/sfc/sfc_port_pair_group.py @@ -0,0 +1,291 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from osc_lib.cli import parseractions +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +from neutronclient._i18n import _ +from neutronclient.osc import utils as nc_osc_utils + +LOG = logging.getLogger(__name__) + +resource = 'port_pair_group' + +_attr_map = ( + ('id', 'ID', nc_osc_utils.LIST_BOTH), + ('name', 'Name', nc_osc_utils.LIST_BOTH), + ('port_pairs', 'Port Pair', nc_osc_utils.LIST_BOTH), + ('port_pair_group_parameters', 'Port Pair Group Parameters', + nc_osc_utils.LIST_BOTH), + ('description', 'Description', nc_osc_utils.LIST_LONG_ONLY), + ('group_id', 'Loadbalance ID', nc_osc_utils.LIST_LONG_ONLY), + ('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY), +) + + +class CreateSfcPortPairGroup(command.ShowOne): + _description = _("Create a port pair group") + + def get_parser(self, prog_name): + parser = super(CreateSfcPortPairGroup, self).get_parser(prog_name) + parser.add_argument( + 'name', + metavar='', + help=_('Name of the port pair group')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the port pair group')) + parser.add_argument( + '--port-pair', + metavar='', + dest='port_pairs', + default=[], + action='append', + help=_('Port pair (name or ID). ' + 'This option can be repeated.')) + parser.add_argument( + '--port-pair-group-parameters', + metavar='lb-fields=', + action=parseractions.KeyValueAction, + help=_('Dictionary of port pair group parameters. ' + 'Currently only one parameter lb-fields is supported. ' + ' is a & separated list of load-balancing ' + 'fields.')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_common_attrs(self.app.client_manager, parsed_args) + body = {resource: attrs} + obj = client.create_port_pair_group(body)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +class DeleteSfcPortPairGroup(command.Command): + _description = _("Delete a given port pair group") + + def get_parser(self, prog_name): + parser = super(DeleteSfcPortPairGroup, self).get_parser(prog_name) + parser.add_argument( + 'port_pair_group', + metavar='', + help=_("Port pair group to delete (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + # TODO(mohan): Add support for deleting multiple resources. + client = self.app.client_manager.neutronclient + ppg_id = _get_id(client, parsed_args.port_pair_group, resource) + try: + client.delete_port_pair_group(ppg_id) + except Exception as e: + msg = (_("Failed to delete port pair group with name " + "or ID '%(ppg)s': %(e)s") + % {'ppg': parsed_args.port_pair_group, 'e': e}) + raise exceptions.CommandError(msg) + + +class ListSfcPortPairGroup(command.Lister): + _description = _("List port pair group") + + def get_parser(self, prog_name): + parser = super(ListSfcPortPairGroup, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + default=False, + help=_("List additional fields in output") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + data = client.list_port_pair_group() + headers, columns = nc_osc_utils.get_column_definitions( + _attr_map, long_listing=parsed_args.long) + return (headers, + (utils.get_dict_properties( + s, columns, + ) for s in data['port_pair_groups'])) + + +class SetSfcPortPairGroup(command.Command): + _description = _("Set port pair group properties") + + def get_parser(self, prog_name): + parser = super(SetSfcPortPairGroup, self).get_parser(prog_name) + parser.add_argument( + 'port_pair_group', + metavar='', + help=_("Port pair group to modify (name or ID)")) + parser.add_argument( + '--name', + metavar='', + help=_('Name of the port pair group')) + parser.add_argument( + '--description', + metavar='', + help=_('Description for the port pair group')) + parser.add_argument( + '--port-pair', + metavar='', + dest='port_pairs', + default=[], + action='append', + help=_('Port pair (name or ID). ' + 'This option can be repeated.')) + parser.add_argument( + '--no-port-pair', + action='store_true', + help=_('Remove all port pair from port pair group')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + ppg_id = _get_id(client, parsed_args.port_pair_group, resource) + attrs = _get_common_attrs(self.app.client_manager, parsed_args, + is_create=False) + if parsed_args.no_port_pair: + attrs['port_pairs'] = [] + if parsed_args.port_pairs: + added = [client.find_resource('port_pair', pp, + cmd_resource='sfc_port_pair')['id'] + for pp in parsed_args.port_pairs] + if parsed_args.no_port_pair: + existing = [] + else: + existing = [client.find_resource( + resource, parsed_args.port_pair_group, + cmd_resource='sfc_port_pair_group')['port_pairs']] + attrs['port_pairs'] = sorted(list(set(existing) | set(added))) + body = {resource: attrs} + try: + client.update_port_pair_group(ppg_id, body) + except Exception as e: + msg = (_("Failed to update port pair group '%(ppg)s': %(e)s") + % {'ppg': parsed_args.port_pair_group, 'e': e}) + raise exceptions.CommandError(msg) + + +class ShowSfcPortPairGroup(command.ShowOne): + _description = _("Display port pair group details") + + def get_parser(self, prog_name): + parser = super(ShowSfcPortPairGroup, self).get_parser(prog_name) + parser.add_argument( + 'port_pair_group', + metavar='', + help=_("Port pair group to display (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + ppg_id = _get_id(client, parsed_args.port_pair_group, resource) + obj = client.show_port_pair_group(ppg_id)[resource] + columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map) + data = utils.get_dict_properties(obj, columns) + return display_columns, data + + +class UnsetSfcPortPairGroup(command.Command): + _description = _("Unset port pairs from port pair group") + + def get_parser(self, prog_name): + parser = super(UnsetSfcPortPairGroup, self).get_parser(prog_name) + parser.add_argument( + 'port_pair_group', + metavar='', + help=_("Port pair group to unset (name or ID)")) + port_pair_group = parser.add_mutually_exclusive_group() + port_pair_group.add_argument( + '--port-pair', + action='append', + metavar='', + dest='port_pairs', + help=_('Remove port pair(s) from the port pair group ' + '(name or ID). This option can be repeated.')) + port_pair_group.add_argument( + '--all-port-pair', + action='store_true', + help=_('Remove all port pairs from the port pair group')) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + ppg_id = _get_id(client, parsed_args.port_pair_group, resource) + attrs = {} + if parsed_args.port_pairs: + existing = [client.find_resource( + resource, parsed_args.port_pair_group, + cmd_resource='sfc_port_pair_group')['port_pairs']] + for pp in parsed_args.port_pairs: + removed = [client.find_resource( + 'port_pair', pp, cmd_resource='sfc_port_pair')['id']] + attrs['port_pairs'] = list(set(existing) - set(removed)) + if parsed_args.all_port_pair: + attrs['port_pairs'] = [] + body = {resource: attrs} + try: + client.update_port_pair_group(ppg_id, body) + except Exception as e: + msg = (_("Failed to unset port pair group '%(ppg)s': %(e)s") + % {'ppg': parsed_args.port_pair_group, 'e': e}) + raise exceptions.CommandError(msg) + + +def _get_ppg_param(attrs, ppg): + attrs['port_pair_group_parameters'] = {} + for key, value in ppg.items(): + if key == 'lb_fields': + attrs['port_pair_group_parameters'][key] = ([ + field for field in value.split('&') if field]) + else: + attrs['port_pair_group_parameters'][key] = value + return attrs['port_pair_group_parameters'] + + +def _get_common_attrs(client_manager, parsed_args, is_create=True): + attrs = {} + if parsed_args.name is not None: + attrs['name'] = parsed_args.name + if parsed_args.description is not None: + attrs['description'] = parsed_args.description + if parsed_args.port_pairs: + attrs['port_pairs'] = [(_get_id(client_manager.neutronclient, pp, + 'port_pair')) + for pp in parsed_args.port_pairs] + if is_create: + _get_attrs(attrs, parsed_args) + return attrs + + +def _get_attrs(attrs, parsed_args): + if ('port_pair_group_parameters' in parsed_args and + parsed_args.port_pair_group_parameters is not None): + attrs['port_pair_group_parameters'] = ( + _get_ppg_param(attrs, parsed_args.port_pair_group_parameters)) + + +def _get_id(client, id_or_name, resource): + return client.find_resource(resource, id_or_name)['id'] diff --git a/neutronclient/tests/unit/osc/v2/sfc/__init__.py b/neutronclient/tests/unit/osc/v2/sfc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/neutronclient/tests/unit/osc/v2/sfc/fakes.py b/neutronclient/tests/unit/osc/v2/sfc/fakes.py new file mode 100755 index 0000000..54bd5a8 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/sfc/fakes.py @@ -0,0 +1,234 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import copy + +import mock + +from osc_lib.tests import utils +from oslo_utils import uuidutils + + +class TestNeutronClientOSCV2(utils.TestCommand): + + def setUp(self): + super(TestNeutronClientOSCV2, self).setUp() + self.namespace = argparse.Namespace() + self.app.client_manager.session = mock.Mock() + self.app.client_manager.neutronclient = mock.Mock() + self.neutronclient = self.app.client_manager.neutronclient + self.neutronclient.find_resource = mock.Mock( + side_effect=lambda resource, name_or_id, project_id=None, + cmd_resource=None, parent_id=None, fields=None: + {'id': name_or_id}) + + +class FakeSfcPortPair(object): + """Fake port pair attributes.""" + + @staticmethod + def create_port_pair(attrs=None): + """Create a fake port pair. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A Dictionary with id, name, description, ingress, egress, + service-function-parameter, project_id + """ + attrs = attrs or {} + + # Set default attributes. + port_pair_attrs = { + 'description': 'description', + 'egress': uuidutils.generate_uuid(), + 'id': uuidutils.generate_uuid(), + 'ingress': uuidutils.generate_uuid(), + 'name': 'port-pair-name', + 'service_function_parameters': 'correlation=None,weight=1', + 'project_id': uuidutils.generate_uuid(), + } + + # Overwrite default attributes. + port_pair_attrs.update(attrs) + return copy.deepcopy(port_pair_attrs) + + @staticmethod + def create_port_pairs(attrs=None, count=1): + """Create multiple port_pairs. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of port_pairs to fake + :return: + A list of dictionaries faking the port_pairs + """ + port_pairs = [] + for _ in range(count): + port_pairs.append(FakeSfcPortPair.create_port_pair(attrs)) + + return port_pairs + + +class FakeSfcPortPairGroup(object): + """Fake port pair group attributes.""" + + @staticmethod + def create_port_pair_group(attrs=None): + """Create a fake port pair group. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A Dictionary with id, name, description, port_pairs, group_id + port_pair_group_parameters, project_id + """ + attrs = attrs or {} + + # Set default attributes. + port_pair_group_attrs = { + 'id': uuidutils.generate_uuid(), + 'group_id': uuidutils.generate_uuid(), + 'name': 'port-pair-group-name', + 'description': 'description', + 'port_pairs': uuidutils.generate_uuid(), + 'port_pair_group_parameters': '{"lb_fields": []}', + 'project_id': uuidutils.generate_uuid() + } + + # port_pair_group_attrs default attributes. + port_pair_group_attrs.update(attrs) + return copy.deepcopy(port_pair_group_attrs) + + @staticmethod + def create_port_pair_groups(attrs=None, count=1): + """Create multiple port pair groups. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of port_pair_groups to fake + :return: + A list of dictionaries faking the port pair groups + """ + port_pair_groups = [] + for _ in range(count): + port_pair_groups.append( + FakeSfcPortPairGroup.create_port_pair_group(attrs)) + + return port_pair_groups + + +class FakeSfcFlowClassifier(object): + """Fake flow classifier attributes.""" + + @staticmethod + def create_flow_classifier(attrs=None): + """Create a fake flow classifier. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A Dictionary with faking port chain attributes + """ + attrs = attrs or {} + + # Set default attributes. + flow_classifier_attrs = { + 'id': uuidutils.generate_uuid(), + 'destination_ip_prefix': '2.2.2.2/32', + 'destination_port_range_max': '90', + 'destination_port_range_min': '80', + 'ethertype': 'IPv4', + 'logical_destination_port': uuidutils.generate_uuid(), + 'logical_source_port': uuidutils.generate_uuid(), + 'name': 'flow-classifier-name', + 'description': 'fc_description', + 'protocol': 'tcp', + 'source_ip_prefix': '1.1.1.1/32', + 'source_port_range_max': '20', + 'source_port_range_min': '10', + 'project_id': uuidutils.generate_uuid(), + 'l7_parameters': '{}' + } + flow_classifier_attrs.update(attrs) + return copy.deepcopy(flow_classifier_attrs) + + @staticmethod + def create_flow_classifiers(attrs=None, count=1): + """Create multiple flow classifiers. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of flow classifiers to fake + :return: + A list of dictionaries faking the flow classifiers + """ + flow_classifiers = [] + for _ in range(count): + flow_classifiers.append( + FakeSfcFlowClassifier.create_flow_classifier(attrs)) + + return flow_classifiers + + +class FakeSfcPortChain(object): + """Fake port chain attributes.""" + + @staticmethod + def create_port_chain(attrs=None): + """Create a fake port chain. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A Dictionary with faking port chain attributes + """ + attrs = attrs or {} + + # Set default attributes. + port_chain_attrs = { + 'id': uuidutils.generate_uuid(), + 'chain_id': uuidutils.generate_uuid(), + 'name': 'port-chain-name', + 'description': 'description', + 'port_pair_groups': uuidutils.generate_uuid(), + 'flow_classifiers': uuidutils.generate_uuid(), + 'chain_parameters': '{"correlation": mpls}', + 'project_id': uuidutils.generate_uuid(), + } + + # port_pair_group_attrs default attributes. + port_chain_attrs.update(attrs) + return copy.deepcopy(port_chain_attrs) + + @staticmethod + def create_port_chains(attrs=None, count=1): + """Create multiple port chains. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of port chains to fake + :return: + A list of dictionaries faking the port chains. + """ + port_chains = [] + for _ in range(count): + port_chains.append(FakeSfcPortChain.create_port_chain(attrs)) + return port_chains diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py b/neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py new file mode 100755 index 0000000..fff11b3 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py @@ -0,0 +1,378 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutronclient.osc.v2.sfc import sfc_flow_classifier +from neutronclient.tests.unit.osc.v2.sfc import fakes + +get_id = 'neutronclient.osc.v2.sfc.sfc_flow_classifier._get_id' + + +def _get_id(client, id_or_name, resource): + return id_or_name + + +class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2): + + _fc = fakes.FakeSfcFlowClassifier.create_flow_classifier() + + columns = ('Description', + 'Destination IP', + 'Destination Port Range Max', + 'Destination Port Range Min', + 'Ethertype', + 'ID', + 'L7 Parameters', + 'Logical Destination Port', + 'Logical Source Port', + 'Name', + 'Project', + 'Protocol', + 'Source IP', + 'Source Port Range Max', + 'Source Port Range Min') + + def get_data(self): + return ( + self._fc['description'], + self._fc['destination_ip_prefix'], + self._fc['destination_port_range_max'], + self._fc['destination_port_range_min'], + self._fc['ethertype'], + self._fc['id'], + self._fc['l7_parameters'], + self._fc['logical_destination_port'], + self._fc['logical_source_port'], + self._fc['name'], + self._fc['project_id'], + self._fc['protocol'], + self._fc['source_ip_prefix'], + self._fc['source_port_range_max'], + self._fc['source_port_range_min'] + ) + + def setUp(self): + super(TestCreateSfcFlowClassifier, self).setUp() + mock.patch(get_id, new=_get_id).start() + self.neutronclient.create_flow_classifier = mock.Mock( + return_value={'flow_classifier': self._fc}) + self.data = self.get_data() + + # Get the command object to test + self.cmd = sfc_flow_classifier.CreateSfcFlowClassifier(self.app, + self.namespace) + + def test_create_flow_classifier_default_options(self): + arglist = [ + "--logical-source-port", self._fc['logical_source_port'], + "--ethertype", self._fc['ethertype'], + self._fc['name'], + ] + verifylist = [ + ('logical_source_port', self._fc['logical_source_port']), + ('ethertype', self._fc['ethertype']), + ('name', self._fc['name']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_flow_classifier.assert_called_once_with({ + 'flow_classifier': { + 'name': self._fc['name'], + 'logical_source_port': self._fc['logical_source_port'], + 'ethertype': self._fc['ethertype']} + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_flow_classifier(self): + arglist = [ + "--description", self._fc['description'], + "--ethertype", self._fc['ethertype'], + "--protocol", self._fc['protocol'], + "--source-ip-prefix", self._fc['source_ip_prefix'], + "--destination-ip-prefix", self._fc['destination_ip_prefix'], + "--logical-source-port", self._fc['logical_source_port'], + "--logical-destination-port", self._fc['logical_destination_port'], + self._fc['name'], + "--l7-parameters", 'url=path', + ] + + param = 'url=path' + + verifylist = [ + ('description', self._fc['description']), + ('name', self._fc['name']), + ('ethertype', self._fc['ethertype']), + ('protocol', self._fc['protocol']), + ('source_ip_prefix', self._fc['source_ip_prefix']), + ('destination_ip_prefix', self._fc['destination_ip_prefix']), + ('logical_source_port', self._fc['logical_source_port']), + ('logical_destination_port', + self._fc['logical_destination_port']), + ('l7_parameters', param) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + self.neutronclient.create_flow_classifier.assert_called_once_with({ + 'flow_classifier': { + 'name': self._fc['name'], + 'description': self._fc['description'], + 'ethertype': self._fc['ethertype'], + 'protocol': self._fc['protocol'], + 'source_ip_prefix': self._fc['source_ip_prefix'], + 'destination_ip_prefix': self._fc['destination_ip_prefix'], + 'logical_source_port': self._fc['logical_source_port'], + 'logical_destination_port': + self._fc['logical_destination_port'], + 'l7_parameters': param + } + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSfcFlowClassifier(fakes.TestNeutronClientOSCV2): + + _flow_classifier = \ + fakes.FakeSfcFlowClassifier.create_flow_classifiers(count=1) + + def setUp(self): + super(TestDeleteSfcFlowClassifier, self).setUp() + mock.patch(get_id, new=_get_id).start() + self.neutronclient.delete_flow_classifier = mock.Mock( + return_value=None) + self.cmd = sfc_flow_classifier.DeleteSfcFlowClassifier(self.app, + self.namespace) + + def test_delete_flow_classifier(self): + client = self.app.client_manager.neutronclient + mock_flow_classifier_delete = client.delete_flow_classifier + arglist = [ + self._flow_classifier[0]['id'], + ] + verifylist = [ + ('flow_classifier', self._flow_classifier[0]['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + mock_flow_classifier_delete.assert_called_once_with( + self._flow_classifier[0]['id']) + self.assertIsNone(result) + + +class TestSetSfcFlowClassifier(fakes.TestNeutronClientOSCV2): + _flow_classifier = fakes.FakeSfcFlowClassifier.create_flow_classifier() + _flow_classifier_name = _flow_classifier['name'] + _flow_classifier_id = _flow_classifier['id'] + + def setUp(self): + super(TestSetSfcFlowClassifier, self).setUp() + mock.patch(get_id, new=_get_id).start() + self.neutronclient.update_flow_classifier = mock.Mock( + return_value=None) + self.cmd = sfc_flow_classifier.SetSfcFlowClassifier(self.app, + self.namespace) + + def test_set_flow_classifier(self): + client = self.app.client_manager.neutronclient + mock_flow_classifier_update = client.update_flow_classifier + arglist = [ + self._flow_classifier_name, + '--name', 'name_updated', + '--description', 'desc_updated' + ] + verifylist = [ + ('flow_classifier', self._flow_classifier_name), + ('name', 'name_updated'), + ('description', 'desc_updated'), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {'flow_classifier': { + 'name': 'name_updated', + 'description': 'desc_updated'}} + mock_flow_classifier_update.assert_called_once_with( + self._flow_classifier_name, attrs) + self.assertIsNone(result) + + +class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2): + + _fc = fakes.FakeSfcFlowClassifier.create_flow_classifier() + data = ( + _fc['description'], + _fc['destination_ip_prefix'], + _fc['destination_port_range_max'], + _fc['destination_port_range_min'], + _fc['ethertype'], + _fc['id'], + _fc['l7_parameters'], + _fc['logical_destination_port'], + _fc['logical_source_port'], + _fc['name'], + _fc['project_id'], + _fc['protocol'], + _fc['source_ip_prefix'], + _fc['source_port_range_max'], + _fc['source_port_range_min'] + ) + _flow_classifier = {'flow_classifier': _fc} + _flow_classifier_id = _fc['id'] + columns = ('Description', + 'Destination IP', + 'Destination Port Range Max', + 'Destination Port Range Min', + 'Ethertype', + 'ID', + 'L7 Parameters', + 'Logical Destination Port', + 'Logical Source Port', + 'Name', + 'Project', + 'Protocol', + 'Source IP', + 'Source Port Range Max', + 'Source Port Range Min') + + def setUp(self): + super(TestShowSfcFlowClassifier, self).setUp() + mock.patch(get_id, new=_get_id).start() + self.neutronclient.show_flow_classifier = mock.Mock( + return_value=self._flow_classifier + ) + # Get the command object to test + self.cmd = sfc_flow_classifier.ShowSfcFlowClassifier(self.app, + self.namespace) + + def test_show_flow_classifier(self): + client = self.app.client_manager.neutronclient + mock_flow_classifier_show = client.show_flow_classifier + arglist = [ + self._flow_classifier_id, + ] + verifylist = [ + ('flow_classifier', self._flow_classifier_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + mock_flow_classifier_show.assert_called_once_with( + self._flow_classifier_id) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2): + + _fc = fakes.FakeSfcFlowClassifier.create_flow_classifiers(count=1) + + columns = ('ID', 'Name', 'Summary') + + columns_long = ('ID', 'Name', 'Protocol', 'Ethertype', 'Source IP', + 'Destination IP', 'Logical Source Port', + 'Logical Destination Port', 'Source Port Range Min', + 'Source Port Range Max', 'Destination Port Range Min', + 'Destination Port Range Max', 'L7 Parameters', + 'Description', 'Project') + _flow_classifier = _fc[0] + data = [ + _flow_classifier['id'], + _flow_classifier['name'], + _flow_classifier['protocol'], + _flow_classifier['source_ip_prefix'], + _flow_classifier['destination_ip_prefix'], + _flow_classifier['logical_source_port'], + _flow_classifier['logical_destination_port'] + ] + data_long = [ + _flow_classifier['id'], + _flow_classifier['name'], + _flow_classifier['protocol'], + _flow_classifier['ethertype'], + _flow_classifier['source_ip_prefix'], + _flow_classifier['destination_ip_prefix'], + _flow_classifier['logical_source_port'], + _flow_classifier['logical_destination_port'], + _flow_classifier['source_port_range_min'], + _flow_classifier['source_port_range_max'], + _flow_classifier['destination_port_range_min'], + _flow_classifier['destination_port_range_max'], + _flow_classifier['l7_parameters'], + _flow_classifier['description'] + ] + + _flow_classifier1 = {'flow_classifiers': _flow_classifier} + _flow_classifier_id = _flow_classifier['id'] + + def setUp(self): + super(TestListSfcFlowClassifier, self).setUp() + mock.patch(get_id, new=_get_id).start() + self.neutronclient.list_flow_classifier = mock.Mock( + return_value={'flow_classifiers': self._fc} + ) + # Get the command object to test + self.cmd = sfc_flow_classifier.ListSfcFlowClassifier(self.app, + self.namespace) + + def test_list_flow_classifier(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns = self.cmd.take_action(parsed_args) + fcs = self.neutronclient.list_flow_classifier()['flow_classifiers'] + fc = fcs[0] + data = [ + fc['id'], + fc['name'], + fc['protocol'], + fc['source_ip_prefix'], + fc['destination_ip_prefix'], + fc['logical_source_port'], + fc['logical_destination_port'] + ] + self.assertEqual(list(self.columns), columns[0]) + self.assertEqual(self.data, data) + + def test_list_with_long_option(self): + arglist = ['--long'] + verifylist = [('long', True)] + fcs = self.neutronclient.list_flow_classifier()['flow_classifiers'] + fc = fcs[0] + data = [ + fc['id'], + fc['name'], + fc['protocol'], + fc['ethertype'], + fc['source_ip_prefix'], + fc['destination_ip_prefix'], + fc['logical_source_port'], + fc['logical_destination_port'], + fc['source_port_range_min'], + fc['source_port_range_max'], + fc['destination_port_range_min'], + fc['destination_port_range_max'], + fc['l7_parameters'], + fc['description'] + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns_long = self.cmd.take_action(parsed_args)[0] + self.assertEqual(list(self.columns_long), columns_long) + self.assertEqual(self.data_long, data) diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py b/neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py new file mode 100755 index 0000000..8cb0fe2 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py @@ -0,0 +1,556 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from osc_lib import exceptions + +from neutronclient.osc.v2.sfc import sfc_port_chain +from neutronclient.tests.unit.osc.v2.sfc import fakes + + +def _get_id(client, id_or_name, resource): + return id_or_name + + +class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2): + # The new port_chain created + _port_chain = fakes.FakeSfcPortChain.create_port_chain() + + columns = ('Chain ID', + 'Chain Parameters', + 'Description', + 'Flow Classifiers', + 'ID', + 'Name', + 'Port Pair Groups', + 'Project') + + def get_data(self): + return ( + self._port_chain['chain_id'], + self._port_chain['chain_parameters'], + self._port_chain['description'], + self._port_chain['flow_classifiers'], + self._port_chain['id'], + self._port_chain['name'], + self._port_chain['port_pair_groups'], + self._port_chain['project_id'], + ) + + def setUp(self): + super(TestCreateSfcPortChain, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id', + new=_get_id).start() + self.neutronclient.create_port_chain = mock.Mock( + return_value={'port_chain': self._port_chain}) + self.data = self.get_data() + + # Get the command object to test + self.cmd = sfc_port_chain.CreateSfcPortChain(self.app, self.namespace) + + def test_create_port_chain_dafault_options(self): + arglist = [ + self._port_chain['name'], + "--port-pair-group", self._port_chain['port_pair_groups'] + ] + verifylist = [ + ('name', self._port_chain['name']), + ('port_pair_groups', [self._port_chain['port_pair_groups']]), + ('flow_classifiers', []), + ('chain_parameters', None), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_port_chain.assert_called_once_with({ + 'port_chain': { + 'name': self._port_chain['name'], + 'port_pair_groups': [self._port_chain['port_pair_groups']], + 'flow_classifiers': [], + 'chain_parameters': None} + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_port_chain_all_options(self): + arglist = [ + "--description", self._port_chain['description'], + "--port-pair-group", self._port_chain['port_pair_groups'], + self._port_chain['name'], + "--flow-classifier", self._port_chain['flow_classifiers'], + "--chain-parameters", 'correlation=mpls,symmetric=true', + ] + + cp = [{'correlation': 'mpls', 'symmetric': 'true'}] + + verifylist = [ + ('port_pair_groups', [self._port_chain['port_pair_groups']]), + ('name', self._port_chain['name']), + ('description', self._port_chain['description']), + ('flow_classifiers', [self._port_chain['flow_classifiers']]), + ('chain_parameters', cp) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_port_chain.assert_called_once_with({ + 'port_chain': { + 'name': self._port_chain['name'], + 'port_pair_groups': [self._port_chain['port_pair_groups']], + 'description': self._port_chain['description'], + 'flow_classifiers': [self._port_chain['flow_classifiers']], + 'chain_parameters': cp + } + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSfcPortChain(fakes.TestNeutronClientOSCV2): + + _port_chain = fakes.FakeSfcPortChain.create_port_chains(count=1) + + def setUp(self): + super(TestDeleteSfcPortChain, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id', + new=_get_id).start() + self.neutronclient.delete_port_chain = mock.Mock(return_value=None) + self.cmd = sfc_port_chain.DeleteSfcPortChain(self.app, self.namespace) + + def test_delete_port_chain(self): + client = self.app.client_manager.neutronclient + mock_port_chain_delete = client.delete_port_chain + arglist = [ + self._port_chain[0]['id'], + ] + verifylist = [ + ('port_chain', self._port_chain[0]['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + mock_port_chain_delete.assert_called_once_with( + self._port_chain[0]['id']) + self.assertIsNone(result) + + +class TestListSfcPortChain(fakes.TestNeutronClientOSCV2): + _port_chains = fakes.FakeSfcPortChain.create_port_chains(count=1) + columns = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers', + 'Chain Parameters', 'Chain ID') + columns_long = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers', + 'Chain Parameters', 'Description', 'Chain ID', 'Project') + _port_chain = _port_chains[0] + data = [ + _port_chain['id'], + _port_chain['name'], + _port_chain['port_pair_groups'], + _port_chain['flow_classifiers'], + _port_chain['chain_parameters'], + _port_chain['chain_id'] + ] + data_long = [ + _port_chain['id'], + _port_chain['name'], + _port_chain['project_id'], + _port_chain['chain_id'], + _port_chain['port_pair_groups'], + _port_chain['flow_classifiers'], + _port_chain['chain_parameters'], + _port_chain['description'] + ] + _port_chain1 = {'port_chains': _port_chain} + _port_chain_id = _port_chain['id'] + + def setUp(self): + super(TestListSfcPortChain, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id', + new=_get_id).start() + self.neutronclient.list_port_chain = mock.Mock( + return_value={'port_chains': self._port_chains} + ) + # Get the command object to test + self.cmd = sfc_port_chain.ListSfcPortChain(self.app, self.namespace) + + def test_list_port_chain(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns = self.cmd.take_action(parsed_args)[0] + pcs = self.neutronclient.list_port_chain()['port_chains'] + pc = pcs[0] + data = [ + pc['id'], + pc['name'], + pc['port_pair_groups'], + pc['flow_classifiers'], + pc['chain_parameters'], + pc['chain_id'] + ] + self.assertEqual(list(self.columns), columns) + self.assertEqual(self.data, data) + + def test_list_port_chain_with_long_opion(self): + arglist = ['--long'] + verifylist = [('long', True)] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns = self.cmd.take_action(parsed_args)[0] + pcs = self.neutronclient.list_port_chain()['port_chains'] + pc = pcs[0] + data = [ + pc['id'], + pc['name'], + pc['project_id'], + pc['chain_id'], + pc['port_pair_groups'], + pc['flow_classifiers'], + pc['chain_parameters'], + pc['description'] + ] + self.assertEqual(list(self.columns_long), columns) + self.assertEqual(self.data_long, data) + + +class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2): + _port_chain = fakes.FakeSfcPortChain.create_port_chain() + resource = _port_chain + res = 'port_chain' + _port_chain_name = _port_chain['name'] + _port_chain_id = _port_chain['id'] + pc_ppg = _port_chain['port_pair_groups'] + pc_fc = _port_chain['flow_classifiers'] + + def setUp(self): + super(TestSetSfcPortChain, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id', + new=_get_id).start() + self.mocked = self.neutronclient.update_port_chain + self.cmd = sfc_port_chain.SetSfcPortChain(self.app, self.namespace) + + def test_set_port_chain(self): + client = self.app.client_manager.neutronclient + mock_port_chain_update = client.update_port_chain + arglist = [ + self._port_chain_name, + '--name', 'name_updated', + '--description', 'desc_updated', + ] + verifylist = [ + ('port_chain', self._port_chain_name), + ('name', 'name_updated'), + ('description', 'desc_updated'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = {'port_chain': {'name': 'name_updated', + 'description': 'desc_updated'}} + mock_port_chain_update.assert_called_once_with(self._port_chain_name, + attrs) + self.assertIsNone(result) + + def test_set_flow_classifier(self): + target = self.resource['id'] + fc1 = 'flow_classifier1' + + def _mock_flow_classifier(*args, **kwargs): + + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + 'flow_classifier', fc1, cmd_resource='sfc_flow_classifier') + return {'id': args[1]} + + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_chain') + return {'flow_classifiers': self.pc_fc} + self.neutronclient.find_resource.side_effect = _mock_flow_classifier + arglist = [ + target, + '--flow-classifier', fc1, + ] + verifylist = [ + (self.res, target), + ('flow_classifiers', [fc1]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'flow_classifiers': sorted([self.pc_fc, fc1])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertEqual(2, self.neutronclient.find_resource.call_count) + self.assertIsNone(result) + + def test_set_no_flow_classifier(self): + client = self.app.client_manager.neutronclient + mock_port_chain_update = client.update_port_chain + arglist = [ + self._port_chain_name, + '--no-flow-classifier', + ] + verifylist = [ + ('port_chain', self._port_chain_name), + ('no_flow_classifier', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = {'port_chain': {'flow_classifiers': []}} + mock_port_chain_update.assert_called_once_with(self._port_chain_name, + attrs) + self.assertIsNone(result) + + def test_set_port_pair_groups(self): + target = self.resource['id'] + existing_ppg = self.pc_ppg + ppg1 = 'port_pair_group1' + ppg2 = 'port_pair_group2' + + def _mock_flow_classifier(*args, **kwargs): + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_chain') + return {'port_pair_groups': self.pc_ppg} + + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'port_pair_group', ppg1, + cmd_resource='sfc_port_pair_group') + return {'id': args[1]} + + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'port_pair_group', ppg2, + cmd_resource='sfc_port_pair_group') + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_flow_classifier + arglist = [ + target, + '--port-pair-group', ppg1, + '--port-pair-group', ppg2, + ] + verifylist = [ + (self.res, target), + ('port_pair_groups', [ppg1, ppg2]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'port_pair_groups': sorted([existing_ppg, ppg1, ppg2])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertEqual(3, self.neutronclient.find_resource.call_count) + self.assertIsNone(result) + + def test_set_no_port_pair_group(self): + target = self.resource['id'] + ppg1 = 'port_pair_group1' + + def _mock_port_pair_group(*args, **kwargs): + + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + 'port_pair_group', ppg1, + cmd_resource='sfc_port_pair_group') + return {'id': args[1]} + self.neutronclient.find_resource.side_effect = _mock_port_pair_group + arglist = [ + target, + '--no-port-pair-group', + '--port-pair-group', ppg1, + ] + verifylist = [ + (self.res, target), + ('no_port_pair_group', True), + ('port_pair_groups', [ppg1]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'port_pair_groups': [ppg1]} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertEqual(1, self.neutronclient.find_resource.call_count) + self.assertIsNone(result) + + def test_set_only_no_port_pair_group(self): + target = self.resource['id'] + arglist = [ + target, + '--no-port-pair-group', + ] + verifylist = [ + (self.res, target), + ('no_port_pair_group', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args) + + +class TestShowSfcPortChain(fakes.TestNeutronClientOSCV2): + + _pc = fakes.FakeSfcPortChain.create_port_chain() + data = ( + _pc['chain_id'], + _pc['chain_parameters'], + _pc['description'], + _pc['flow_classifiers'], + _pc['id'], + _pc['name'], + _pc['port_pair_groups'], + _pc['project_id'] + ) + _port_chain = {'port_chain': _pc} + _port_chain_id = _pc['id'] + columns = ('Chain ID', + 'Chain Parameters', + 'Description', + 'Flow Classifiers', + 'ID', + 'Name', + 'Port Pair Groups', + 'Project') + + def setUp(self): + super(TestShowSfcPortChain, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id', + new=_get_id).start() + self.neutronclient.show_port_chain = mock.Mock( + return_value=self._port_chain + ) + # Get the command object to test + self.cmd = sfc_port_chain.ShowSfcPortChain(self.app, self.namespace) + + def test_show_port_chain(self): + client = self.app.client_manager.neutronclient + mock_port_chain_show = client.show_port_chain + arglist = [ + self._port_chain_id, + ] + verifylist = [ + ('port_chain', self._port_chain_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + mock_port_chain_show.assert_called_once_with(self._port_chain_id) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2): + _port_chain = fakes.FakeSfcPortChain.create_port_chain() + resource = _port_chain + res = 'port_chain' + _port_chain_name = _port_chain['name'] + _port_chain_id = _port_chain['id'] + pc_ppg = _port_chain['port_pair_groups'] + pc_fc = _port_chain['flow_classifiers'] + + def setUp(self): + super(TestUnsetSfcPortChain, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_chain._get_id', + new=_get_id).start() + self.neutronclient.update_port_chain = mock.Mock( + return_value=None) + self.mocked = self.neutronclient.update_port_chain + self.cmd = sfc_port_chain.UnsetSfcPortChain(self.app, self.namespace) + + def test_unset_port_pair_group(self): + target = self.resource['id'] + ppg1 = 'port_pair_group1' + + def _mock_port_pair_group(*args, **kwargs): + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_chain') + return {'port_pair_groups': self.pc_ppg} + + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'port_pair_group', ppg1, + cmd_resource='sfc_port_pair_group') + return {'id': args[1]} + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_chain') + return {'id': args[1]} + self.neutronclient.find_resource.side_effect = _mock_port_pair_group + + arglist = [ + target, + '--port-pair-group', ppg1, + ] + verifylist = [ + (self.res, target), + ('port_pair_groups', [ppg1]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'port_pair_groups': sorted([self.pc_ppg])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertIsNone(result) + + def test_unset_flow_classifier(self): + target = self.resource['id'] + fc1 = 'flow_classifier1' + + def _mock_flow_classifier(*args, **kwargs): + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_chain') + return {'flow_classifiers': self.pc_fc} + + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'flow_classifier', fc1, cmd_resource='sfc_flow_classifier') + return {'id': args[1]} + self.neutronclient.find_resource.side_effect = _mock_flow_classifier + + arglist = [ + target, + '--flow-classifier', fc1, + ] + verifylist = [ + (self.res, target), + ('flow_classifiers', [fc1]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'flow_classifiers': sorted([self.pc_fc])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertIsNone(result) + + def test_unset_all_flow_classifier(self): + client = self.app.client_manager.neutronclient + target = self.resource['id'] + mock_port_chain_update = client.update_port_chain + arglist = [ + target, + '--all-flow-classifier', + ] + verifylist = [ + (self.res, target), + ('all_flow_classifier', True) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'flow_classifiers': []} + mock_port_chain_update.assert_called_once_with(target, + {self.res: expect}) + self.assertIsNone(result) diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py new file mode 100755 index 0000000..e6c539a --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py @@ -0,0 +1,302 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutronclient.osc.v2.sfc import sfc_port_pair +from neutronclient.tests.unit.osc.v2.sfc import fakes + + +def _get_id(client, id_or_name, resource): + return id_or_name + + +class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2): + # The new port_pair created + _port_pair = fakes.FakeSfcPortPair.create_port_pair() + + columns = ('Description', + 'Egress Logical Port', + 'ID', + 'Ingress Logical Port', + 'Name', + 'Project', + 'Service Function Parameters') + + def get_data(self): + return ( + self._port_pair['description'], + self._port_pair['egress'], + self._port_pair['id'], + self._port_pair['ingress'], + self._port_pair['name'], + self._port_pair['project_id'], + self._port_pair['service_function_parameters'] + ) + + def setUp(self): + super(TestCreateSfcPortPair, self).setUp() + mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id', + new=_get_id).start() + self.neutronclient.create_port_pair = mock.Mock( + return_value={'port_pair': self._port_pair}) + self.data = self.get_data() + + # Get the command object to test + self.cmd = sfc_port_pair.CreateSfcPortPair(self.app, self.namespace) + + def test_create_port_pair_default_options(self): + arglist = [ + "--ingress", self._port_pair['ingress'], + "--egress", self._port_pair['egress'], + self._port_pair['name'], + ] + verifylist = [ + ('ingress', self._port_pair['ingress']), + ('egress', self._port_pair['egress']), + ('name', self._port_pair['name']) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_port_pair.assert_called_once_with({ + 'port_pair': {'name': self._port_pair['name'], + 'ingress': self._port_pair['ingress'], + 'egress': self._port_pair['egress'], + 'service_function_parameters': None, + } + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_port_pair_all_options(self): + arglist = [ + "--description", self._port_pair['description'], + "--egress", self._port_pair['egress'], + "--ingress", self._port_pair['ingress'], + self._port_pair['name'], + "--service-function-parameters", 'correlation=None,weight=1', + ] + + sfp = [{'correlation': 'None', 'weight': '1'}] + + verifylist = [ + ('ingress', self._port_pair['ingress']), + ('egress', self._port_pair['egress']), + ('name', self._port_pair['name']), + ('description', self._port_pair['description']), + ('service_function_parameters', sfp) + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_port_pair.assert_called_once_with({ + 'port_pair': {'name': self._port_pair['name'], + 'ingress': self._port_pair['ingress'], + 'egress': self._port_pair['egress'], + 'description': self._port_pair['description'], + 'service_function_parameters': + [{'correlation': 'None', 'weight': '1'}], + } + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSfcPortPair(fakes.TestNeutronClientOSCV2): + + _port_pair = fakes.FakeSfcPortPair.create_port_pairs(count=1) + + def setUp(self): + super(TestDeleteSfcPortPair, self).setUp() + mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id', + new=_get_id).start() + self.neutronclient.delete_port_pair = mock.Mock(return_value=None) + self.cmd = sfc_port_pair.DeleteSfcPortPair(self.app, self.namespace) + + def test_delete_port_pair(self): + client = self.app.client_manager.neutronclient + mock_port_pair_delete = client.delete_port_pair + arglist = [ + self._port_pair[0]['id'], + ] + verifylist = [ + ('port_pair', self._port_pair[0]['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + mock_port_pair_delete.assert_called_once_with( + self._port_pair[0]['id']) + self.assertIsNone(result) + + +class TestListSfcPortPair(fakes.TestNeutronClientOSCV2): + _port_pairs = fakes.FakeSfcPortPair.create_port_pairs() + columns = ('ID', 'Name', 'Ingress Logical Port', 'Egress Logical Port') + columns_long = ('ID', 'Name', 'Ingress Logical Port', + 'Egress Logical Port', 'Service Function Parameters', + 'Description', 'Project') + _port_pair = _port_pairs[0] + data = [ + _port_pair['id'], + _port_pair['name'], + _port_pair['ingress'], + _port_pair['egress'] + ] + data_long = [ + _port_pair['id'], + _port_pair['name'], + _port_pair['ingress'], + _port_pair['egress'], + _port_pair['service_function_parameters'], + _port_pair['description'] + ] + _port_pair1 = {'port_pairs': _port_pair} + _port_pair_id = _port_pair['id'], + + def setUp(self): + super(TestListSfcPortPair, self).setUp() + mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id', + new=_get_id).start() + self.neutronclient.list_port_pair = mock.Mock( + return_value={'port_pairs': self._port_pairs} + ) + # Get the command object to test + self.cmd = sfc_port_pair.ListSfcPortPair(self.app, self.namespace) + + def test_list_port_pair(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns = self.cmd.take_action(parsed_args)[0] + port_pairs = self.neutronclient.list_port_pair()['port_pairs'] + port_pair = port_pairs[0] + data = [ + port_pair['id'], + port_pair['name'], + port_pair['ingress'], + port_pair['egress'] + ] + self.assertEqual(list(self.columns), columns) + self.assertEqual(self.data, data) + + def test_list_with_long_option(self): + arglist = ['--long'] + verifylist = [('long', True)] + port_pairs = self.neutronclient.list_port_pair()['port_pairs'] + port_pair = port_pairs[0] + data = [ + port_pair['id'], + port_pair['name'], + port_pair['ingress'], + port_pair['egress'], + port_pair['service_function_parameters'], + port_pair['description'] + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns_long = self.cmd.take_action(parsed_args)[0] + self.assertEqual(list(self.columns_long), columns_long) + self.assertEqual(self.data_long, data) + + +class TestSetSfcPortPair(fakes.TestNeutronClientOSCV2): + _port_pair = fakes.FakeSfcPortPair.create_port_pair() + _port_pair_name = _port_pair['name'] + _port_pair_id = _port_pair['id'] + + def setUp(self): + super(TestSetSfcPortPair, self).setUp() + mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id', + new=_get_id).start() + self.neutronclient.update_port_pair = mock.Mock(return_value=None) + self.cmd = sfc_port_pair.SetSfcPortPair(self.app, self.namespace) + + def test_set_port_pair(self): + client = self.app.client_manager.neutronclient + mock_port_pair_update = client.update_port_pair + arglist = [ + self._port_pair_name, + '--name', 'name_updated', + '--description', 'desc_updated' + ] + verifylist = [ + ('port_pair', self._port_pair_name), + ('name', 'name_updated'), + ('description', 'desc_updated'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + attrs = {'port_pair': { + 'name': 'name_updated', + 'description': 'desc_updated'} + } + mock_port_pair_update.assert_called_once_with(self._port_pair_name, + attrs) + self.assertIsNone(result) + + +class TestShowSfcPortPair(fakes.TestNeutronClientOSCV2): + + _pp = fakes.FakeSfcPortPair.create_port_pair() + + data = ( + _pp['description'], + _pp['egress'], + _pp['id'], + _pp['ingress'], + _pp['name'], + _pp['project_id'], + _pp['service_function_parameters'], + ) + _port_pair = {'port_pair': _pp} + _port_pair_id = _pp['id'] + columns = ( + 'Description', + 'Egress Logical Port', + 'ID', + 'Ingress Logical Port', + 'Name', + 'Project', + 'Service Function Parameters' + ) + + def setUp(self): + super(TestShowSfcPortPair, self).setUp() + mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id', + new=_get_id).start() + + self.neutronclient.show_port_pair = mock.Mock( + return_value=self._port_pair + ) + + # Get the command object to test + self.cmd = sfc_port_pair.ShowSfcPortPair(self.app, self.namespace) + + def test_show_port_pair(self): + client = self.app.client_manager.neutronclient + mock_port_pair_show = client.show_port_pair + arglist = [ + self._port_pair_id, + ] + verifylist = [ + ('port_pair', self._port_pair_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + mock_port_pair_show.assert_called_once_with(self._port_pair_id) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py new file mode 100755 index 0000000..188494c --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py @@ -0,0 +1,424 @@ +# Copyright (c) 2017 Huawei Technologies India Pvt.Limited. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutronclient.osc.v2.sfc import sfc_port_pair_group +from neutronclient.tests.unit.osc.v2.sfc import fakes + + +def _get_id(client, id_or_name, resource): + return id_or_name + + +class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2): + + _port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group() + + columns = ('Description', + 'ID', + 'Loadbalance ID', + 'Name', + 'Port Pair', + 'Port Pair Group Parameters', + 'Project') + + def get_data(self): + return ( + self._port_pair_group['description'], + self._port_pair_group['id'], + self._port_pair_group['group_id'], + self._port_pair_group['name'], + self._port_pair_group['port_pairs'], + self._port_pair_group['port_pair_group_parameters'], + self._port_pair_group['project_id'] + ) + + def setUp(self): + super(TestCreateSfcPortPairGroup, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id', + new=_get_id).start() + self.neutronclient.create_port_pair_group = mock.Mock( + return_value={'port_pair_group': self._port_pair_group}) + self.data = self.get_data() + # Get the command object to test + self.cmd = sfc_port_pair_group.CreateSfcPortPairGroup(self.app, + self.namespace) + + def test_create_port_pair_group_default_options(self): + arglist = [ + "--port-pair", self._port_pair_group['port_pairs'], + self._port_pair_group['name'], + ] + verifylist = [ + ('port_pairs', [self._port_pair_group['port_pairs']]), + ('name', self._port_pair_group['name']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + self.neutronclient.create_port_pair_group.assert_called_once_with({ + 'port_pair_group': { + 'name': self._port_pair_group['name'], + 'port_pairs': [self._port_pair_group['port_pairs']]} + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_port_pair_group(self): + arglist = [ + "--description", self._port_pair_group['description'], + "--port-pair", self._port_pair_group['port_pairs'], + self._port_pair_group['name'], + ] + verifylist = [ + ('port_pairs', [self._port_pair_group['port_pairs']]), + ('name', self._port_pair_group['name']), + ('description', self._port_pair_group['description']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_port_pair_group.assert_called_once_with({ + 'port_pair_group': { + 'name': self._port_pair_group['name'], + 'port_pairs': [self._port_pair_group['port_pairs']], + 'description': self._port_pair_group['description'], + } + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestDeleteSfcPortPairGroup(fakes.TestNeutronClientOSCV2): + + _port_pair_group = (fakes.FakeSfcPortPairGroup.create_port_pair_groups + (count=1)) + + def setUp(self): + super(TestDeleteSfcPortPairGroup, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id', + new=_get_id).start() + self.neutronclient.delete_port_pair_group = mock.Mock( + return_value=None) + self.cmd = sfc_port_pair_group.DeleteSfcPortPairGroup(self.app, + self.namespace) + + def test_delete_port_pair_group(self): + client = self.app.client_manager.neutronclient + mock_port_pair_group_delete = client.delete_port_pair_group + arglist = [ + self._port_pair_group[0]['id'], + ] + verifylist = [ + ('port_pair_group', self._port_pair_group[0]['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + mock_port_pair_group_delete.assert_called_once_with( + self._port_pair_group[0]['id']) + self.assertIsNone(result) + + +class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2): + _ppgs = fakes.FakeSfcPortPairGroup.create_port_pair_groups(count=1) + columns = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters') + columns_long = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters', + 'Description', 'Loadbalance ID', 'Project') + _port_pair_group = _ppgs[0] + data = [ + _port_pair_group['id'], + _port_pair_group['name'], + _port_pair_group['port_pairs'], + _port_pair_group['port_pair_group_parameters'] + ] + data_long = [ + _port_pair_group['id'], + _port_pair_group['name'], + _port_pair_group['port_pairs'], + _port_pair_group['port_pair_group_parameters'], + _port_pair_group['description'] + ] + _port_pair_group1 = {'port_pair_groups': _port_pair_group} + _port_pair_id = _port_pair_group['id'] + + def setUp(self): + super(TestListSfcPortPairGroup, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id', + new=_get_id).start() + + self.neutronclient.list_port_pair_group = mock.Mock( + return_value={'port_pair_groups': self._ppgs} + ) + # Get the command object to test + self.cmd = sfc_port_pair_group.ListSfcPortPairGroup(self.app, + self.namespace) + + def test_list_port_pair_group(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns = self.cmd.take_action(parsed_args)[0] + ppgs = self.neutronclient.list_port_pair_group()['port_pair_groups'] + ppg = ppgs[0] + data = [ + ppg['id'], + ppg['name'], + ppg['port_pairs'], + ppg['port_pair_group_parameters'] + ] + self.assertEqual(list(self.columns), columns) + self.assertEqual(self.data, data) + + def test_list_with_long_option(self): + arglist = ['--long'] + verifylist = [('long', True)] + ppgs = self.neutronclient.list_port_pair_group()['port_pair_groups'] + ppg = ppgs[0] + data = [ + ppg['id'], + ppg['name'], + ppg['port_pairs'], + ppg['port_pair_group_parameters'], + ppg['description'] + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns_long = self.cmd.take_action(parsed_args)[0] + self.assertEqual(list(self.columns_long), columns_long) + self.assertEqual(self.data_long, data) + + +class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2): + _port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group() + resource = _port_pair_group + res = 'port_pair_group' + _port_pair_group_name = _port_pair_group['name'] + ppg_pp = _port_pair_group['port_pairs'] + _port_pair_group_id = _port_pair_group['id'] + + def setUp(self): + super(TestSetSfcPortPairGroup, self).setUp() + + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id', + new=_get_id).start() + self.neutronclient.update_port_pair_group = mock.Mock( + return_value=None) + self.mocked = self.neutronclient.update_port_pair_group + self.cmd = sfc_port_pair_group.SetSfcPortPairGroup(self.app, + self.namespace) + + def test_set_port_pair_group(self): + target = self.resource['id'] + port_pair1 = 'additional_port1' + port_pair2 = 'additional_port2' + + def _mock_port_pair_group(*args, **kwargs): + + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + 'port_pair', port_pair1, cmd_resource='sfc_port_pair') + return {'id': args[1]} + + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'port_pair', port_pair2, cmd_resource='sfc_port_pair') + return {'id': args[1]} + + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_pair_group') + return {'port_pairs': self.ppg_pp} + + self.neutronclient.find_resource.side_effect = _mock_port_pair_group + + arglist = [ + target, + '--port-pair', port_pair1, + '--port-pair', port_pair2, + ] + verifylist = [ + (self.res, target), + ('port_pairs', [port_pair1, port_pair2]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'port_pairs': sorted([self.ppg_pp, port_pair1, port_pair2])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertEqual(3, self.neutronclient.find_resource.call_count) + self.assertIsNone(result) + + def test_set_no_port_pair(self): + client = self.app.client_manager.neutronclient + mock_port_pair_group_update = client.update_port_pair_group + arglist = [ + self._port_pair_group_name, + '--name', 'name_updated', + '--description', 'desc_updated', + '--no-port-pair', + ] + verifylist = [ + ('port_pair_group', self._port_pair_group_name), + ('name', 'name_updated'), + ('description', 'desc_updated'), + ('no_port_pair', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {'port_pair_group': {'name': 'name_updated', + 'description': 'desc_updated', + 'port_pairs': []}} + mock_port_pair_group_update.assert_called_once_with( + self._port_pair_group_name, attrs) + self.assertIsNone(result) + + +class TestShowSfcPortPairGroup(fakes.TestNeutronClientOSCV2): + + _ppg = fakes.FakeSfcPortPairGroup.create_port_pair_group() + data = ( + _ppg['description'], + _ppg['id'], + _ppg['group_id'], + _ppg['name'], + _ppg['port_pairs'], + _ppg['port_pair_group_parameters'], + _ppg['project_id']) + _port_pair_group = {'port_pair_group': _ppg} + _port_pair_group_id = _ppg['id'] + columns = ( + 'Description', + 'ID', + 'Loadbalance ID', + 'Name', + 'Port Pair', + 'Port Pair Group Parameters', + 'Project') + + def setUp(self): + super(TestShowSfcPortPairGroup, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id', + new=_get_id).start() + + self.neutronclient.show_port_pair_group = mock.Mock( + return_value=self._port_pair_group + ) + self.cmd = sfc_port_pair_group.ShowSfcPortPairGroup(self.app, + self.namespace) + + def test_show_port_pair_group(self): + client = self.app.client_manager.neutronclient + mock_port_pair_group_show = client.show_port_pair_group + arglist = [ + self._port_pair_group_id, + ] + verifylist = [ + ('port_pair_group', self._port_pair_group_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + mock_port_pair_group_show.assert_called_once_with( + self._port_pair_group_id) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2): + _port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group() + resource = _port_pair_group + res = 'port_pair_group' + _port_pair_group_name = _port_pair_group['name'] + _port_pair_group_id = _port_pair_group['id'] + ppg_pp = _port_pair_group['port_pairs'] + + def setUp(self): + super(TestUnsetSfcPortPairGroup, self).setUp() + mock.patch( + 'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id', + new=_get_id).start() + self.neutronclient.update_port_pair_group = mock.Mock( + return_value=None) + self.mocked = self.neutronclient.update_port_pair_group + self.cmd = sfc_port_pair_group.UnsetSfcPortPairGroup( + self.app, self.namespace) + + def test_unset_port_pair(self): + target = self.resource['id'] + port_pair1 = 'additional_port1' + port_pair2 = 'additional_port2' + + def _mock_port_pair(*args, **kwargs): + + if self.neutronclient.find_resource.call_count == 1: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_pair_group') + return {'port_pairs': self.ppg_pp} + + if self.neutronclient.find_resource.call_count == 2: + self.neutronclient.find_resource.assert_called_with( + 'port_pair', port_pair1, cmd_resource='sfc_port_pair') + return {'id': args[1]} + + if self.neutronclient.find_resource.call_count == 3: + self.neutronclient.find_resource.assert_called_with( + 'port_pair', port_pair2, cmd_resource='sfc_port_pair') + return {'id': args[1]} + + if self.neutronclient.find_resource.call_count == 4: + self.neutronclient.find_resource.assert_called_with( + self.res, target, cmd_resource='sfc_port_pair_group') + return {'id': args[1]} + + self.neutronclient.find_resource.side_effect = _mock_port_pair + + arglist = [ + target, + '--port-pair', port_pair1, + '--port-pair', port_pair2, + ] + verifylist = [ + (self.res, target), + ('port_pairs', [port_pair1, port_pair2]) + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + expect = {'port_pairs': sorted([self.ppg_pp])} + self.mocked.assert_called_once_with(target, {self.res: expect}) + self.assertIsNone(result) + + def test_unset_all_port_pair(self): + client = self.app.client_manager.neutronclient + mock_port_pair_group_update = client.update_port_pair_group + arglist = [ + self._port_pair_group_name, + '--all-port-pair', + ] + verifylist = [ + ('port_pair_group', self._port_pair_group_name), + ('all_port_pair', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {'port_pair_group': {'port_pairs': []}} + mock_port_pair_group_update.assert_called_once_with( + self._port_pair_group_name, attrs) + self.assertIsNone(result) diff --git a/neutronclient/v2_0/client.py b/neutronclient/v2_0/client.py index 759f776..96bb01d 100644 --- a/neutronclient/v2_0/client.py +++ b/neutronclient/v2_0/client.py @@ -511,6 +511,16 @@ class Client(ClientBase): security_group_path = "/security-groups/%s" security_group_rules_path = "/security-group-rules" security_group_rule_path = "/security-group-rules/%s" + + sfc_flow_classifiers_path = "/sfc/flow_classifiers" + sfc_flow_classifier_path = "/sfc/flow_classifiers/%s" + sfc_port_pairs_path = "/sfc/port_pairs" + sfc_port_pair_path = "/sfc/port_pairs/%s" + sfc_port_pair_groups_path = "/sfc/port_pair_groups" + sfc_port_pair_group_path = "/sfc/port_pair_groups/%s" + sfc_port_chains_path = "/sfc/port_chains" + sfc_port_chain_path = "/sfc/port_chains/%s" + endpoint_groups_path = "/vpn/endpoint-groups" endpoint_group_path = "/vpn/endpoint-groups/%s" vpnservices_path = "/vpn/vpnservices" @@ -692,6 +702,10 @@ class Client(ClientBase): 'bgpvpns': 'bgpvpn', 'network_associations': 'network_association', 'router_associations': 'router_association', + 'flow_classifiers': 'flow_classifier', + 'port_pairs': 'port_pair', + 'port_pair_groups': 'port_pair_group', + 'port_chains': 'port_chain', } def list_ext(self, collection, path, retrieve_all, **_params): @@ -2159,6 +2173,95 @@ class Client(ClientBase): return self.delete( self.bgpvpn_router_association_path % (bgpvpn, router_assoc)) + def create_port_pair(self, body=None): + """Creates a new Port Pair.""" + return self.post(self.sfc_port_pairs_path, body=body) + + def update_port_pair(self, port_pair, body=None): + """Update a Port Pair.""" + return self.put(self.sfc_port_pair_path % port_pair, body=body) + + def delete_port_pair(self, port_pair): + """Deletes the specified Port Pair.""" + return self.delete(self.sfc_port_pair_path % (port_pair)) + + def list_port_pair(self, retrieve_all=True, **_params): + """Fetches a list of all Port Pairs.""" + return self.list('port_pairs', self.sfc_port_pairs_path, retrieve_all, + **_params) + + def show_port_pair(self, port_pair, **_params): + """Fetches information of a certain Port Pair.""" + return self.get(self.sfc_port_pair_path % (port_pair), params=_params) + + def create_port_pair_group(self, body=None): + """Creates a new Port Pair Group.""" + return self.post(self.sfc_port_pair_groups_path, body=body) + + def update_port_pair_group(self, port_pair_group, body=None): + """Update a Port Pair Group.""" + return self.put(self.sfc_port_pair_group_path % port_pair_group, + body=body) + + def delete_port_pair_group(self, port_pair_group): + """Deletes the specified Port Pair Group.""" + return self.delete(self.sfc_port_pair_group_path % (port_pair_group)) + + def list_port_pair_group(self, retrieve_all=True, **_params): + """Fetches a list of all Port Pair Groups.""" + return self.list('port_pair_groups', self.sfc_port_pair_groups_path, + retrieve_all, **_params) + + def show_port_pair_group(self, port_pair_group, **_params): + """Fetches information of a certain Port Pair Group.""" + return self.get(self.sfc_port_pair_group_path % (port_pair_group), + params=_params) + + def create_port_chain(self, body=None): + """Creates a new Port Chain.""" + return self.post(self.sfc_port_chains_path, body=body) + + def update_port_chain(self, port_chain, body=None): + """Update a Port Chain.""" + return self.put(self.sfc_port_chain_path % port_chain, body=body) + + def delete_port_chain(self, port_chain): + """Deletes the specified Port Chain.""" + return self.delete(self.sfc_port_chain_path % (port_chain)) + + def list_port_chain(self, retrieve_all=True, **_params): + """Fetches a list of all Port Chains.""" + return self.list('port_chains', self.sfc_port_chains_path, + retrieve_all, **_params) + + def show_port_chain(self, port_chain, **_params): + """Fetches information of a certain Port Chain.""" + return self.get(self.sfc_port_chain_path % (port_chain), + params=_params) + + def create_flow_classifier(self, body=None): + """Creates a new Flow Classifier.""" + return self.post(self.sfc_flow_classifiers_path, body=body) + + def update_flow_classifier(self, flow_classifier, body=None): + """Update a Flow Classifier.""" + return self.put(self.sfc_flow_classifier_path % flow_classifier, + body=body) + + def delete_flow_classifier(self, flow_classifier): + """Deletes the specified Flow Classifier.""" + return self.delete(self.sfc_flow_classifier_path % (flow_classifier)) + + def list_flow_classifier(self, retrieve_all=True, **_params): + """Fetches a list of all Flow Classifiers.""" + return self.list('flow_classifiers', self.sfc_flow_classifiers_path, + retrieve_all, **_params) + + def show_flow_classifier(self, flow_classifier, **_params): + """Fetches information of a certain Flow Classifier.""" + return self.get(self.sfc_flow_classifier_path % (flow_classifier), + params=_params) + def __init__(self, **kwargs): """Initialize a new client for the Neutron v2.0 API.""" super(Client, self).__init__(**kwargs) diff --git a/releasenotes/notes/add-sfc-commands.yaml b/releasenotes/notes/add-sfc-commands.yaml new file mode 100644 index 0000000..c081f50 --- /dev/null +++ b/releasenotes/notes/add-sfc-commands.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Add OSC plugin support for the “Networking Service Function Chaining” feature commands along with client bindings. + [Blueprint `openstackclient-cli-porting `_] \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index ac7da5c..d62e29f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,28 @@ openstack.neutronclient.v2 = network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk + sfc_flow_classifier_create = neutronclient.osc.v2.sfc.sfc_flow_classifier:CreateSfcFlowClassifier + sfc_flow_classifier_delete = neutronclient.osc.v2.sfc.sfc_flow_classifier:DeleteSfcFlowClassifier + sfc_flow_classifier_list = neutronclient.osc.v2.sfc.sfc_flow_classifier:ListSfcFlowClassifier + sfc_flow_classifier_set = neutronclient.osc.v2.sfc.sfc_flow_classifier:SetSfcFlowClassifier + sfc_flow_classifier_show = neutronclient.osc.v2.sfc.sfc_flow_classifier:ShowSfcFlowClassifier + sfc_port_chain_create = neutronclient.osc.v2.sfc.sfc_port_chain:CreateSfcPortChain + sfc_port_chain_delete = neutronclient.osc.v2.sfc.sfc_port_chain:DeleteSfcPortChain + sfc_port_chain_list = neutronclient.osc.v2.sfc.sfc_port_chain:ListSfcPortChain + sfc_port_chain_set = neutronclient.osc.v2.sfc.sfc_port_chain:SetSfcPortChain + sfc_port_chain_show = neutronclient.osc.v2.sfc.sfc_port_chain:ShowSfcPortChain + sfc_port_chain_unset = neutronclient.osc.v2.sfc.sfc_port_chain:UnsetSfcPortChain + sfc_port_pair_create = neutronclient.osc.v2.sfc.sfc_port_pair:CreateSfcPortPair + sfc_port_pair_delete = neutronclient.osc.v2.sfc.sfc_port_pair:DeleteSfcPortPair + sfc_port_pair_list = neutronclient.osc.v2.sfc.sfc_port_pair:ListSfcPortPair + sfc_port_pair_set = neutronclient.osc.v2.sfc.sfc_port_pair:SetSfcPortPair + sfc_port_pair_show = neutronclient.osc.v2.sfc.sfc_port_pair:ShowSfcPortPair + sfc_port_pair_group_create = neutronclient.osc.v2.sfc.sfc_port_pair_group:CreateSfcPortPairGroup + sfc_port_pair_group_delete = neutronclient.osc.v2.sfc.sfc_port_pair_group:DeleteSfcPortPairGroup + sfc_port_pair_group_list = neutronclient.osc.v2.sfc.sfc_port_pair_group:ListSfcPortPairGroup + sfc_port_pair_group_set = neutronclient.osc.v2.sfc.sfc_port_pair_group:SetSfcPortPairGroup + sfc_port_pair_group_show = neutronclient.osc.v2.sfc.sfc_port_pair_group:ShowSfcPortPairGroup + sfc_port_pair_group_unset = neutronclient.osc.v2.sfc.sfc_port_pair_group:UnsetSfcPortPairGroup firewall_group_create = neutronclient.osc.v2.fwaas.firewallgroup:CreateFirewallGroup firewall_group_delete = neutronclient.osc.v2.fwaas.firewallgroup:DeleteFirewallGroup