use plugin common utils from neutron-lib

A bulk of the public APIs that are part of neutron.plugins.common.utils
were rehomed into neutron-lib with [1] and the remaining with [2].

This patch consumes [1] by:
- Removing the rehomed code from neutron.
- Removing the UTs that are no longer applicable.
- Leaving the functions in [2][3] in neutron.plugins.common.utils until
we release [2][3] and can consume it at which point we should be able to
remove the utils module.

NeutronLibImpact

[1] Iabb155b5d2d0ec6104ebee5dd42cf292bdf3ec61
[2] I2c0e4ef03425ba0bb2651ae3e68d6c8cde7b8f90
[3] I73f5e8ad7a1a83392094db846d18964d811b8bb2

Change-Id: I1d63cbea463e92e1d2e053f8e1a564ed52cb84f8
This commit is contained in:
Boden R 2018-03-02 13:07:51 -07:00
parent c16d15fff2
commit 410a83c89d
14 changed files with 45 additions and 513 deletions

View File

@ -30,6 +30,7 @@ from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from neutron_lib.plugins import utils as plugin_utils
from neutron_lib.services import base as base_services
from oslo_log import log as logging
from oslo_utils import uuidutils
@ -381,8 +382,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
if not gw_port['fixed_ips']:
LOG.debug('No IPs available for external network %s',
network_id)
with p_utils.delete_port_on_error(self._core_plugin,
context.elevated(), gw_port['id']):
with plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(), gw_port['id']):
with context.session.begin(subtransactions=True):
router.gw_port = self._core_plugin._get_port(
context.elevated(), gw_port['id'])
@ -849,7 +850,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
port = self._check_router_port(context, port_id, '')
revert_value = {'device_id': '',
'device_owner': port['device_owner']}
with p_utils.update_port_on_error(
with plugin_utils.update_port_on_error(
self._core_plugin, context, port_id, revert_value):
port, subnets = self._add_interface_by_port(
context, router, port_id, device_owner)
@ -863,10 +864,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'device_owner': port['device_owner']}
if cleanup_port:
mgr = p_utils.delete_port_on_error(
mgr = plugin_utils.delete_port_on_error(
self._core_plugin, context, port['id'])
else:
mgr = p_utils.update_port_on_error(
mgr = plugin_utils.update_port_on_error(
self._core_plugin, context, port['id'], revert_value)
if new_router_intf:
@ -1311,9 +1312,9 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
{'port': port},
check_allow_post=False)
with p_utils.delete_port_on_error(self._core_plugin,
context.elevated(),
external_port['id']),\
with plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(),
external_port['id']),\
context.session.begin(subtransactions=True):
# Ensure IPv4 addresses are allocated on external port
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)

View File

@ -26,6 +26,7 @@ from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from neutron_lib.plugins import utils as plugin_utils
from oslo_config import cfg
from oslo_log import helpers as log_helper
from oslo_log import log as logging
@ -212,7 +213,7 @@ class DVRResourceOperationHandler(object):
msg = _("Unable to create the SNAT Interface Port")
raise n_exc.BadRequest(resource='router', msg=msg)
with p_utils.delete_port_on_error(
with plugin_utils.delete_port_on_error(
self.l3plugin._core_plugin, context.elevated(), snat_port['id']):
l3_obj.RouterPort(
context,

View File

@ -14,141 +14,18 @@
"""
Common utilities and helper functions for OpenStack Networking Plugins.
These utils are private and for neutron internal use only.
"""
import collections
import contextlib
import hashlib
from neutron_lib.api import attributes as lib_attrs
from neutron_lib.api.definitions import network as net_def
from neutron_lib.api.definitions import port as port_def
from neutron_lib.api.definitions import subnet as subnet_def
from neutron_lib import constants as n_const
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
import webob.exc
from neutron._i18n import _
INTERFACE_HASH_LEN = 6
LOG = logging.getLogger(__name__)
def get_deployment_physnet_mtu():
"""Retrieves global physical network MTU setting.
Plugins should use this function to retrieve the MTU set by the operator
that is equal to or less than the MTU of their nodes' physical interfaces.
Note that it is the responsibility of the plugin to deduct the value of
any encapsulation overhead required before advertising it to VMs.
"""
return cfg.CONF.global_physnet_mtu
def is_valid_vlan_tag(vlan):
return n_const.MIN_VLAN_TAG <= vlan <= n_const.MAX_VLAN_TAG
def is_valid_gre_id(gre_id):
return n_const.MIN_GRE_ID <= gre_id <= n_const.MAX_GRE_ID
def is_valid_vxlan_vni(vni):
return n_const.MIN_VXLAN_VNI <= vni <= n_const.MAX_VXLAN_VNI
def is_valid_geneve_vni(vni):
return n_const.MIN_GENEVE_VNI <= vni <= n_const.MAX_GENEVE_VNI
def verify_tunnel_range(tunnel_range, tunnel_type):
"""Raise an exception for invalid tunnel range or malformed range."""
mappings = {n_const.TYPE_GRE: is_valid_gre_id,
n_const.TYPE_VXLAN: is_valid_vxlan_vni,
n_const.TYPE_GENEVE: is_valid_geneve_vni}
if tunnel_type in mappings:
for ident in tunnel_range:
if not mappings[tunnel_type](ident):
raise exceptions.NetworkTunnelRangeError(
tunnel_range=tunnel_range,
error=_("%(id)s is not a valid %(type)s identifier") %
{'id': ident, 'type': tunnel_type})
if tunnel_range[1] < tunnel_range[0]:
raise exceptions.NetworkTunnelRangeError(
tunnel_range=tunnel_range,
error=_("End of tunnel range is less "
"than start of tunnel range"))
def raise_invalid_tag(vlan_str, vlan_range):
"""Raise an exception for invalid tag."""
raise exceptions.NetworkVlanRangeError(
vlan_range=vlan_range,
error=_("%s is not a valid VLAN tag") % vlan_str)
def verify_vlan_range(vlan_range):
"""Raise an exception for invalid tags or malformed range."""
for vlan_tag in vlan_range:
if not is_valid_vlan_tag(vlan_tag):
raise_invalid_tag(str(vlan_tag), vlan_range)
if vlan_range[1] < vlan_range[0]:
raise exceptions.NetworkVlanRangeError(
vlan_range=vlan_range,
error=_("End of VLAN range is less than start of VLAN range"))
def parse_network_vlan_range(network_vlan_range):
"""Interpret a string as network[:vlan_begin:vlan_end]."""
entry = network_vlan_range.strip()
if ':' in entry:
if entry.count(':') != 2:
raise exceptions.NetworkVlanRangeError(
vlan_range=entry,
error=_("Need exactly two values for VLAN range"))
network, vlan_min, vlan_max = entry.split(':')
if not network:
raise exceptions.PhysicalNetworkNameError()
try:
vlan_min = int(vlan_min)
except ValueError:
raise_invalid_tag(vlan_min, entry)
try:
vlan_max = int(vlan_max)
except ValueError:
raise_invalid_tag(vlan_max, entry)
vlan_range = (vlan_min, vlan_max)
verify_vlan_range(vlan_range)
return network, vlan_range
else:
return entry, None
def parse_network_vlan_ranges(network_vlan_ranges_cfg_entries):
"""Interpret a list of strings as network[:vlan_begin:vlan_end] entries."""
networks = collections.OrderedDict()
for entry in network_vlan_ranges_cfg_entries:
network, vlan_range = parse_network_vlan_range(entry)
if vlan_range:
networks.setdefault(network, []).append(vlan_range)
else:
networks.setdefault(network, [])
return networks
def in_pending_status(status):
return status in (n_const.PENDING_CREATE,
n_const.PENDING_UPDATE,
n_const.PENDING_DELETE)
# TODO(boden): remove when consuming I2c0e4ef03425ba0bb2651ae3e68d6c8cde7b8f90
def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):
attr_info = lib_attrs.RESOURCES[attr_name]
@ -187,60 +64,13 @@ def create_port(core_plugin, context, port, check_allow_post=True):
return core_plugin.create_port(context, {'port': port_data})
@contextlib.contextmanager
def delete_port_on_error(core_plugin, context, port_id):
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
try:
core_plugin.delete_port(context, port_id,
l3_port_check=False)
except exceptions.PortNotFound:
LOG.debug("Port %s not found", port_id)
except Exception:
LOG.exception("Failed to delete port: %s", port_id)
# TODO(boden): consume with I73f5e8ad7a1a83392094db846d18964d811b8bb2
def get_deployment_physnet_mtu():
"""Retrieves global physical network MTU setting.
@contextlib.contextmanager
def update_port_on_error(core_plugin, context, port_id, revert_value):
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
try:
core_plugin.update_port(context, port_id,
{'port': revert_value})
except Exception:
LOG.exception("Failed to update port: %s", port_id)
def get_interface_name(name, prefix='', max_len=n_const.DEVICE_NAME_MAX_LEN):
"""Construct an interface name based on the prefix and name.
The interface name can not exceed the maximum length passed in. Longer
names are hashed to help ensure uniqueness.
Plugins should use this function to retrieve the MTU set by the operator
that is equal to or less than the MTU of their nodes' physical interfaces.
Note that it is the responsibility of the plugin to deduct the value of
any encapsulation overhead required before advertising it to VMs.
"""
requested_name = prefix + name
if len(requested_name) <= max_len:
return requested_name
# We can't just truncate because interfaces may be distinguished
# by an ident at the end. A hash over the name should be unique.
# Leave part of the interface name on for easier identification
if (len(prefix) + INTERFACE_HASH_LEN) > max_len:
raise ValueError(_("Too long prefix provided. New name would exceed "
"given length for an interface name."))
namelen = max_len - len(prefix) - INTERFACE_HASH_LEN
hashed_name = hashlib.sha1(encodeutils.to_utf8(name))
new_name = ('%(prefix)s%(truncated)s%(hash)s' %
{'prefix': prefix, 'truncated': name[0:namelen],
'hash': hashed_name.hexdigest()[0:INTERFACE_HASH_LEN]})
LOG.info("The requested interface name %(requested_name)s exceeds the "
"%(limit)d character limitation. It was shortened to "
"%(new_name)s to fit.",
{'requested_name': requested_name,
'limit': max_len, 'new_name': new_name})
return new_name
return cfg.CONF.global_physnet_mtu

View File

@ -24,6 +24,7 @@ import sys
import netaddr
from neutron_lib.agent import topics
from neutron_lib import constants
from neutron_lib.plugins import utils as plugin_utils
from neutron_lib.utils import helpers
from oslo_config import cfg
from oslo_log import log as logging
@ -40,7 +41,6 @@ from neutron.common import exceptions
from neutron.common import profiler as setup_profiler
from neutron.common import utils
from neutron.conf.agent import common as agent_config
from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
from neutron.plugins.ml2.drivers.agent import config as cagt_config # noqa
@ -197,7 +197,7 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
# prefix = mix_iHASHED.1111
if (len(physical_interface) + len(vlan_postfix) >
constants.DEVICE_NAME_MAX_LEN):
physical_interface = p_utils.get_interface_name(
physical_interface = plugin_utils.get_interface_name(
physical_interface, max_len=(constants.DEVICE_NAME_MAX_LEN -
MAX_VLAN_POSTFIX_LEN))
return "%s%s" % (physical_interface, vlan_postfix)

View File

@ -15,8 +15,8 @@
# under the License.
from neutron_lib import constants as n_const
from neutron_lib.plugins import utils as plugin_utils
from neutron.plugins.common import utils as p_utils
MAX_VLAN_POSTFIX_LEN = 5
@ -25,7 +25,6 @@ def get_vlan_device_name(src_dev, vlan):
"""Generating the vlan device name."""
# Ensure that independent of the vlan len the same name prefix is used.
src_dev = p_utils.get_interface_name(src_dev,
max_len=n_const.DEVICE_NAME_MAX_LEN -
MAX_VLAN_POSTFIX_LEN)
src_dev = plugin_utils.get_interface_name(
src_dev, max_len=n_const.DEVICE_NAME_MAX_LEN - MAX_VLAN_POSTFIX_LEN)
return "%s.%s" % (src_dev, vlan)

View File

@ -30,6 +30,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources as callback_resources
from neutron_lib import constants as n_const
from neutron_lib import context
from neutron_lib.plugins import utils as plugin_utils
from neutron_lib.utils import helpers
from oslo_config import cfg
from oslo_log import log as logging
@ -55,7 +56,6 @@ from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import utils as n_utils
from neutron.conf.agent import xenapi_conf
from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.agent import capabilities
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
@ -1101,9 +1101,9 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.phys_brs[physical_network] = br
# interconnect physical and integration bridges using veth/patches
int_if_name = p_utils.get_interface_name(
int_if_name = plugin_utils.get_interface_name(
bridge, prefix=constants.PEER_INTEGRATION_PREFIX)
phys_if_name = p_utils.get_interface_name(
phys_if_name = plugin_utils.get_interface_name(
bridge, prefix=constants.PEER_PHYSICAL_PREFIX)
# Interface type of port for physical and integration bridges must
# be same, so check only one of them.

View File

@ -22,6 +22,7 @@ from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
@ -32,7 +33,6 @@ from sqlalchemy import or_
from neutron._i18n import _
from neutron.db import api as db_api
from neutron.objects import base as base_obj
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2.drivers import helpers
LOG = log.getLogger(__name__)

View File

@ -19,6 +19,7 @@ from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
from oslo_config import cfg
from oslo_log import log
from six import moves
@ -27,7 +28,6 @@ from neutron._i18n import _
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import vlanallocation as vlanalloc
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2.drivers import helpers
LOG = log.getLogger(__name__)

View File

@ -22,13 +22,13 @@ from neutron_lib import constants as lib_const
from neutron_lib.exceptions import dns as dns_exc
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
from oslo_config import cfg
from oslo_log import log as logging
from neutron.db import segments_db
from neutron.objects import network as net_obj
from neutron.objects import ports as port_obj
from neutron.plugins.common import utils as plugin_utils
from neutron.services.externaldns import driver
LOG = logging.getLogger(__name__)

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron._i18n import _
from neutron_lib.plugins import utils as plugin_utils
from neutron.plugins.common import utils
from neutron._i18n import _
from neutron.services.trunk import constants as trunk_consts
# Base map of segmentation types supported with their respective validator
@ -23,7 +23,7 @@ from neutron.services.trunk import constants as trunk_consts
# and respective validator, however this is a configuration that may be
# supported only in single-driver deployments.
_supported = {
trunk_consts.VLAN: utils.is_valid_vlan_tag,
trunk_consts.VLAN: plugin_utils.is_valid_vlan_tag,
}

View File

@ -22,7 +22,6 @@ import eventlet
import mock
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions as exc
from oslo_log import log as logging
import six
import testscenarios
@ -30,7 +29,6 @@ import testtools
from neutron.common import constants as common_constants
from neutron.common import utils
from neutron.plugins.common import utils as plugin_utils
from neutron.tests import base
from neutron.tests.unit import tests
@ -94,306 +92,6 @@ def _port_rule_masking(port_min, port_max):
return current.get_list()
class TestParseTunnelRangesMixin(object):
TUN_MIN = None
TUN_MAX = None
TYPE = None
_err_prefix = "Invalid network tunnel range: '%d:%d' - "
_err_suffix = "%s is not a valid %s identifier."
_err_range = "End of tunnel range is less than start of tunnel range."
def _build_invalid_tunnel_range_msg(self, t_range_tuple, n):
bad_id = t_range_tuple[n - 1]
return (self._err_prefix % t_range_tuple) + (self._err_suffix
% (bad_id, self.TYPE))
def _build_range_reversed_msg(self, t_range_tuple):
return (self._err_prefix % t_range_tuple) + self._err_range
def _verify_range(self, tunnel_range):
return plugin_utils.verify_tunnel_range(tunnel_range, self.TYPE)
def _check_range_valid_ranges(self, tunnel_range):
self.assertIsNone(self._verify_range(tunnel_range))
def _check_range_invalid_ranges(self, bad_range, which):
expected_msg = self._build_invalid_tunnel_range_msg(bad_range, which)
err = self.assertRaises(exc.NetworkTunnelRangeError,
self._verify_range, bad_range)
self.assertEqual(expected_msg, str(err))
def _check_range_reversed(self, bad_range):
err = self.assertRaises(exc.NetworkTunnelRangeError,
self._verify_range, bad_range)
expected_msg = self._build_range_reversed_msg(bad_range)
self.assertEqual(expected_msg, str(err))
def test_range_tunnel_id_valid(self):
self._check_range_valid_ranges((self.TUN_MIN, self.TUN_MAX))
def test_range_tunnel_id_invalid(self):
self._check_range_invalid_ranges((-1, self.TUN_MAX), 1)
self._check_range_invalid_ranges((self.TUN_MIN,
self.TUN_MAX + 1), 2)
self._check_range_invalid_ranges((self.TUN_MIN - 1,
self.TUN_MAX + 1), 1)
def test_range_tunnel_id_reversed(self):
self._check_range_reversed((self.TUN_MAX, self.TUN_MIN))
class TestGreTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
base.BaseTestCase):
TUN_MIN = constants.MIN_GRE_ID
TUN_MAX = constants.MAX_GRE_ID
TYPE = constants.TYPE_GRE
class TestVxlanTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
base.BaseTestCase):
TUN_MIN = constants.MIN_VXLAN_VNI
TUN_MAX = constants.MAX_VXLAN_VNI
TYPE = constants.TYPE_VXLAN
class UtilTestParseVlanRanges(base.BaseTestCase):
_err_prefix = "Invalid network VLAN range: '"
_err_bad_count = "' - 'Need exactly two values for VLAN range'."
_err_bad_vlan = "' - '%s is not a valid VLAN tag'."
_err_range = "' - 'End of VLAN range is less than start of VLAN range'."
def _range_err_bad_count(self, nv_range):
return self._err_prefix + nv_range + self._err_bad_count
def _range_invalid_vlan(self, nv_range, n):
vlan = nv_range.split(':')[n]
return self._err_prefix + nv_range + (self._err_bad_vlan % vlan)
def _nrange_invalid_vlan(self, nv_range, n):
vlan = nv_range.split(':')[n]
v_range = ':'.join(nv_range.split(':')[1:])
return self._err_prefix + v_range + (self._err_bad_vlan % vlan)
def _vrange_invalid_vlan(self, v_range_tuple, n):
vlan = v_range_tuple[n - 1]
v_range_str = '%d:%d' % v_range_tuple
return self._err_prefix + v_range_str + (self._err_bad_vlan % vlan)
def _vrange_invalid(self, v_range_tuple):
v_range_str = '%d:%d' % v_range_tuple
return self._err_prefix + v_range_str + self._err_range
class TestVlanNetworkNameValid(base.BaseTestCase):
def parse_vlan_ranges(self, vlan_range):
return plugin_utils.parse_network_vlan_ranges(vlan_range)
def test_validate_provider_phynet_name_mixed(self):
self.assertRaises(exc.PhysicalNetworkNameError,
self.parse_vlan_ranges,
['', ':23:30', 'physnet1',
'tenant_net:100:200'])
def test_validate_provider_phynet_name_bad(self):
self.assertRaises(exc.PhysicalNetworkNameError,
self.parse_vlan_ranges,
[':1:34'])
class TestVlanRangeVerifyValid(UtilTestParseVlanRanges):
def verify_range(self, vlan_range):
return plugin_utils.verify_vlan_range(vlan_range)
def test_range_valid_ranges(self):
self.assertIsNone(self.verify_range((1, 2)))
self.assertIsNone(self.verify_range((1, 1999)))
self.assertIsNone(self.verify_range((100, 100)))
self.assertIsNone(self.verify_range((100, 200)))
self.assertIsNone(self.verify_range((4001, 4094)))
self.assertIsNone(self.verify_range((1, 4094)))
def check_one_vlan_invalid(self, bad_range, which):
expected_msg = self._vrange_invalid_vlan(bad_range, which)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.verify_range, bad_range)
self.assertEqual(str(err), expected_msg)
def test_range_first_vlan_invalid_negative(self):
self.check_one_vlan_invalid((-1, 199), 1)
def test_range_first_vlan_invalid_zero(self):
self.check_one_vlan_invalid((0, 199), 1)
def test_range_first_vlan_invalid_limit_plus_one(self):
self.check_one_vlan_invalid((4095, 199), 1)
def test_range_first_vlan_invalid_too_big(self):
self.check_one_vlan_invalid((9999, 199), 1)
def test_range_second_vlan_invalid_negative(self):
self.check_one_vlan_invalid((299, -1), 2)
def test_range_second_vlan_invalid_zero(self):
self.check_one_vlan_invalid((299, 0), 2)
def test_range_second_vlan_invalid_limit_plus_one(self):
self.check_one_vlan_invalid((299, 4095), 2)
def test_range_second_vlan_invalid_too_big(self):
self.check_one_vlan_invalid((299, 9999), 2)
def test_range_both_vlans_invalid_01(self):
self.check_one_vlan_invalid((-1, 0), 1)
def test_range_both_vlans_invalid_02(self):
self.check_one_vlan_invalid((0, 4095), 1)
def test_range_both_vlans_invalid_03(self):
self.check_one_vlan_invalid((4095, 9999), 1)
def test_range_both_vlans_invalid_04(self):
self.check_one_vlan_invalid((9999, -1), 1)
def test_range_reversed(self):
bad_range = (95, 10)
expected_msg = self._vrange_invalid(bad_range)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.verify_range, bad_range)
self.assertEqual(str(err), expected_msg)
class TestParseOneVlanRange(UtilTestParseVlanRanges):
def parse_one(self, cfg_entry):
return plugin_utils.parse_network_vlan_range(cfg_entry)
def test_parse_one_net_no_vlan_range(self):
config_str = "net1"
expected_networks = ("net1", None)
self.assertEqual(expected_networks, self.parse_one(config_str))
def test_parse_one_net_and_vlan_range(self):
config_str = "net1:100:199"
expected_networks = ("net1", (100, 199))
self.assertEqual(expected_networks, self.parse_one(config_str))
def test_parse_one_net_incomplete_range(self):
config_str = "net1:100"
expected_msg = self._range_err_bad_count(config_str)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(expected_msg, str(err))
def test_parse_one_net_range_too_many(self):
config_str = "net1:100:150:200"
expected_msg = self._range_err_bad_count(config_str)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(expected_msg, str(err))
def test_parse_one_net_vlan1_not_int(self):
config_str = "net1:foo:199"
expected_msg = self._range_invalid_vlan(config_str, 1)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(expected_msg, str(err))
def test_parse_one_net_vlan2_not_int(self):
config_str = "net1:100:bar"
expected_msg = self._range_invalid_vlan(config_str, 2)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(expected_msg, str(err))
def test_parse_one_net_and_max_range(self):
config_str = "net1:1:4094"
expected_networks = ("net1", (1, 4094))
self.assertEqual(expected_networks, self.parse_one(config_str))
def test_parse_one_net_range_bad_vlan1(self):
config_str = "net1:9000:150"
expected_msg = self._nrange_invalid_vlan(config_str, 1)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(expected_msg, str(err))
def test_parse_one_net_range_bad_vlan2(self):
config_str = "net1:4000:4999"
expected_msg = self._nrange_invalid_vlan(config_str, 2)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_one, config_str)
self.assertEqual(expected_msg, str(err))
class TestParseVlanRangeList(UtilTestParseVlanRanges):
def parse_list(self, cfg_entries):
return plugin_utils.parse_network_vlan_ranges(cfg_entries)
def test_parse_list_one_net_no_vlan_range(self):
config_list = ["net1"]
expected_networks = {"net1": []}
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_list_one_net_vlan_range(self):
config_list = ["net1:100:199"]
expected_networks = {"net1": [(100, 199)]}
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_no_vlan_range(self):
config_list = ["net1",
"net2"]
expected_networks = {"net1": [],
"net2": []}
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_range_and_no_range(self):
config_list = ["net1:100:199",
"net2"]
expected_networks = {"net1": [(100, 199)],
"net2": []}
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_no_range_and_range(self):
config_list = ["net1",
"net2:200:299"]
expected_networks = {"net1": [],
"net2": [(200, 299)]}
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_bad_vlan_range1(self):
config_list = ["net1:100",
"net2:200:299"]
expected_msg = self._range_err_bad_count(config_list[0])
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_list, config_list)
self.assertEqual(expected_msg, str(err))
def test_parse_two_nets_vlan_not_int2(self):
config_list = ["net1:100:199",
"net2:200:0x200"]
expected_msg = self._range_invalid_vlan(config_list[1], 2)
err = self.assertRaises(exc.NetworkVlanRangeError,
self.parse_list, config_list)
self.assertEqual(expected_msg, str(err))
def test_parse_two_nets_and_append_1_2(self):
config_list = ["net1:100:199",
"net1:1000:1099",
"net2:200:299"]
expected_networks = {"net1": [(100, 199),
(1000, 1099)],
"net2": [(200, 299)]}
self.assertEqual(expected_networks, self.parse_list(config_list))
def test_parse_two_nets_and_append_1_3(self):
config_list = ["net1:100:199",
"net2:200:299",
"net1:1000:1099"]
expected_networks = {"net1": [(100, 199),
(1000, 1099)],
"net2": [(200, 299)]}
self.assertEqual(expected_networks, self.parse_list(config_list))
class TestExceptionLogger(base.BaseTestCase):
def test_normal_call(self):
result = "Result"

View File

@ -17,10 +17,10 @@ import hashlib
import mock
from neutron_lib import constants
from neutron_lib import exceptions
from neutron_lib.plugins import utils
import testtools
from neutron.db import l3_db
from neutron.plugins.common import utils
from neutron.tests import base
LONG_NAME1 = "A_REALLY_LONG_INTERFACE_NAME1"

View File

@ -18,12 +18,12 @@ from neutron_lib import constants as p_const
from neutron_lib import context
from neutron_lib import exceptions as exc
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
from oslo_config import cfg
from testtools import matchers
from neutron.db import api as db_api
from neutron.objects.plugins.ml2 import vlanallocation as vlan_alloc_obj
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2.drivers import type_vlan
from neutron.tests.unit import testlib_api

View File

@ -21,9 +21,9 @@ from neutron_lib.api.definitions import trunk as trunk_api
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.plugins import utils as plugin_utils
from oslo_utils import uuidutils
from neutron.plugins.common import utils
from neutron.services.trunk import constants
from neutron.services.trunk import drivers
from neutron.services.trunk import exceptions as trunk_exc
@ -39,7 +39,8 @@ class SubPortsValidatorTestCase(base.BaseTestCase):
def setUp(self):
super(SubPortsValidatorTestCase, self).setUp()
self.segmentation_types = {constants.VLAN: utils.is_valid_vlan_tag}
self.segmentation_types = {
constants.VLAN: plugin_utils.is_valid_vlan_tag}
self.context = mock.ANY
mock.patch.object(rules.SubPortsValidator, '_get_port_mtu',
@ -130,7 +131,8 @@ class SubPortsValidatorPrepareTestCase(base.BaseTestCase):
def setUp(self):
super(SubPortsValidatorPrepareTestCase, self).setUp()
self.segmentation_types = {constants.VLAN: utils.is_valid_vlan_tag}
self.segmentation_types = {
constants.VLAN: plugin_utils.is_valid_vlan_tag}
self.context = mock.ANY
mock.patch.object(rules.SubPortsValidator, '_get_port_mtu',
@ -150,7 +152,8 @@ class SubPortsValidatorMtuSanityTestCase(test_plugin.Ml2PluginV2TestCase):
def setUp(self):
super(SubPortsValidatorMtuSanityTestCase, self).setUp()
self.segmentation_types = {constants.VLAN: utils.is_valid_vlan_tag}
self.segmentation_types = {
constants.VLAN: plugin_utils.is_valid_vlan_tag}
def test_validate_subport_mtu_same_as_trunk(self):
self._test_validate_subport_trunk_mtu(1500, 1500)
@ -230,7 +233,7 @@ class TrunkPortValidatorTestCase(test_plugin.Ml2PluginV2TestCase):
trunk_plugin.TrunkPlugin, 'check_compatibility').start()
self.trunk_plugin = trunk_plugin.TrunkPlugin()
self.trunk_plugin.add_segmentation_type(constants.VLAN,
utils.is_valid_vlan_tag)
plugin_utils.is_valid_vlan_tag)
def test_validate_port_parent_in_use_by_trunk(self):
with self.port() as trunk_parent: