Replace internal calls of create_{network, subnet, port}
When API controller calls method create_{network, subnet, port), it made sure that the necessary default values for attrs are filled properly according to attr mapping. However, internal calls to these methods do not follow the convention, when extension codes miss these values, exceptions will be thrown. This patch introduces helper functions to fix up arguments and replaces the direct callers of those methods. Co-Authored-By: gong yong sheng <gong.yongsheng@99cloud.net> Co-Authored-By: yalei wang <yalei.wang@intel.com> Change-Id: Ibc6ff897a1a00665a403981a218100a698eb1c33 Closes-Bug: #1383546
This commit is contained in:
parent
2584a1aa75
commit
f1457af336
|
@ -30,8 +30,10 @@ from neutron.db import api as db_api
|
|||
from neutron.extensions import portbindings
|
||||
from neutron.i18n import _LW
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
from neutron.quota import resource_registry
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -77,7 +79,7 @@ class DhcpRpcCallback(object):
|
|||
"""Perform port operations taking care of concurrency issues."""
|
||||
try:
|
||||
if action == 'create_port':
|
||||
return plugin.create_port(context, port)
|
||||
return p_utils.create_port(plugin, context, port)
|
||||
elif action == 'update_port':
|
||||
return plugin.update_port(context, port['id'], port)
|
||||
else:
|
||||
|
|
|
@ -19,6 +19,7 @@ import netaddr
|
|||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
import webob.exc
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
|
@ -884,3 +885,65 @@ PLURALS = {NETWORKS: NETWORK,
|
|||
'allocation_pools': 'allocation_pool',
|
||||
'fixed_ips': 'fixed_ip',
|
||||
'extensions': 'extension'}
|
||||
|
||||
|
||||
def fill_default_value(attr_info, res_dict,
|
||||
exc_cls=ValueError,
|
||||
check_allow_post=True):
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if attr_vals['allow_post']:
|
||||
if ('default' not in attr_vals and
|
||||
attr not in res_dict):
|
||||
msg = _("Failed to parse request. Required "
|
||||
"attribute '%s' not specified") % attr
|
||||
raise exc_cls(msg)
|
||||
res_dict[attr] = res_dict.get(attr,
|
||||
attr_vals.get('default'))
|
||||
elif check_allow_post:
|
||||
if attr in res_dict:
|
||||
msg = _("Attribute '%s' not allowed in POST") % attr
|
||||
raise exc_cls(msg)
|
||||
|
||||
|
||||
def convert_value(attr_info, res_dict, exc_cls=ValueError):
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if (attr not in res_dict or
|
||||
res_dict[attr] is ATTR_NOT_SPECIFIED):
|
||||
continue
|
||||
# Convert values if necessary
|
||||
if 'convert_to' in attr_vals:
|
||||
res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
|
||||
# Check that configured values are correct
|
||||
if 'validate' not in attr_vals:
|
||||
continue
|
||||
for rule in attr_vals['validate']:
|
||||
res = validators[rule](res_dict[attr], attr_vals['validate'][rule])
|
||||
if res:
|
||||
msg_dict = dict(attr=attr, reason=res)
|
||||
msg = _("Invalid input for %(attr)s. "
|
||||
"Reason: %(reason)s.") % msg_dict
|
||||
raise exc_cls(msg)
|
||||
|
||||
|
||||
def populate_tenant_id(context, res_dict, attr_info, is_create):
|
||||
if (('tenant_id' in res_dict and
|
||||
res_dict['tenant_id'] != context.tenant_id and
|
||||
not context.is_admin)):
|
||||
msg = _("Specifying 'tenant_id' other than authenticated "
|
||||
"tenant in request requires admin privileges")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
if is_create and 'tenant_id' not in res_dict:
|
||||
if context.tenant_id:
|
||||
res_dict['tenant_id'] = context.tenant_id
|
||||
elif 'tenant_id' in attr_info:
|
||||
msg = _("Running without keystone AuthN requires "
|
||||
"that tenant_id is specified")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
|
||||
def verify_attributes(res_dict, attr_info):
|
||||
extra_keys = set(res_dict.keys()) - set(attr_info.keys())
|
||||
if extra_keys:
|
||||
msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
|
|
@ -596,23 +596,6 @@ class Controller(object):
|
|||
self._send_nova_notification(action, orig_object_copy, result)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _populate_tenant_id(context, res_dict, attr_info, is_create):
|
||||
if (('tenant_id' in res_dict and
|
||||
res_dict['tenant_id'] != context.tenant_id and
|
||||
not context.is_admin)):
|
||||
msg = _("Specifying 'tenant_id' other than authenticated "
|
||||
"tenant in request requires admin privileges")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
if is_create and 'tenant_id' not in res_dict:
|
||||
if context.tenant_id:
|
||||
res_dict['tenant_id'] = context.tenant_id
|
||||
elif 'tenant_id' in attr_info:
|
||||
msg = _("Running without keystone AuthN requires "
|
||||
"that tenant_id is specified")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
@staticmethod
|
||||
def prepare_request_body(context, body, is_create, resource, attr_info,
|
||||
allow_bulk=False):
|
||||
|
@ -652,56 +635,21 @@ class Controller(object):
|
|||
msg = _("Unable to find '%s' in request body") % resource
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
Controller._populate_tenant_id(context, res_dict, attr_info, is_create)
|
||||
Controller._verify_attributes(res_dict, attr_info)
|
||||
attributes.populate_tenant_id(context, res_dict, attr_info, is_create)
|
||||
attributes.verify_attributes(res_dict, attr_info)
|
||||
|
||||
if is_create: # POST
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if attr_vals['allow_post']:
|
||||
if ('default' not in attr_vals and
|
||||
attr not in res_dict):
|
||||
msg = _("Failed to parse request. Required "
|
||||
"attribute '%s' not specified") % attr
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
res_dict[attr] = res_dict.get(attr,
|
||||
attr_vals.get('default'))
|
||||
else:
|
||||
if attr in res_dict:
|
||||
msg = _("Attribute '%s' not allowed in POST") % attr
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
attributes.fill_default_value(attr_info, res_dict,
|
||||
webob.exc.HTTPBadRequest)
|
||||
else: # PUT
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if attr in res_dict and not attr_vals['allow_put']:
|
||||
msg = _("Cannot update read-only attribute %s") % attr
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if (attr not in res_dict or
|
||||
res_dict[attr] is attributes.ATTR_NOT_SPECIFIED):
|
||||
continue
|
||||
# Convert values if necessary
|
||||
if 'convert_to' in attr_vals:
|
||||
res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
|
||||
# Check that configured values are correct
|
||||
if 'validate' not in attr_vals:
|
||||
continue
|
||||
for rule in attr_vals['validate']:
|
||||
res = attributes.validators[rule](res_dict[attr],
|
||||
attr_vals['validate'][rule])
|
||||
if res:
|
||||
msg_dict = dict(attr=attr, reason=res)
|
||||
msg = _("Invalid input for %(attr)s. "
|
||||
"Reason: %(reason)s.") % msg_dict
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
attributes.convert_value(attr_info, res_dict, webob.exc.HTTPBadRequest)
|
||||
return body
|
||||
|
||||
@staticmethod
|
||||
def _verify_attributes(res_dict, attr_info):
|
||||
extra_keys = set(res_dict.keys()) - set(attr_info.keys())
|
||||
if extra_keys:
|
||||
msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
def _validate_network_tenant_ownership(self, request, resource_item):
|
||||
# TODO(salvatore-orlando): consider whether this check can be folded
|
||||
# in the policy engine
|
||||
|
|
|
@ -40,6 +40,7 @@ from neutron.extensions import l3
|
|||
from neutron.i18n import _LI, _LE
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -278,15 +279,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
|
||||
def _create_router_gw_port(self, context, router, network_id, ext_ips):
|
||||
# Port has no 'tenant-id', as it is hidden from user
|
||||
gw_port = self._core_plugin.create_port(context.elevated(), {
|
||||
'port': {'tenant_id': '', # intentionally not set
|
||||
port_data = {'tenant_id': '', # intentionally not set
|
||||
'network_id': network_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': ext_ips or attributes.ATTR_NOT_SPECIFIED,
|
||||
'device_id': router['id'],
|
||||
'device_owner': DEVICE_OWNER_ROUTER_GW,
|
||||
'admin_state_up': True,
|
||||
'name': ''}})
|
||||
'name': ''}
|
||||
gw_port = p_utils.create_port(self._core_plugin,
|
||||
context.elevated(), {'port': port_data})
|
||||
|
||||
if not gw_port['fixed_ips']:
|
||||
LOG.debug('No IPs available for external network %s',
|
||||
|
@ -596,16 +597,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
port['port_id'], {'port':
|
||||
{'fixed_ips': fixed_ips}}), [subnet], False
|
||||
|
||||
return self._core_plugin.create_port(context, {
|
||||
'port':
|
||||
{'tenant_id': subnet['tenant_id'],
|
||||
'network_id': subnet['network_id'],
|
||||
'fixed_ips': [fixed_ip],
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'admin_state_up': True,
|
||||
'device_id': router.id,
|
||||
'device_owner': owner,
|
||||
'name': ''}}), [subnet], True
|
||||
port_data = {'tenant_id': subnet['tenant_id'],
|
||||
'network_id': subnet['network_id'],
|
||||
'fixed_ips': [fixed_ip],
|
||||
'admin_state_up': True,
|
||||
'device_id': router.id,
|
||||
'device_owner': owner,
|
||||
'name': ''}
|
||||
return p_utils.create_port(self._core_plugin, context,
|
||||
{'port': port_data}), [subnet], True
|
||||
|
||||
@staticmethod
|
||||
def _make_router_interface_info(
|
||||
|
@ -956,14 +956,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
|
||||
port = {'tenant_id': '', # tenant intentionally not set
|
||||
'network_id': f_net_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
||||
'admin_state_up': True,
|
||||
'device_id': fip_id,
|
||||
'device_owner': DEVICE_OWNER_FLOATINGIP,
|
||||
'status': l3_constants.PORT_STATUS_NOTAPPLICABLE,
|
||||
'name': ''}
|
||||
|
||||
if fip.get('floating_ip_address'):
|
||||
port['fixed_ips'] = [
|
||||
{'ip_address': fip['floating_ip_address']}]
|
||||
|
@ -971,9 +968,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
if fip.get('subnet_id'):
|
||||
port['fixed_ips'] = [
|
||||
{'subnet_id': fip['subnet_id']}]
|
||||
external_port = self._core_plugin.create_port(context.elevated(),
|
||||
{'port': port})
|
||||
|
||||
# 'status' in port dict could not be updated by default, use
|
||||
# check_allow_post to stop the verification of system
|
||||
external_port = p_utils.create_port(self._core_plugin,
|
||||
context.elevated(),
|
||||
{'port': port},
|
||||
check_allow_post=False)
|
||||
# Ensure IPv4 addresses are allocated on external port
|
||||
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
|
||||
if not external_ipv4_ips:
|
||||
|
|
|
@ -35,6 +35,7 @@ from neutron.extensions import portbindings
|
|||
from neutron.i18n import _LI
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -563,17 +564,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
|
|||
if not f_port:
|
||||
LOG.info(_LI('Agent Gateway port does not exist,'
|
||||
' so create one: %s'), f_port)
|
||||
agent_port = self._core_plugin.create_port(
|
||||
context,
|
||||
{'port': {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
||||
'device_id': l3_agent_db['id'],
|
||||
'device_owner': DEVICE_OWNER_AGENT_GW,
|
||||
'binding:host_id': host,
|
||||
'admin_state_up': True,
|
||||
'name': ''}})
|
||||
port_data = {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'device_id': l3_agent_db['id'],
|
||||
'device_owner': DEVICE_OWNER_AGENT_GW,
|
||||
'binding:host_id': host,
|
||||
'admin_state_up': True,
|
||||
'name': ''}
|
||||
agent_port = p_utils.create_port(self._core_plugin, context,
|
||||
{'port': port_data})
|
||||
if agent_port:
|
||||
self._populate_subnets_for_ports(context, [agent_port])
|
||||
return agent_port
|
||||
|
@ -598,16 +597,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
|
|||
def _add_csnat_router_interface_port(
|
||||
self, context, router, network_id, subnet_id, do_pop=True):
|
||||
"""Add SNAT interface to the specified router and subnet."""
|
||||
snat_port = self._core_plugin.create_port(
|
||||
context,
|
||||
{'port': {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': [{'subnet_id': subnet_id}],
|
||||
'device_id': router.id,
|
||||
'device_owner': DEVICE_OWNER_DVR_SNAT,
|
||||
'admin_state_up': True,
|
||||
'name': ''}})
|
||||
port_data = {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'fixed_ips': [{'subnet_id': subnet_id}],
|
||||
'device_id': router.id,
|
||||
'device_owner': DEVICE_OWNER_DVR_SNAT,
|
||||
'admin_state_up': True,
|
||||
'name': ''}
|
||||
snat_port = p_utils.create_port(self._core_plugin, context,
|
||||
{'port': port_data})
|
||||
if not snat_port:
|
||||
msg = _("Unable to create the SNAT Interface Port")
|
||||
raise n_exc.BadRequest(resource='router', msg=msg)
|
||||
|
|
|
@ -32,6 +32,8 @@ from neutron.extensions import l3_ext_ha_mode as l3_ha
|
|||
from neutron.extensions import portbindings
|
||||
from neutron.extensions import providernet
|
||||
from neutron.i18n import _LI
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
|
||||
|
||||
VR_ID_RANGE = set(range(1, 255))
|
||||
MAX_ALLOCATION_TRIES = 10
|
||||
|
@ -219,18 +221,15 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
|
|||
context, ha_network.network_id, router.id)
|
||||
|
||||
def _create_ha_subnet(self, context, network_id, tenant_id):
|
||||
args = {'subnet':
|
||||
{'network_id': network_id,
|
||||
'tenant_id': '',
|
||||
'name': constants.HA_SUBNET_NAME % tenant_id,
|
||||
'ip_version': 4,
|
||||
'cidr': cfg.CONF.l3_ha_net_cidr,
|
||||
'enable_dhcp': False,
|
||||
'host_routes': attributes.ATTR_NOT_SPECIFIED,
|
||||
'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
|
||||
'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
|
||||
'gateway_ip': None}}
|
||||
return self._core_plugin.create_subnet(context, args)
|
||||
args = {'network_id': network_id,
|
||||
'tenant_id': '',
|
||||
'name': constants.HA_SUBNET_NAME % tenant_id,
|
||||
'ip_version': 4,
|
||||
'cidr': cfg.CONF.l3_ha_net_cidr,
|
||||
'enable_dhcp': False,
|
||||
'gateway_ip': None}
|
||||
return p_utils.create_subnet(self._core_plugin, context,
|
||||
{'subnet': args})
|
||||
|
||||
def _create_ha_network_tenant_binding(self, context, tenant_id,
|
||||
network_id):
|
||||
|
@ -255,11 +254,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
|
|||
{'name': constants.HA_NETWORK_NAME % tenant_id,
|
||||
'tenant_id': '',
|
||||
'shared': False,
|
||||
'admin_state_up': True,
|
||||
'status': constants.NET_STATUS_ACTIVE}}
|
||||
'admin_state_up': True}}
|
||||
self._add_ha_network_settings(args['network'])
|
||||
network = p_utils.create_network(self._core_plugin, admin_ctx, args)
|
||||
|
||||
network = self._core_plugin.create_network(admin_ctx, args)
|
||||
try:
|
||||
ha_network = self._create_ha_network_tenant_binding(admin_ctx,
|
||||
tenant_id,
|
||||
|
@ -312,16 +310,14 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
|
|||
return portbinding
|
||||
|
||||
def add_ha_port(self, context, router_id, network_id, tenant_id):
|
||||
port = self._core_plugin.create_port(context, {
|
||||
'port':
|
||||
{'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'admin_state_up': True,
|
||||
'device_id': router_id,
|
||||
'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
|
||||
'name': constants.HA_PORT_NAME % tenant_id}})
|
||||
args = {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'admin_state_up': True,
|
||||
'device_id': router_id,
|
||||
'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
|
||||
'name': constants.HA_PORT_NAME % tenant_id}
|
||||
port = p_utils.create_port(self._core_plugin, context,
|
||||
{'port': args})
|
||||
|
||||
try:
|
||||
return self._create_ha_port_binding(context, port['id'], router_id)
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
Common utilities and helper functions for Openstack Networking Plugins.
|
||||
"""
|
||||
|
||||
import webob.exc
|
||||
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.plugins.common import constants as p_const
|
||||
|
||||
|
@ -96,3 +99,37 @@ def in_pending_status(status):
|
|||
return status in (p_const.PENDING_CREATE,
|
||||
p_const.PENDING_UPDATE,
|
||||
p_const.PENDING_DELETE)
|
||||
|
||||
|
||||
def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):
|
||||
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[attr_name]
|
||||
try:
|
||||
attributes.populate_tenant_id(context, res_dict, attr_info, True)
|
||||
attributes.verify_attributes(res_dict, attr_info)
|
||||
except webob.exc.HTTPBadRequest as e:
|
||||
# convert webob exception into ValueError as these functions are
|
||||
# for internal use. webob exception doesn't make sense.
|
||||
raise ValueError(e.detail)
|
||||
attributes.fill_default_value(attr_info, res_dict,
|
||||
check_allow_post=check_allow_post)
|
||||
attributes.convert_value(attr_info, res_dict)
|
||||
return res_dict
|
||||
|
||||
|
||||
def create_network(core_plugin, context, net):
|
||||
net_data = _fixup_res_dict(context, attributes.NETWORKS,
|
||||
net.get('network', {}))
|
||||
return core_plugin.create_network(context, {'network': net_data})
|
||||
|
||||
|
||||
def create_subnet(core_plugin, context, subnet):
|
||||
subnet_data = _fixup_res_dict(context, attributes.SUBNETS,
|
||||
subnet.get('subnet', {}))
|
||||
return core_plugin.create_subnet(context, {'subnet': subnet_data})
|
||||
|
||||
|
||||
def create_port(core_plugin, context, port, check_allow_post=True):
|
||||
port_data = _fixup_res_dict(context, attributes.PORTS,
|
||||
port.get('port', {}),
|
||||
check_allow_post=check_allow_post)
|
||||
return core_plugin.create_port(context, {'port': port_data})
|
||||
|
|
|
@ -36,6 +36,8 @@ class TestDhcpRpcCallback(base.BaseTestCase):
|
|||
set_dirty_p = mock.patch('neutron.quota.resource_registry.'
|
||||
'set_resources_dirty')
|
||||
self.mock_set_dirty = set_dirty_p.start()
|
||||
self.utils_p = mock.patch('neutron.plugins.common.utils.create_port')
|
||||
self.utils = self.utils_p.start()
|
||||
|
||||
def test_get_active_networks(self):
|
||||
plugin_retval = [dict(id='a'), dict(id='b')]
|
||||
|
@ -79,6 +81,7 @@ class TestDhcpRpcCallback(base.BaseTestCase):
|
|||
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
|
||||
}
|
||||
self.plugin.create_port.side_effect = exc
|
||||
self.utils.side_effect = exc
|
||||
self.assertIsNone(self.callbacks._port_action(self.plugin,
|
||||
mock.Mock(),
|
||||
{'port': port},
|
||||
|
@ -87,7 +90,10 @@ class TestDhcpRpcCallback(base.BaseTestCase):
|
|||
def _test__port_action_good_action(self, action, port, expected_call):
|
||||
self.callbacks._port_action(self.plugin, mock.Mock(),
|
||||
port, action)
|
||||
self.plugin.assert_has_calls([expected_call])
|
||||
if action == 'create_port':
|
||||
self.utils.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
|
||||
else:
|
||||
self.plugin.assert_has_calls([expected_call])
|
||||
|
||||
def test_port_action_create_port(self):
|
||||
self._test__port_action_good_action(
|
||||
|
|
|
@ -878,3 +878,90 @@ class TestConvertToList(base.BaseTestCase):
|
|||
def test_convert_to_list_non_iterable(self):
|
||||
for item in (True, False, 1, 1.2, object()):
|
||||
self.assertEqual([item], attributes.convert_to_list(item))
|
||||
|
||||
|
||||
class TestResDict(base.BaseTestCase):
|
||||
class _MyException(Exception):
|
||||
pass
|
||||
_EXC_CLS = _MyException
|
||||
|
||||
def _test_fill_default_value(self, attr_info, expected, res_dict):
|
||||
attributes.fill_default_value(attr_info, res_dict)
|
||||
self.assertEqual(expected, res_dict)
|
||||
|
||||
def test_fill_default_value(self):
|
||||
attr_info = {
|
||||
'key': {
|
||||
'allow_post': True,
|
||||
'default': attributes.ATTR_NOT_SPECIFIED,
|
||||
},
|
||||
}
|
||||
self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self._test_fill_default_value(
|
||||
attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {})
|
||||
|
||||
attr_info = {
|
||||
'key': {
|
||||
'allow_post': True,
|
||||
},
|
||||
}
|
||||
self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self.assertRaises(ValueError, self._test_fill_default_value,
|
||||
attr_info, {'key': 'X'}, {})
|
||||
self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
|
||||
attr_info, {}, self._EXC_CLS)
|
||||
attr_info = {
|
||||
'key': {
|
||||
'allow_post': False,
|
||||
},
|
||||
}
|
||||
self.assertRaises(ValueError, self._test_fill_default_value,
|
||||
attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self._test_fill_default_value(attr_info, {}, {})
|
||||
self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
|
||||
attr_info, {'key': 'X'}, self._EXC_CLS)
|
||||
|
||||
def _test_convert_value(self, attr_info, expected, res_dict):
|
||||
attributes.convert_value(attr_info, res_dict)
|
||||
self.assertEqual(expected, res_dict)
|
||||
|
||||
def test_convert_value(self):
|
||||
attr_info = {
|
||||
'key': {
|
||||
},
|
||||
}
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED},
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED})
|
||||
self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self._test_convert_value(attr_info,
|
||||
{'other_key': 'X'}, {'other_key': 'X'})
|
||||
|
||||
attr_info = {
|
||||
'key': {
|
||||
'convert_to': attributes.convert_to_int,
|
||||
},
|
||||
}
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED},
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED})
|
||||
self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
|
||||
self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
|
||||
self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
|
||||
attr_info, {'key': 1}, {'key': 'a'})
|
||||
|
||||
attr_info = {
|
||||
'key': {
|
||||
'validate': {'type:uuid': None},
|
||||
},
|
||||
}
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED},
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED})
|
||||
uuid_str = '01234567-1234-1234-1234-1234567890ab'
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': uuid_str}, {'key': uuid_str})
|
||||
self.assertRaises(ValueError, self._test_convert_value,
|
||||
attr_info, {'key': 1}, {'key': 1})
|
||||
self.assertRaises(self._EXC_CLS, attributes.convert_value,
|
||||
attr_info, {'key': 1}, self._EXC_CLS)
|
||||
|
|
Loading…
Reference in New Issue