Merge "Replace internal calls of create_{network, subnet, port}"
This commit is contained in:
commit
93223f321f
|
@ -30,8 +30,10 @@ from neutron.db import api as db_api
|
|||
from neutron.extensions import portbindings
|
||||
from neutron.i18n import _LW
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
from neutron.quota import resource_registry
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -77,7 +79,7 @@ class DhcpRpcCallback(object):
|
|||
"""Perform port operations taking care of concurrency issues."""
|
||||
try:
|
||||
if action == 'create_port':
|
||||
return plugin.create_port(context, port)
|
||||
return p_utils.create_port(plugin, context, port)
|
||||
elif action == 'update_port':
|
||||
return plugin.update_port(context, port['id'], port)
|
||||
else:
|
||||
|
|
|
@ -19,6 +19,7 @@ import netaddr
|
|||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
import webob.exc
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions as n_exc
|
||||
|
@ -884,3 +885,65 @@ PLURALS = {NETWORKS: NETWORK,
|
|||
'allocation_pools': 'allocation_pool',
|
||||
'fixed_ips': 'fixed_ip',
|
||||
'extensions': 'extension'}
|
||||
|
||||
|
||||
def fill_default_value(attr_info, res_dict,
|
||||
exc_cls=ValueError,
|
||||
check_allow_post=True):
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if attr_vals['allow_post']:
|
||||
if ('default' not in attr_vals and
|
||||
attr not in res_dict):
|
||||
msg = _("Failed to parse request. Required "
|
||||
"attribute '%s' not specified") % attr
|
||||
raise exc_cls(msg)
|
||||
res_dict[attr] = res_dict.get(attr,
|
||||
attr_vals.get('default'))
|
||||
elif check_allow_post:
|
||||
if attr in res_dict:
|
||||
msg = _("Attribute '%s' not allowed in POST") % attr
|
||||
raise exc_cls(msg)
|
||||
|
||||
|
||||
def convert_value(attr_info, res_dict, exc_cls=ValueError):
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if (attr not in res_dict or
|
||||
res_dict[attr] is ATTR_NOT_SPECIFIED):
|
||||
continue
|
||||
# Convert values if necessary
|
||||
if 'convert_to' in attr_vals:
|
||||
res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
|
||||
# Check that configured values are correct
|
||||
if 'validate' not in attr_vals:
|
||||
continue
|
||||
for rule in attr_vals['validate']:
|
||||
res = validators[rule](res_dict[attr], attr_vals['validate'][rule])
|
||||
if res:
|
||||
msg_dict = dict(attr=attr, reason=res)
|
||||
msg = _("Invalid input for %(attr)s. "
|
||||
"Reason: %(reason)s.") % msg_dict
|
||||
raise exc_cls(msg)
|
||||
|
||||
|
||||
def populate_tenant_id(context, res_dict, attr_info, is_create):
|
||||
if (('tenant_id' in res_dict and
|
||||
res_dict['tenant_id'] != context.tenant_id and
|
||||
not context.is_admin)):
|
||||
msg = _("Specifying 'tenant_id' other than authenticated "
|
||||
"tenant in request requires admin privileges")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
if is_create and 'tenant_id' not in res_dict:
|
||||
if context.tenant_id:
|
||||
res_dict['tenant_id'] = context.tenant_id
|
||||
elif 'tenant_id' in attr_info:
|
||||
msg = _("Running without keystone AuthN requires "
|
||||
"that tenant_id is specified")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
|
||||
def verify_attributes(res_dict, attr_info):
|
||||
extra_keys = set(res_dict.keys()) - set(attr_info.keys())
|
||||
if extra_keys:
|
||||
msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
|
|
@ -601,23 +601,6 @@ class Controller(object):
|
|||
self._send_nova_notification(action, orig_object_copy, result)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _populate_tenant_id(context, res_dict, attr_info, is_create):
|
||||
if (('tenant_id' in res_dict and
|
||||
res_dict['tenant_id'] != context.tenant_id and
|
||||
not context.is_admin)):
|
||||
msg = _("Specifying 'tenant_id' other than authenticated "
|
||||
"tenant in request requires admin privileges")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
if is_create and 'tenant_id' not in res_dict:
|
||||
if context.tenant_id:
|
||||
res_dict['tenant_id'] = context.tenant_id
|
||||
elif 'tenant_id' in attr_info:
|
||||
msg = _("Running without keystone AuthN requires "
|
||||
"that tenant_id is specified")
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
@staticmethod
|
||||
def prepare_request_body(context, body, is_create, resource, attr_info,
|
||||
allow_bulk=False):
|
||||
|
@ -657,56 +640,21 @@ class Controller(object):
|
|||
msg = _("Unable to find '%s' in request body") % resource
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
Controller._populate_tenant_id(context, res_dict, attr_info, is_create)
|
||||
Controller._verify_attributes(res_dict, attr_info)
|
||||
attributes.populate_tenant_id(context, res_dict, attr_info, is_create)
|
||||
attributes.verify_attributes(res_dict, attr_info)
|
||||
|
||||
if is_create: # POST
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if attr_vals['allow_post']:
|
||||
if ('default' not in attr_vals and
|
||||
attr not in res_dict):
|
||||
msg = _("Failed to parse request. Required "
|
||||
"attribute '%s' not specified") % attr
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
res_dict[attr] = res_dict.get(attr,
|
||||
attr_vals.get('default'))
|
||||
else:
|
||||
if attr in res_dict:
|
||||
msg = _("Attribute '%s' not allowed in POST") % attr
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
attributes.fill_default_value(attr_info, res_dict,
|
||||
webob.exc.HTTPBadRequest)
|
||||
else: # PUT
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if attr in res_dict and not attr_vals['allow_put']:
|
||||
msg = _("Cannot update read-only attribute %s") % attr
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
for attr, attr_vals in six.iteritems(attr_info):
|
||||
if (attr not in res_dict or
|
||||
res_dict[attr] is attributes.ATTR_NOT_SPECIFIED):
|
||||
continue
|
||||
# Convert values if necessary
|
||||
if 'convert_to' in attr_vals:
|
||||
res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
|
||||
# Check that configured values are correct
|
||||
if 'validate' not in attr_vals:
|
||||
continue
|
||||
for rule in attr_vals['validate']:
|
||||
res = attributes.validators[rule](res_dict[attr],
|
||||
attr_vals['validate'][rule])
|
||||
if res:
|
||||
msg_dict = dict(attr=attr, reason=res)
|
||||
msg = _("Invalid input for %(attr)s. "
|
||||
"Reason: %(reason)s.") % msg_dict
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
attributes.convert_value(attr_info, res_dict, webob.exc.HTTPBadRequest)
|
||||
return body
|
||||
|
||||
@staticmethod
|
||||
def _verify_attributes(res_dict, attr_info):
|
||||
extra_keys = set(res_dict.keys()) - set(attr_info.keys())
|
||||
if extra_keys:
|
||||
msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
|
||||
raise webob.exc.HTTPBadRequest(msg)
|
||||
|
||||
def _validate_network_tenant_ownership(self, request, resource_item):
|
||||
# TODO(salvatore-orlando): consider whether this check can be folded
|
||||
# in the policy engine
|
||||
|
|
|
@ -40,6 +40,7 @@ from neutron.extensions import l3
|
|||
from neutron.i18n import _LI, _LE
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -278,15 +279,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
|
||||
def _create_router_gw_port(self, context, router, network_id, ext_ips):
|
||||
# Port has no 'tenant-id', as it is hidden from user
|
||||
gw_port = self._core_plugin.create_port(context.elevated(), {
|
||||
'port': {'tenant_id': '', # intentionally not set
|
||||
port_data = {'tenant_id': '', # intentionally not set
|
||||
'network_id': network_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': ext_ips or attributes.ATTR_NOT_SPECIFIED,
|
||||
'device_id': router['id'],
|
||||
'device_owner': DEVICE_OWNER_ROUTER_GW,
|
||||
'admin_state_up': True,
|
||||
'name': ''}})
|
||||
'name': ''}
|
||||
gw_port = p_utils.create_port(self._core_plugin,
|
||||
context.elevated(), {'port': port_data})
|
||||
|
||||
if not gw_port['fixed_ips']:
|
||||
LOG.debug('No IPs available for external network %s',
|
||||
|
@ -596,16 +597,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
port['port_id'], {'port':
|
||||
{'fixed_ips': fixed_ips}}), [subnet], False
|
||||
|
||||
return self._core_plugin.create_port(context, {
|
||||
'port':
|
||||
{'tenant_id': subnet['tenant_id'],
|
||||
'network_id': subnet['network_id'],
|
||||
'fixed_ips': [fixed_ip],
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'admin_state_up': True,
|
||||
'device_id': router.id,
|
||||
'device_owner': owner,
|
||||
'name': ''}}), [subnet], True
|
||||
port_data = {'tenant_id': subnet['tenant_id'],
|
||||
'network_id': subnet['network_id'],
|
||||
'fixed_ips': [fixed_ip],
|
||||
'admin_state_up': True,
|
||||
'device_id': router.id,
|
||||
'device_owner': owner,
|
||||
'name': ''}
|
||||
return p_utils.create_port(self._core_plugin, context,
|
||||
{'port': port_data}), [subnet], True
|
||||
|
||||
@staticmethod
|
||||
def _make_router_interface_info(
|
||||
|
@ -955,14 +955,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
|
||||
port = {'tenant_id': '', # tenant intentionally not set
|
||||
'network_id': f_net_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
||||
'admin_state_up': True,
|
||||
'device_id': fip_id,
|
||||
'device_owner': DEVICE_OWNER_FLOATINGIP,
|
||||
'status': l3_constants.PORT_STATUS_NOTAPPLICABLE,
|
||||
'name': ''}
|
||||
|
||||
if fip.get('floating_ip_address'):
|
||||
port['fixed_ips'] = [
|
||||
{'ip_address': fip['floating_ip_address']}]
|
||||
|
@ -970,9 +967,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
|
|||
if fip.get('subnet_id'):
|
||||
port['fixed_ips'] = [
|
||||
{'subnet_id': fip['subnet_id']}]
|
||||
external_port = self._core_plugin.create_port(context.elevated(),
|
||||
{'port': port})
|
||||
|
||||
# 'status' in port dict could not be updated by default, use
|
||||
# check_allow_post to stop the verification of system
|
||||
external_port = p_utils.create_port(self._core_plugin,
|
||||
context.elevated(),
|
||||
{'port': port},
|
||||
check_allow_post=False)
|
||||
# Ensure IPv4 addresses are allocated on external port
|
||||
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
|
||||
if not external_ipv4_ips:
|
||||
|
|
|
@ -35,6 +35,7 @@ from neutron.extensions import portbindings
|
|||
from neutron.i18n import _LI
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -563,17 +564,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
|
|||
if not f_port:
|
||||
LOG.info(_LI('Agent Gateway port does not exist,'
|
||||
' so create one: %s'), f_port)
|
||||
agent_port = self._core_plugin.create_port(
|
||||
context,
|
||||
{'port': {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
||||
'device_id': l3_agent_db['id'],
|
||||
'device_owner': DEVICE_OWNER_AGENT_GW,
|
||||
'binding:host_id': host,
|
||||
'admin_state_up': True,
|
||||
'name': ''}})
|
||||
port_data = {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'device_id': l3_agent_db['id'],
|
||||
'device_owner': DEVICE_OWNER_AGENT_GW,
|
||||
'binding:host_id': host,
|
||||
'admin_state_up': True,
|
||||
'name': ''}
|
||||
agent_port = p_utils.create_port(self._core_plugin, context,
|
||||
{'port': port_data})
|
||||
if agent_port:
|
||||
self._populate_subnets_for_ports(context, [agent_port])
|
||||
return agent_port
|
||||
|
@ -598,16 +597,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
|
|||
def _add_csnat_router_interface_port(
|
||||
self, context, router, network_id, subnet_id, do_pop=True):
|
||||
"""Add SNAT interface to the specified router and subnet."""
|
||||
snat_port = self._core_plugin.create_port(
|
||||
context,
|
||||
{'port': {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'fixed_ips': [{'subnet_id': subnet_id}],
|
||||
'device_id': router.id,
|
||||
'device_owner': DEVICE_OWNER_DVR_SNAT,
|
||||
'admin_state_up': True,
|
||||
'name': ''}})
|
||||
port_data = {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'fixed_ips': [{'subnet_id': subnet_id}],
|
||||
'device_id': router.id,
|
||||
'device_owner': DEVICE_OWNER_DVR_SNAT,
|
||||
'admin_state_up': True,
|
||||
'name': ''}
|
||||
snat_port = p_utils.create_port(self._core_plugin, context,
|
||||
{'port': port_data})
|
||||
if not snat_port:
|
||||
msg = _("Unable to create the SNAT Interface Port")
|
||||
raise n_exc.BadRequest(resource='router', msg=msg)
|
||||
|
|
|
@ -32,6 +32,8 @@ from neutron.extensions import l3_ext_ha_mode as l3_ha
|
|||
from neutron.extensions import portbindings
|
||||
from neutron.extensions import providernet
|
||||
from neutron.i18n import _LI
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
|
||||
|
||||
VR_ID_RANGE = set(range(1, 255))
|
||||
MAX_ALLOCATION_TRIES = 10
|
||||
|
@ -219,18 +221,15 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
|
|||
context, ha_network.network_id, router.id)
|
||||
|
||||
def _create_ha_subnet(self, context, network_id, tenant_id):
|
||||
args = {'subnet':
|
||||
{'network_id': network_id,
|
||||
'tenant_id': '',
|
||||
'name': constants.HA_SUBNET_NAME % tenant_id,
|
||||
'ip_version': 4,
|
||||
'cidr': cfg.CONF.l3_ha_net_cidr,
|
||||
'enable_dhcp': False,
|
||||
'host_routes': attributes.ATTR_NOT_SPECIFIED,
|
||||
'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
|
||||
'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
|
||||
'gateway_ip': None}}
|
||||
return self._core_plugin.create_subnet(context, args)
|
||||
args = {'network_id': network_id,
|
||||
'tenant_id': '',
|
||||
'name': constants.HA_SUBNET_NAME % tenant_id,
|
||||
'ip_version': 4,
|
||||
'cidr': cfg.CONF.l3_ha_net_cidr,
|
||||
'enable_dhcp': False,
|
||||
'gateway_ip': None}
|
||||
return p_utils.create_subnet(self._core_plugin, context,
|
||||
{'subnet': args})
|
||||
|
||||
def _create_ha_network_tenant_binding(self, context, tenant_id,
|
||||
network_id):
|
||||
|
@ -255,11 +254,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
|
|||
{'name': constants.HA_NETWORK_NAME % tenant_id,
|
||||
'tenant_id': '',
|
||||
'shared': False,
|
||||
'admin_state_up': True,
|
||||
'status': constants.NET_STATUS_ACTIVE}}
|
||||
'admin_state_up': True}}
|
||||
self._add_ha_network_settings(args['network'])
|
||||
network = p_utils.create_network(self._core_plugin, admin_ctx, args)
|
||||
|
||||
network = self._core_plugin.create_network(admin_ctx, args)
|
||||
try:
|
||||
ha_network = self._create_ha_network_tenant_binding(admin_ctx,
|
||||
tenant_id,
|
||||
|
@ -312,16 +310,14 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
|
|||
return portbinding
|
||||
|
||||
def add_ha_port(self, context, router_id, network_id, tenant_id):
|
||||
port = self._core_plugin.create_port(context, {
|
||||
'port':
|
||||
{'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
|
||||
'mac_address': attributes.ATTR_NOT_SPECIFIED,
|
||||
'admin_state_up': True,
|
||||
'device_id': router_id,
|
||||
'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
|
||||
'name': constants.HA_PORT_NAME % tenant_id}})
|
||||
args = {'tenant_id': '',
|
||||
'network_id': network_id,
|
||||
'admin_state_up': True,
|
||||
'device_id': router_id,
|
||||
'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
|
||||
'name': constants.HA_PORT_NAME % tenant_id}
|
||||
port = p_utils.create_port(self._core_plugin, context,
|
||||
{'port': args})
|
||||
|
||||
try:
|
||||
return self._create_ha_port_binding(context, port['id'], router_id)
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
Common utilities and helper functions for Openstack Networking Plugins.
|
||||
"""
|
||||
|
||||
import webob.exc
|
||||
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron.plugins.common import constants as p_const
|
||||
|
||||
|
@ -96,3 +99,37 @@ def in_pending_status(status):
|
|||
return status in (p_const.PENDING_CREATE,
|
||||
p_const.PENDING_UPDATE,
|
||||
p_const.PENDING_DELETE)
|
||||
|
||||
|
||||
def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):
|
||||
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[attr_name]
|
||||
try:
|
||||
attributes.populate_tenant_id(context, res_dict, attr_info, True)
|
||||
attributes.verify_attributes(res_dict, attr_info)
|
||||
except webob.exc.HTTPBadRequest as e:
|
||||
# convert webob exception into ValueError as these functions are
|
||||
# for internal use. webob exception doesn't make sense.
|
||||
raise ValueError(e.detail)
|
||||
attributes.fill_default_value(attr_info, res_dict,
|
||||
check_allow_post=check_allow_post)
|
||||
attributes.convert_value(attr_info, res_dict)
|
||||
return res_dict
|
||||
|
||||
|
||||
def create_network(core_plugin, context, net):
|
||||
net_data = _fixup_res_dict(context, attributes.NETWORKS,
|
||||
net.get('network', {}))
|
||||
return core_plugin.create_network(context, {'network': net_data})
|
||||
|
||||
|
||||
def create_subnet(core_plugin, context, subnet):
|
||||
subnet_data = _fixup_res_dict(context, attributes.SUBNETS,
|
||||
subnet.get('subnet', {}))
|
||||
return core_plugin.create_subnet(context, {'subnet': subnet_data})
|
||||
|
||||
|
||||
def create_port(core_plugin, context, port, check_allow_post=True):
|
||||
port_data = _fixup_res_dict(context, attributes.PORTS,
|
||||
port.get('port', {}),
|
||||
check_allow_post=check_allow_post)
|
||||
return core_plugin.create_port(context, {'port': port_data})
|
||||
|
|
|
@ -36,6 +36,8 @@ class TestDhcpRpcCallback(base.BaseTestCase):
|
|||
set_dirty_p = mock.patch('neutron.quota.resource_registry.'
|
||||
'set_resources_dirty')
|
||||
self.mock_set_dirty = set_dirty_p.start()
|
||||
self.utils_p = mock.patch('neutron.plugins.common.utils.create_port')
|
||||
self.utils = self.utils_p.start()
|
||||
|
||||
def test_get_active_networks(self):
|
||||
plugin_retval = [dict(id='a'), dict(id='b')]
|
||||
|
@ -79,6 +81,7 @@ class TestDhcpRpcCallback(base.BaseTestCase):
|
|||
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
|
||||
}
|
||||
self.plugin.create_port.side_effect = exc
|
||||
self.utils.side_effect = exc
|
||||
self.assertIsNone(self.callbacks._port_action(self.plugin,
|
||||
mock.Mock(),
|
||||
{'port': port},
|
||||
|
@ -87,7 +90,10 @@ class TestDhcpRpcCallback(base.BaseTestCase):
|
|||
def _test__port_action_good_action(self, action, port, expected_call):
|
||||
self.callbacks._port_action(self.plugin, mock.Mock(),
|
||||
port, action)
|
||||
self.plugin.assert_has_calls([expected_call])
|
||||
if action == 'create_port':
|
||||
self.utils.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
|
||||
else:
|
||||
self.plugin.assert_has_calls([expected_call])
|
||||
|
||||
def test_port_action_create_port(self):
|
||||
self._test__port_action_good_action(
|
||||
|
|
|
@ -878,3 +878,90 @@ class TestConvertToList(base.BaseTestCase):
|
|||
def test_convert_to_list_non_iterable(self):
|
||||
for item in (True, False, 1, 1.2, object()):
|
||||
self.assertEqual([item], attributes.convert_to_list(item))
|
||||
|
||||
|
||||
class TestResDict(base.BaseTestCase):
|
||||
class _MyException(Exception):
|
||||
pass
|
||||
_EXC_CLS = _MyException
|
||||
|
||||
def _test_fill_default_value(self, attr_info, expected, res_dict):
|
||||
attributes.fill_default_value(attr_info, res_dict)
|
||||
self.assertEqual(expected, res_dict)
|
||||
|
||||
def test_fill_default_value(self):
|
||||
attr_info = {
|
||||
'key': {
|
||||
'allow_post': True,
|
||||
'default': attributes.ATTR_NOT_SPECIFIED,
|
||||
},
|
||||
}
|
||||
self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self._test_fill_default_value(
|
||||
attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {})
|
||||
|
||||
attr_info = {
|
||||
'key': {
|
||||
'allow_post': True,
|
||||
},
|
||||
}
|
||||
self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self.assertRaises(ValueError, self._test_fill_default_value,
|
||||
attr_info, {'key': 'X'}, {})
|
||||
self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
|
||||
attr_info, {}, self._EXC_CLS)
|
||||
attr_info = {
|
||||
'key': {
|
||||
'allow_post': False,
|
||||
},
|
||||
}
|
||||
self.assertRaises(ValueError, self._test_fill_default_value,
|
||||
attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self._test_fill_default_value(attr_info, {}, {})
|
||||
self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
|
||||
attr_info, {'key': 'X'}, self._EXC_CLS)
|
||||
|
||||
def _test_convert_value(self, attr_info, expected, res_dict):
|
||||
attributes.convert_value(attr_info, res_dict)
|
||||
self.assertEqual(expected, res_dict)
|
||||
|
||||
def test_convert_value(self):
|
||||
attr_info = {
|
||||
'key': {
|
||||
},
|
||||
}
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED},
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED})
|
||||
self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
|
||||
self._test_convert_value(attr_info,
|
||||
{'other_key': 'X'}, {'other_key': 'X'})
|
||||
|
||||
attr_info = {
|
||||
'key': {
|
||||
'convert_to': attributes.convert_to_int,
|
||||
},
|
||||
}
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED},
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED})
|
||||
self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
|
||||
self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
|
||||
self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
|
||||
attr_info, {'key': 1}, {'key': 'a'})
|
||||
|
||||
attr_info = {
|
||||
'key': {
|
||||
'validate': {'type:uuid': None},
|
||||
},
|
||||
}
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED},
|
||||
{'key': attributes.ATTR_NOT_SPECIFIED})
|
||||
uuid_str = '01234567-1234-1234-1234-1234567890ab'
|
||||
self._test_convert_value(attr_info,
|
||||
{'key': uuid_str}, {'key': uuid_str})
|
||||
self.assertRaises(ValueError, self._test_convert_value,
|
||||
attr_info, {'key': 1}, {'key': 1})
|
||||
self.assertRaises(self._EXC_CLS, attributes.convert_value,
|
||||
attr_info, {'key': 1}, self._EXC_CLS)
|
||||
|
|
Loading…
Reference in New Issue