Move network trunk commands from python-neutronclient

Network trunk commands originally were added to python-neutronclient,
although we can long consider these operations core Networking operations.
This patch is not a blind copy but also changes the original code to use
openstacksdk python bindings instead of the binding code in
python-neutronclient as that is already deprecated.

Change-Id: Ic4bc35c296a95d5dae92e9fc1cab3a3fa8f103cd
Related-Bug: #1999774
This commit is contained in:
elajkat 2023-01-06 09:10:00 +01:00
parent a03b2352d9
commit 7f1c21b27a
7 changed files with 1496 additions and 0 deletions

View File

@ -108,6 +108,7 @@
neutron-tag-ports-during-bulk-creation: true
neutron-conntrack-helper: true
neutron-ndp-proxy: true
q-trunk: true
devstack_localrc:
Q_AGENT: openvswitch
Q_ML2_TENANT_NETWORK_TYPE: vxlan

View File

@ -0,0 +1,16 @@
=============
network trunk
=============
A **network trunk** is a container to group logical ports from different
networks and provide a single trunked vNIC for servers. It consists of
one parent port which is a regular VIF and multiple subports which allow
the server to connect to more networks.
Network v2
.. autoprogram-cliff:: openstack.network.v2
:command: network subport list
.. autoprogram-cliff:: openstack.network.v2
:command: network trunk *

View File

@ -0,0 +1,402 @@
# Copyright 2016 ZTE Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Network trunk and subports action implementations"""
import logging
from cliff import columns as cliff_columns
from osc_lib.cli import format_columns
from osc_lib.cli import identity as identity_utils
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from openstackclient.i18n import _
LOG = logging.getLogger(__name__)
TRUNK = 'trunk'
TRUNKS = 'trunks'
SUB_PORTS = 'sub_ports'
class AdminStateColumn(cliff_columns.FormattableColumn):
def human_readable(self):
return 'UP' if self._value else 'DOWN'
class CreateNetworkTrunk(command.ShowOne):
"""Create a network trunk for a given project"""
def get_parser(self, prog_name):
parser = super(CreateNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'name',
metavar='<name>',
help=_("Name of the trunk to create")
)
parser.add_argument(
'--description',
metavar='<description>',
help=_("A description of the trunk")
)
parser.add_argument(
'--parent-port',
metavar='<parent-port>',
required=True,
help=_("Parent port belonging to this trunk (name or ID)")
)
parser.add_argument(
'--subport',
metavar='<port=,segmentation-type=,segmentation-id=>',
action=parseractions.MultiKeyValueAction, dest='add_subports',
optional_keys=['segmentation-id', 'segmentation-type'],
required_keys=['port'],
help=_("Subport to add. Subport is of form "
"\'port=<name or ID>,segmentation-type=<segmentation-type>,"
"segmentation-id=<segmentation-ID>\' (--subport) option "
"can be repeated")
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
default=True,
help=_("Enable trunk (default)")
)
admin_group.add_argument(
'--disable',
action='store_true',
help=_("Disable trunk")
)
identity_utils.add_project_owner_option_to_parser(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
attrs = _get_attrs_for_trunk(self.app.client_manager,
parsed_args)
obj = client.create_trunk(**attrs)
display_columns, columns = _get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns,
formatters=_formatters)
return display_columns, data
class DeleteNetworkTrunk(command.Command):
"""Delete a given network trunk"""
def get_parser(self, prog_name):
parser = super(DeleteNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
nargs="+",
help=_("Trunk(s) to delete (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
result = 0
for trunk in parsed_args.trunk:
try:
trunk_id = client.find_trunk(trunk).id
client.delete_trunk(trunk_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete trunk with name "
"or ID '%(trunk)s': %(e)s"),
{'trunk': trunk, 'e': e})
if result > 0:
total = len(parsed_args.trunk)
msg = (_("%(result)s of %(total)s trunks failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListNetworkTrunk(command.Lister):
"""List all network trunks"""
def get_parser(self, prog_name):
parser = super(ListNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
default=False,
help=_("List additional fields in output")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
data = client.trunks()
headers = (
'ID',
'Name',
'Parent Port',
'Description'
)
columns = (
'id',
'name',
'port_id',
'description'
)
if parsed_args.long:
headers += (
'Status',
'State',
'Created At',
'Updated At',
)
columns += (
'status',
'admin_state_up',
'created_at',
'updated_at'
)
return (headers,
(osc_utils.get_item_properties(
s, columns,
formatters=_formatters,
) for s in data))
class SetNetworkTrunk(command.Command):
"""Set network trunk properties"""
def get_parser(self, prog_name):
parser = super(SetNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Trunk to modify (name or ID)")
)
parser.add_argument(
'--name',
metavar="<name>",
help=_("Set trunk name")
)
parser.add_argument(
'--description',
metavar='<description>',
help=_("A description of the trunk")
)
parser.add_argument(
'--subport',
metavar='<port=,segmentation-type=,segmentation-id=>',
action=parseractions.MultiKeyValueAction, dest='set_subports',
optional_keys=['segmentation-id', 'segmentation-type'],
required_keys=['port'],
help=_("Subport to add. Subport is of form "
"\'port=<name or ID>,segmentation-type=<segmentation-type>"
",segmentation-id=<segmentation-ID>\' (--subport) option "
"can be repeated")
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
help=_("Enable trunk")
)
admin_group.add_argument(
'--disable',
action='store_true',
help=_("Disable trunk")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
trunk_id = client.find_trunk(parsed_args.trunk)
attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args)
try:
client.update_trunk(trunk_id, **attrs)
except Exception as e:
msg = (_("Failed to set trunk '%(t)s': %(e)s")
% {'t': parsed_args.trunk, 'e': e})
raise exceptions.CommandError(msg)
if parsed_args.set_subports:
subport_attrs = _get_attrs_for_subports(self.app.client_manager,
parsed_args)
try:
client.add_trunk_subports(trunk_id, subport_attrs)
except Exception as e:
msg = (_("Failed to add subports to trunk '%(t)s': %(e)s")
% {'t': parsed_args.trunk, 'e': e})
raise exceptions.CommandError(msg)
class ShowNetworkTrunk(command.ShowOne):
"""Show information of a given network trunk"""
def get_parser(self, prog_name):
parser = super(ShowNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Trunk to display (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
trunk_id = client.find_trunk(parsed_args.trunk).id
obj = client.get_trunk(trunk_id)
display_columns, columns = _get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns,
formatters=_formatters)
return display_columns, data
class ListNetworkSubport(command.Lister):
"""List all subports for a given network trunk"""
def get_parser(self, prog_name):
parser = super(ListNetworkSubport, self).get_parser(prog_name)
parser.add_argument(
'--trunk',
required=True,
metavar="<trunk>",
help=_("List subports belonging to this trunk (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
trunk_id = client.find_trunk(parsed_args.trunk)
data = client.get_trunk_subports(trunk_id)
headers = ('Port', 'Segmentation Type', 'Segmentation ID')
columns = ('port_id', 'segmentation_type', 'segmentation_id')
return (headers,
(osc_utils.get_dict_properties(
s, columns,
) for s in data[SUB_PORTS]))
class UnsetNetworkTrunk(command.Command):
"""Unset subports from a given network trunk"""
def get_parser(self, prog_name):
parser = super(UnsetNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Unset subports from this trunk (name or ID)")
)
parser.add_argument(
'--subport',
metavar="<subport>",
required=True,
action='append', dest='unset_subports',
help=_("Subport to delete (name or ID of the port) "
"(--subport) option can be repeated")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args)
trunk_id = client.find_trunk(parsed_args.trunk)
client.delete_trunk_subports(trunk_id, attrs)
_formatters = {
'admin_state_up': AdminStateColumn,
'sub_ports': format_columns.ListDictColumn,
}
def _get_columns(item):
column_map = {}
hidden_columns = ['location', 'tenant_id']
return osc_utils.get_osc_show_columns_for_sdk_resource(
item,
column_map,
hidden_columns
)
def _get_attrs_for_trunk(client_manager, parsed_args):
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if parsed_args.enable:
attrs['admin_state_up'] = True
if parsed_args.disable:
attrs['admin_state_up'] = False
if 'parent_port' in parsed_args and parsed_args.parent_port is not None:
port_id = client_manager.network.find_port(
parsed_args.parent_port)['id']
attrs['port_id'] = port_id
if 'add_subports' in parsed_args and parsed_args.add_subports is not None:
attrs[SUB_PORTS] = _format_subports(client_manager,
parsed_args.add_subports)
# "trunk set" command doesn't support setting project.
if 'project' in parsed_args and parsed_args.project is not None:
identity_client = client_manager.identity
project_id = identity_utils.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain,
).id
attrs['tenant_id'] = project_id
return attrs
def _format_subports(client_manager, subports):
attrs = []
for subport in subports:
subport_attrs = {}
if subport.get('port'):
port_id = client_manager.network.find_port(subport['port'])['id']
subport_attrs['port_id'] = port_id
if subport.get('segmentation-id'):
try:
subport_attrs['segmentation_id'] = int(
subport['segmentation-id'])
except ValueError:
msg = (_("Segmentation-id '%s' is not an integer") %
subport['segmentation-id'])
raise exceptions.CommandError(msg)
if subport.get('segmentation-type'):
subport_attrs['segmentation_type'] = subport['segmentation-type']
attrs.append(subport_attrs)
return attrs
def _get_attrs_for_subports(client_manager, parsed_args):
attrs = {}
if 'set_subports' in parsed_args and parsed_args.set_subports is not None:
attrs = _format_subports(client_manager,
parsed_args.set_subports)
if ('unset_subports' in parsed_args and
parsed_args.unset_subports is not None):
subports_list = []
for subport in parsed_args.unset_subports:
port_id = client_manager.network.find_port(subport)['id']
subports_list.append({'port_id': port_id})
attrs = subports_list
return attrs
def _get_id(client, id_or_name, resource):
return client.find_resource(resource, str(id_or_name))['id']

View File

@ -0,0 +1,149 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import uuid
from openstackclient.tests.functional.network.v2 import common
class NetworkTrunkTests(common.NetworkTests):
"""Functional tests for Network Trunks"""
def setUp(self):
super().setUp()
# Nothing in this class works with Nova Network
if not self.haz_network:
self.skipTest("No Network service present")
network_name = uuid.uuid4().hex
subnet_name = uuid.uuid4().hex
self.parent_port_name = uuid.uuid4().hex
self.sub_port_name = uuid.uuid4().hex
self.openstack('network create %s' % network_name)
self.addCleanup(self.openstack, 'network delete %s' % network_name)
self.openstack(
'subnet create %s '
'--network %s --subnet-range 10.0.0.0/24' % (
subnet_name, network_name))
self.openstack('port create %s --network %s' %
(self.parent_port_name, network_name))
self.addCleanup(self.openstack, 'port delete %s' %
self.parent_port_name)
json_out = self.openstack('port create %s --network %s -f json' %
(self.sub_port_name, network_name))
self.sub_port_id = json.loads(json_out)['id']
self.addCleanup(self.openstack, 'port delete %s' % self.sub_port_name)
def test_network_trunk_create_delete(self):
trunk_name = uuid.uuid4().hex
self.openstack('network trunk create %s --parent-port %s -f json ' %
(trunk_name, self.parent_port_name))
raw_output = self.openstack(
'network trunk delete ' +
trunk_name
)
self.assertEqual('', raw_output)
def test_network_trunk_list(self):
trunk_name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'network trunk create %s --parent-port %s -f json ' %
(trunk_name, self.parent_port_name)))
self.addCleanup(self.openstack,
'network trunk delete ' + trunk_name)
self.assertEqual(trunk_name, json_output['name'])
json_output = json.loads(self.openstack(
'network trunk list -f json'
))
self.assertIn(trunk_name, [tr['Name'] for tr in json_output])
def test_network_trunk_set_unset(self):
trunk_name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'network trunk create %s --parent-port %s -f json ' %
(trunk_name, self.parent_port_name)))
self.addCleanup(self.openstack,
'network trunk delete ' + trunk_name)
self.assertEqual(trunk_name, json_output['name'])
self.openstack(
'network trunk set '
'--enable ' +
trunk_name
)
json_output = json.loads(self.openstack(
'network trunk show -f json ' +
trunk_name
))
self.assertTrue(json_output['is_admin_state_up'])
# Add subport to trunk
self.openstack(
'network trunk set ' +
'--subport port=%s,segmentation-type=vlan,segmentation-id=42 ' %
(self.sub_port_name) +
trunk_name
)
json_output = json.loads(self.openstack(
'network trunk show -f json ' +
trunk_name
))
self.assertEqual(
[{
'port_id': self.sub_port_id,
'segmentation_id': 42,
'segmentation_type': 'vlan'
}],
json_output['sub_ports'])
# Remove subport from trunk
self.openstack(
'network trunk unset ' +
trunk_name +
' --subport ' +
self.sub_port_name
)
json_output = json.loads(self.openstack(
'network trunk show -f json ' +
trunk_name
))
self.assertEqual(
[],
json_output['sub_ports'])
def test_network_trunk_list_subports(self):
trunk_name = uuid.uuid4().hex
json_output = json.loads(self.openstack(
'network trunk create %s --parent-port %s '
'--subport port=%s,segmentation-type=vlan,segmentation-id=42 '
'-f json ' %
(trunk_name, self.parent_port_name, self.sub_port_name)))
self.addCleanup(self.openstack,
'network trunk delete ' + trunk_name)
self.assertEqual(trunk_name, json_output['name'])
json_output = json.loads(self.openstack(
'network subport list --trunk %s -f json' % trunk_name))
self.assertEqual(
[{
'Port': self.sub_port_id,
'Segmentation ID': 42,
'Segmentation Type': 'vlan'
}],
json_output)

View File

@ -34,6 +34,7 @@ from openstack.network.v2 import port as _port
from openstack.network.v2 import rbac_policy as network_rbac
from openstack.network.v2 import segment as _segment
from openstack.network.v2 import service_profile as _flavor_profile
from openstack.network.v2 import trunk as _trunk
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
@ -2152,3 +2153,71 @@ def get_ndp_proxies(ndp_proxies=None, count=2):
create_ndp_proxies(count)
)
return mock.Mock(side_effect=ndp_proxies)
def create_one_trunk(attrs=None):
"""Create a fake trunk.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object with name, id, etc.
"""
attrs = attrs or {}
# Set default attributes.
trunk_attrs = {
'id': 'trunk-id-' + uuid.uuid4().hex,
'name': 'trunk-name-' + uuid.uuid4().hex,
'description': '',
'port_id': 'port-' + uuid.uuid4().hex,
'admin_state_up': True,
'project_id': 'project-id-' + uuid.uuid4().hex,
'status': 'ACTIVE',
'sub_ports': [{'port_id': 'subport-' +
uuid.uuid4().hex,
'segmentation_type': 'vlan',
'segmentation_id': 100}],
}
# Overwrite default attributes.
trunk_attrs.update(attrs)
trunk = _trunk.Trunk(**trunk_attrs)
return trunk
def create_trunks(attrs=None, count=2):
"""Create multiple fake trunks.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of trunks to fake
:return:
A list of FakeResource objects faking the trunks
"""
trunks = []
for i in range(0, count):
trunks.append(create_one_trunk(attrs))
return trunks
def get_trunks(trunks=None, count=2):
"""Get an iterable Mock object with a list of faked trunks.
If trunk list is provided, then initialize the Mock object
with the list. Otherwise create one.
:param List trunks:
A list of FakeResource objects faking trunks
:param int count:
The number of trunks to fake
:return:
An iterable Mock object with side_effect set to a list of faked
trunks
"""
if trunks is None:
trunks = create_trunks(count)
return mock.Mock(side_effect=trunks)

View File

@ -0,0 +1,851 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import argparse
import copy
from unittest import mock
from unittest.mock import call
from osc_lib.cli import format_columns
from osc_lib import exceptions
import testtools
from openstackclient.network.v2 import network_trunk
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes_v3
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
from openstackclient.tests.unit import utils as tests_utils
# Tests for Neutron trunks
#
class TestNetworkTrunk(network_fakes.TestNetworkV2):
def setUp(self):
super().setUp()
# Get a shortcut to the network client
self.network = self.app.client_manager.network
# Get a shortcut to the ProjectManager Mock
self.projects_mock = self.app.client_manager.identity.projects
# Get a shortcut to the DomainManager Mock
self.domains_mock = self.app.client_manager.identity.domains
class TestCreateNetworkTrunk(TestNetworkTrunk):
project = identity_fakes_v3.FakeProject.create_one_project()
domain = identity_fakes_v3.FakeDomain.create_one_domain()
trunk_networks = network_fakes.create_networks(count=2)
parent_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[0]['id']})
sub_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[1]['id']})
new_trunk = network_fakes.create_one_trunk(
attrs={'project_id': project.id,
'port_id': parent_port['id'],
'sub_ports': {
'port_id': sub_port['id'],
'segmentation_id': 42,
'segmentation_type': 'vlan'}
})
columns = (
'description',
'id',
'is_admin_state_up',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
'tags'
)
data = (
new_trunk.description,
new_trunk.id,
new_trunk.is_admin_state_up,
new_trunk.name,
new_trunk.port_id,
new_trunk.project_id,
new_trunk.status,
format_columns.ListDictColumn(new_trunk.sub_ports),
[],
)
def setUp(self):
super().setUp()
self.network.create_trunk = mock.Mock(return_value=self.new_trunk)
self.network.find_port = mock.Mock(
side_effect=[self.parent_port, self.sub_port])
# Get the command object to test
self.cmd = network_trunk.CreateNetworkTrunk(self.app, self.namespace)
self.projects_mock.get.return_value = self.project
self.domains_mock.get.return_value = self.domain
def test_create_no_options(self):
arglist = []
verifylist = []
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_create_default_options(self):
arglist = [
"--parent-port", self.new_trunk['port_id'],
self.new_trunk['name'],
]
verifylist = [
('parent_port', self.new_trunk['port_id']),
('name', self.new_trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.network.create_trunk.assert_called_once_with(**{
'name': self.new_trunk['name'],
'admin_state_up': self.new_trunk['admin_state_up'],
'port_id': self.new_trunk['port_id'],
})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_create_full_options(self):
self.new_trunk['description'] = 'foo description'
subport = self.new_trunk.sub_ports[0]
arglist = [
"--disable",
"--description", self.new_trunk.description,
"--parent-port", self.new_trunk.port_id,
"--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self.new_trunk.name,
]
verifylist = [
('name', self.new_trunk.name),
('description', self.new_trunk.description),
('parent_port', self.new_trunk.port_id),
('add_subports', [{
'port': subport['port_id'],
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
('disable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.network.create_trunk.assert_called_once_with(**{
'name': self.new_trunk.name,
'description': self.new_trunk.description,
'admin_state_up': False,
'port_id': self.new_trunk.port_id,
'sub_ports': [subport],
})
self.assertEqual(self.columns, columns)
data_with_desc = list(self.data)
data_with_desc[0] = self.new_trunk['description']
data_with_desc = tuple(data_with_desc)
self.assertEqual(data_with_desc, data)
def test_create_trunk_with_subport_invalid_segmentation_id_fail(self):
subport = self.new_trunk.sub_ports[0]
arglist = [
"--parent-port", self.new_trunk.port_id,
"--subport", "port=%(port)s,segmentation-type=%(seg_type)s,"
"segmentation-id=boom" % {
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self.new_trunk.name,
]
verifylist = [
('name', self.new_trunk.name),
('parent_port', self.new_trunk.port_id),
('add_subports', [{
'port': subport['port_id'],
'segmentation-id': 'boom',
'segmentation-type': subport['segmentation_type']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual("Segmentation-id 'boom' is not an integer",
str(e))
def test_create_network_trunk_subports_without_optional_keys(self):
subport = copy.copy(self.new_trunk.sub_ports[0])
# Pop out the segmentation-id and segmentation-type
subport.pop('segmentation_type')
subport.pop('segmentation_id')
arglist = [
'--parent-port', self.new_trunk.port_id,
'--subport', 'port=%(port)s' % {'port': subport['port_id']},
self.new_trunk.name,
]
verifylist = [
('name', self.new_trunk.name),
('parent_port', self.new_trunk.port_id),
('add_subports', [{
'port': subport['port_id']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.network.create_trunk.assert_called_once_with(**{
'name': self.new_trunk.name,
'admin_state_up': True,
'port_id': self.new_trunk.port_id,
'sub_ports': [subport],
})
self.assertEqual(self.columns, columns)
data_with_desc = list(self.data)
data_with_desc[0] = self.new_trunk['description']
data_with_desc = tuple(data_with_desc)
self.assertEqual(data_with_desc, data)
def test_create_network_trunk_subports_without_required_key_fail(self):
subport = self.new_trunk.sub_ports[0]
arglist = [
'--parent-port', self.new_trunk.port_id,
'--subport', 'segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type']},
self.new_trunk.name,
]
verifylist = [
('name', self.new_trunk.name),
('parent_port', self.new_trunk.port_id),
('add_subports', [{
'segmentation_id': str(subport['segmentation_id']),
'segmentation_type': subport['segmentation_type']}]),
]
with testtools.ExpectedException(argparse.ArgumentTypeError):
self.check_parser(self.cmd, arglist, verifylist)
class TestDeleteNetworkTrunk(TestNetworkTrunk):
# The trunk to be deleted.
project = identity_fakes_v3.FakeProject.create_one_project()
domain = identity_fakes_v3.FakeDomain.create_one_domain()
trunk_networks = network_fakes.create_networks(count=2)
parent_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[0]['id']})
sub_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[1]['id']})
new_trunks = network_fakes.create_trunks(
attrs={'project_id': project.id,
'port_id': parent_port['id'],
'sub_ports': {
'port_id': sub_port['id'],
'segmentation_id': 42,
'segmentation_type': 'vlan'}
})
def setUp(self):
super().setUp()
self.network.find_trunk = mock.Mock(
side_effect=[self.new_trunks[0], self.new_trunks[1]])
self.network.delete_trunk = mock.Mock(return_value=None)
self.network.find_port = mock.Mock(
side_effect=[self.parent_port, self.sub_port])
self.projects_mock.get.return_value = self.project
self.domains_mock.get.return_value = self.domain
# Get the command object to test
self.cmd = network_trunk.DeleteNetworkTrunk(self.app, self.namespace)
def test_delete_trunkx(self):
arglist = [
self.new_trunks[0].name,
]
verifylist = [
('trunk', [self.new_trunks[0].name]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_trunk.assert_called_once_with(
self.new_trunks[0].id)
self.assertIsNone(result)
def test_delete_trunk_multiple(self):
arglist = []
verifylist = []
for t in self.new_trunks:
arglist.append(t['name'])
verifylist = [
('trunk', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for t in self.new_trunks:
calls.append(call(t.id))
self.network.delete_trunk.assert_has_calls(calls)
self.assertIsNone(result)
def test_delete_trunk_multiple_with_exception(self):
arglist = [
self.new_trunks[0].name,
'unexist_trunk',
]
verifylist = [
('trunk',
[self.new_trunks[0].name, 'unexist_trunk']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.network.find_trunk = mock.Mock(
side_effect=[self.new_trunks[0], exceptions.CommandError])
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual('1 of 2 trunks failed to delete.', str(e))
self.network.delete_trunk.assert_called_once_with(
self.new_trunks[0].id
)
class TestShowNetworkTrunk(TestNetworkTrunk):
project = identity_fakes_v3.FakeProject.create_one_project()
domain = identity_fakes_v3.FakeDomain.create_one_domain()
# The trunk to set.
new_trunk = network_fakes.create_one_trunk()
columns = (
'description',
'id',
'is_admin_state_up',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
'tags'
)
data = (
new_trunk.description,
new_trunk.id,
new_trunk.is_admin_state_up,
new_trunk.name,
new_trunk.port_id,
new_trunk.project_id,
new_trunk.status,
format_columns.ListDictColumn(new_trunk.sub_ports),
[],
)
def setUp(self):
super().setUp()
self.network.find_trunk = mock.Mock(return_value=self.new_trunk)
self.network.get_trunk = mock.Mock(return_value=self.new_trunk)
self.projects_mock.get.return_value = self.project
self.domains_mock.get.return_value = self.domain
# Get the command object to test
self.cmd = network_trunk.ShowNetworkTrunk(self.app, self.namespace)
def test_show_no_options(self):
arglist = []
verifylist = []
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_show_all_options(self):
arglist = [
self.new_trunk.id,
]
verifylist = [
('trunk', self.new_trunk.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.get_trunk.assert_called_once_with(self.new_trunk.id)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
class TestListNetworkTrunk(TestNetworkTrunk):
project = identity_fakes_v3.FakeProject.create_one_project()
domain = identity_fakes_v3.FakeDomain.create_one_domain()
# Create trunks to be listed.
new_trunks = network_fakes.create_trunks(
{'created_at': '2001-01-01 00:00:00',
'updated_at': '2001-01-01 00:00:00'}, count=3)
columns = (
'ID',
'Name',
'Parent Port',
'Description'
)
columns_long = columns + (
'Status',
'State',
'Created At',
'Updated At'
)
data = []
for t in new_trunks:
data.append((
t['id'],
t['name'],
t['port_id'],
t['description']
))
data_long = []
for t in new_trunks:
data_long.append((
t['id'],
t['name'],
t['port_id'],
t['description'],
t['status'],
network_trunk.AdminStateColumn(''),
'2001-01-01 00:00:00',
'2001-01-01 00:00:00',
))
def setUp(self):
super().setUp()
self.network.trunks = mock.Mock(return_value=self.new_trunks)
self.projects_mock.get.return_value = self.project
self.domains_mock.get.return_value = self.domain
# Get the command object to test
self.cmd = network_trunk.ListNetworkTrunk(self.app, self.namespace)
def test_trunk_list_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.trunks.assert_called_once_with()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
def test_trunk_list_long(self):
arglist = [
'--long',
]
verifylist = [
('long', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.trunks.assert_called_once_with()
self.assertEqual(self.columns_long, columns)
self.assertEqual(self.data_long, list(data))
class TestSetNetworkTrunk(TestNetworkTrunk):
project = identity_fakes_v3.FakeProject.create_one_project()
domain = identity_fakes_v3.FakeDomain.create_one_domain()
trunk_networks = network_fakes.create_networks(count=2)
parent_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[0]['id']})
sub_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[1]['id']})
# Create trunks to be listed.
_trunk = network_fakes.create_one_trunk(
attrs={'project_id': project.id,
'port_id': parent_port['id'],
'sub_ports': {
'port_id': sub_port['id'],
'segmentation_id': 42,
'segmentation_type': 'vlan'}
})
columns = (
'admin_state_up',
'id',
'name',
'description',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
_trunk.id,
_trunk.name,
_trunk.description,
_trunk.port_id,
_trunk.project_id,
_trunk.status,
format_columns.ListDictColumn(_trunk.sub_ports),
)
def setUp(self):
super().setUp()
self.network.update_trunk = mock.Mock(return_value=self._trunk)
self.network.add_trunk_subports = mock.Mock(return_value=self._trunk)
self.network.find_trunk = mock.Mock(return_value=self._trunk)
self.network.find_port = mock.Mock(
side_effect=[self.sub_port, self.sub_port])
self.projects_mock.get.return_value = self.project
self.domains_mock.get.return_value = self.domain
# Get the command object to test
self.cmd = network_trunk.SetNetworkTrunk(self.app, self.namespace)
def _test_set_network_trunk_attr(self, attr, value):
arglist = [
'--%s' % attr, value,
self._trunk[attr],
]
verifylist = [
(attr, value),
('trunk', self._trunk[attr]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
attr: value,
}
self.network.update_trunk.assert_called_once_with(
self._trunk, **attrs)
self.assertIsNone(result)
def test_set_network_trunk_name(self):
self._test_set_network_trunk_attr('name', 'trunky')
def test_set_network_trunk_description(self):
self._test_set_network_trunk_attr('description', 'description')
def test_set_network_trunk_admin_state_up_disable(self):
arglist = [
'--disable',
self._trunk['name'],
]
verifylist = [
('disable', True),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'admin_state_up': False,
}
self.network.update_trunk.assert_called_once_with(
self._trunk, **attrs)
self.assertIsNone(result)
def test_set_network_trunk_admin_state_up_enable(self):
arglist = [
'--enable',
self._trunk['name'],
]
verifylist = [
('enable', True),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'admin_state_up': True,
}
self.network.update_trunk.assert_called_once_with(
self._trunk, **attrs)
self.assertIsNone(result)
def test_set_network_trunk_nothing(self):
arglist = [self._trunk['name'], ]
verifylist = [('trunk', self._trunk['name']), ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {}
self.network.update_trunk.assert_called_once_with(
self._trunk, **attrs)
self.assertIsNone(result)
def test_set_network_trunk_subports(self):
subport = self._trunk['sub_ports'][0]
arglist = [
'--subport', 'port=%(port)s,segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'port': subport['port_id'],
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.add_trunk_subports.assert_called_once_with(
self._trunk, [subport])
self.assertIsNone(result)
def test_set_network_trunk_subports_without_optional_keys(self):
subport = copy.copy(self._trunk['sub_ports'][0])
# Pop out the segmentation-id and segmentation-type
subport.pop('segmentation_type')
subport.pop('segmentation_id')
arglist = [
'--subport', 'port=%(port)s' % {'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'port': subport['port_id']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.add_trunk_subports.assert_called_once_with(
self._trunk, [subport])
self.assertIsNone(result)
def test_set_network_trunk_subports_without_required_key_fail(self):
subport = self._trunk['sub_ports'][0]
arglist = [
'--subport', 'segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
]
with testtools.ExpectedException(argparse.ArgumentTypeError):
self.check_parser(self.cmd, arglist, verifylist)
self.network.add_trunk_subports.assert_not_called()
def test_set_trunk_attrs_with_exception(self):
arglist = [
'--name', 'reallylongname',
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('name', 'reallylongname'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.network.update_trunk = (
mock.Mock(side_effect=exceptions.CommandError)
)
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual(
"Failed to set trunk '%s': " % self._trunk['name'],
str(e))
attrs = {'name': 'reallylongname'}
self.network.update_trunk.assert_called_once_with(
self._trunk, **attrs)
self.network.add_trunk_subports.assert_not_called()
def test_set_trunk_add_subport_with_exception(self):
arglist = [
'--subport', 'port=invalid_subport',
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{'port': 'invalid_subport'}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.network.add_trunk_subports = (
mock.Mock(side_effect=exceptions.CommandError)
)
self.network.find_port = (mock.Mock(
return_value={'id': 'invalid_subport'}))
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual(
"Failed to add subports to trunk '%s': " % self._trunk['name'],
str(e))
self.network.update_trunk.assert_called_once_with(
self._trunk)
self.network.add_trunk_subports.assert_called_once_with(
self._trunk, [{'port_id': 'invalid_subport'}])
class TestListNetworkSubport(TestNetworkTrunk):
_trunk = network_fakes.create_one_trunk()
_subports = _trunk['sub_ports']
columns = (
'Port',
'Segmentation Type',
'Segmentation ID',
)
data = []
for s in _subports:
data.append((
s['port_id'],
s['segmentation_type'],
s['segmentation_id'],
))
def setUp(self):
super().setUp()
self.network.find_trunk = mock.Mock(return_value=self._trunk)
self.network.get_trunk_subports = mock.Mock(
return_value={network_trunk.SUB_PORTS: self._subports})
# Get the command object to test
self.cmd = network_trunk.ListNetworkSubport(self.app, self.namespace)
def test_subport_list(self):
arglist = [
'--trunk', self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.get_trunk_subports.assert_called_once_with(self._trunk)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
class TestUnsetNetworkTrunk(TestNetworkTrunk):
project = identity_fakes_v3.FakeProject.create_one_project()
domain = identity_fakes_v3.FakeDomain.create_one_domain()
trunk_networks = network_fakes.create_networks(count=2)
parent_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[0]['id']})
sub_port = network_fakes.create_one_port(
attrs={'project_id': project.id,
'network_id': trunk_networks[1]['id']})
_trunk = network_fakes.create_one_trunk(
attrs={'project_id': project.id,
'port_id': parent_port['id'],
'sub_ports': {
'port_id': sub_port['id'],
'segmentation_id': 42,
'segmentation_type': 'vlan'}
})
columns = (
'admin_state_up',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
network_trunk.AdminStateColumn(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
format_columns.ListDictColumn(_trunk['sub_ports']),
)
def setUp(self):
super().setUp()
self.network.find_trunk = mock.Mock(return_value=self._trunk)
self.network.find_port = mock.Mock(
side_effect=[self.sub_port, self.sub_port])
self.network.delete_trunk_subports = mock.Mock(return_value=None)
# Get the command object to test
self.cmd = network_trunk.UnsetNetworkTrunk(self.app, self.namespace)
def test_unset_network_trunk_subport(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--subport", subport['port_id'],
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('unset_subports', [subport['port_id']]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_trunk_subports.assert_called_once_with(
self._trunk,
[{'port_id': subport['port_id']}]
)
self.assertIsNone(result)
def test_unset_subport_no_arguments_fail(self):
arglist = [
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)

View File

@ -513,6 +513,14 @@ openstack.network.v2 =
network_service_provider_list = openstackclient.network.v2.network_service_provider:ListNetworkServiceProvider
network_subport_list = openstackclient.network.v2.network_trunk:ListNetworkSubport
network_trunk_create = openstackclient.network.v2.network_trunk:CreateNetworkTrunk
network_trunk_delete = openstackclient.network.v2.network_trunk:DeleteNetworkTrunk
network_trunk_list = openstackclient.network.v2.network_trunk:ListNetworkTrunk
network_trunk_set = openstackclient.network.v2.network_trunk:SetNetworkTrunk
network_trunk_show = openstackclient.network.v2.network_trunk:ShowNetworkTrunk
network_trunk_unset = openstackclient.network.v2.network_trunk:UnsetNetworkTrunk
port_create = openstackclient.network.v2.port:CreatePort
port_delete = openstackclient.network.v2.port:DeletePort
port_list = openstackclient.network.v2.port:ListPort