Add policy enforcer for "tags" service plugin

The following resources have been updated with new policies for
tags:
* Port
* Subnet
* Network
* Router
* FloatingIP
* NetworkSegmentRange
* NetworkSegment
* SecurityGroup
* Trunk
* Subnetpool

The admin can now enforce specific policies for the resource tags
for the creation, update and deletion actions.

NOTE: a follow-up patch, with a new Launchpad bug reference, will
      be created to move the ``Tagging`` class from
      ``ExtensionDescriptor`` to ``APIExtensionDescriptor``, and
      refactor the ``TaggingController`` to be a standard
      ``neutron.api.v2.base.Controller``. Any API resource using
      the second controller will use the path used by the wsgi
      hooks, in particular the policy hook. That will make unnecessary
      to manually call the ``policy.enforce`` method from the
      extension class methods.

Closes-Bug: #2037002
Change-Id: I9f3e032739824f268db74c5a1b4f04d353742dbd
This commit is contained in:
Rodolfo Alonso Hernandez 2023-09-26 08:03:19 +00:00
parent c69693f223
commit f9b91289a5
23 changed files with 1252 additions and 50 deletions

View File

@ -19,6 +19,21 @@ from neutron.conf.policies import base
COLLECTION_PATH = '/floatingips'
RESOURCE_PATH = '/floatingips/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
DEPRECATION_REASON = (
"The Floating IP API now supports system scope and default roles.")
@ -79,6 +94,14 @@ rules = [
deprecated_reason=DEPRECATION_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_floatingips_tags',
check_str=base.ADMIN_OR_PROJECT_READER,
description='Get the floating IP tags',
operations=ACTION_GET_TAGS,
scope_types=['project'],
),
policy.DocumentedRuleDefault(
name='update_floatingip',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -96,6 +119,14 @@ rules = [
deprecated_reason=DEPRECATION_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_floatingips_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
description='Update the floating IP tags',
operations=ACTION_PUT_TAGS,
scope_types=['project'],
),
policy.DocumentedRuleDefault(
name='delete_floatingip',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -113,6 +144,13 @@ rules = [
deprecated_reason=DEPRECATION_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_floatingips_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
description='Delete the floating IP tags',
operations=ACTION_DELETE_TAGS,
scope_types=['project'],
),
]

View File

@ -22,6 +22,8 @@ The network API now supports system scope and default roles.
COLLECTION_PATH = '/networks'
RESOURCE_PATH = '/networks/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_POST = [
{'method': 'POST', 'path': COLLECTION_PATH},
@ -36,6 +38,18 @@ ACTION_GET = [
{'method': 'GET', 'path': COLLECTION_PATH},
{'method': 'GET', 'path': RESOURCE_PATH},
]
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -234,6 +248,18 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_networks_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_READER,
'rule:shared',
'rule:external',
neutron_policy.RULE_ADVSVC
),
scope_types=['project'],
description='Get the network tags',
operations=ACTION_GET_TAGS,
),
policy.DocumentedRuleDefault(
name='update_network',
@ -349,6 +375,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_networks_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Update the network tags',
operations=ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_network',
@ -362,6 +395,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_networks_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Delete the network tags',
operations=ACTION_DELETE_TAGS,
),
]

View File

@ -25,6 +25,21 @@ The network segment range API now supports project scope and default roles.
COLLECTION_PATH = '/network_segment_ranges'
RESOURCE_PATH = '/network_segment_ranges/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -45,6 +60,7 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_network_segment_range',
check_str=base.ADMIN,
@ -66,6 +82,14 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_network_segment_ranges_tags',
check_str=base.ADMIN,
scope_types=['project'],
description='Get the network segment range tags',
operations=ACTION_GET_TAGS,
),
policy.DocumentedRuleDefault(
name='update_network_segment_range',
check_str=base.ADMIN,
@ -83,6 +107,14 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_network_segment_ranges_tags',
check_str=base.ADMIN,
scope_types=['project'],
description='Update the network segment range tags',
operations=ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_network_segment_range',
check_str=base.ADMIN,
@ -100,6 +132,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_network_segment_ranges_tags',
check_str=base.ADMIN,
scope_types=['project'],
description='Delete the network segment range tags',
operations=ACTION_DELETE_TAGS,
),
]

View File

@ -22,6 +22,8 @@ DEPRECATED_REASON = (
COLLECTION_PATH = '/ports'
RESOURCE_PATH = '/ports/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_POST = [
{'method': 'POST', 'path': COLLECTION_PATH},
@ -36,6 +38,18 @@ ACTION_GET = [
{'method': 'GET', 'path': COLLECTION_PATH},
{'method': 'GET', 'path': RESOURCE_PATH},
]
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -353,6 +367,17 @@ rules = [
description='Get ``hints`` attribute of a port',
operations=ACTION_GET,
),
policy.DocumentedRuleDefault(
name='get_ports_tags',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_READER,
base.PROJECT_READER
),
scope_types=['project'],
description='Get the port tags',
operations=ACTION_GET_TAGS,
),
# TODO(amotoki): Add get_port:binding:vnic_type
# TODO(amotoki): Add get_port:binding:data_plane_status
@ -588,6 +613,16 @@ rules = [
description='Update ``hints`` attribute of a port',
operations=ACTION_PUT,
),
policy.DocumentedRuleDefault(
name='update_ports_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
neutron_policy.RULE_ADVSVC
),
scope_types=['project'],
description='Update the port tags',
operations=ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_port',
@ -607,6 +642,17 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_ports_tags',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.PROJECT_MEMBER,
base.ADMIN_OR_NET_OWNER_MEMBER
),
scope_types=['project'],
description='Delete the port tags',
operations=ACTION_DELETE_TAGS,
)
]

View File

@ -21,6 +21,8 @@ DEPRECATED_REASON = (
COLLECTION_PATH = '/routers'
RESOURCE_PATH = '/routers/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_POST = [
{'method': 'POST', 'path': COLLECTION_PATH},
@ -35,6 +37,18 @@ ACTION_GET = [
{'method': 'GET', 'path': COLLECTION_PATH},
{'method': 'GET', 'path': RESOURCE_PATH},
]
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -180,6 +194,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_routers_tags',
check_str=base.ADMIN_OR_PROJECT_READER,
scope_types=['project'],
description='Get the router tags',
operations=ACTION_GET_TAGS,
),
policy.DocumentedRuleDefault(
name='update_router',
@ -284,6 +305,13 @@ rules = [
'updating a router'),
operations=ACTION_POST,
),
policy.DocumentedRuleDefault(
name='update_routers_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Update the router tags',
operations=ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_router',
@ -297,6 +325,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_routers_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Delete the router tags',
operations=ACTION_DELETE_TAGS,
),
policy.DocumentedRuleDefault(
name='add_router_interface',

View File

@ -22,13 +22,31 @@ DEPRECATED_REASON = (
SG_COLLECTION_PATH = '/security-groups'
SG_RESOURCE_PATH = '/security-groups/{id}'
SG_TAGS_PATH = SG_RESOURCE_PATH + '/tags'
SG_TAG_PATH = SG_RESOURCE_PATH + '/tags/{tag_id}'
RULE_COLLECTION_PATH = '/security-group-rules'
RULE_RESOURCE_PATH = '/security-group-rules/{id}'
RULE_TAGS_PATH = RULE_RESOURCE_PATH + '/tags'
RULE_TAG_PATH = RULE_RESOURCE_PATH + '/tags/{tag_id}'
RULE_ADMIN_OR_SG_OWNER = 'rule:admin_or_sg_owner'
RULE_ADMIN_OWNER_OR_SG_OWNER = 'rule:admin_owner_or_sg_owner'
SG_ACTION_GET_TAGS = [
{'method': 'GET', 'path': SG_TAGS_PATH},
{'method': 'GET', 'path': SG_TAG_PATH},
]
SG_ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': SG_TAGS_PATH},
{'method': 'PUT', 'path': SG_TAG_PATH},
]
SG_ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': SG_TAGS_PATH},
{'method': 'DELETE', 'path': SG_TAG_PATH},
]
rules = [
policy.RuleDefault(
name='admin_or_sg_owner',
@ -98,6 +116,16 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_security_groups_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_READER,
'rule:shared_security_group'
),
scope_types=['project'],
description='Get the security group tags',
operations=SG_ACTION_GET_TAGS,
),
policy.DocumentedRuleDefault(
name='update_security_group',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -115,6 +143,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_security_groups_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Update the security group tags',
operations=SG_ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_security_group',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -132,6 +167,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_security_groups_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Delete the security group tags',
operations=SG_ACTION_DELETE_TAGS,
),
# TODO(amotoki): admin_or_owner is the right rule?
# Does an empty string make more sense for create_security_group_rule?

View File

@ -21,6 +21,21 @@ DEPRECATED_REASON = (
COLLECTION_PATH = '/segments'
RESOURCE_PATH = '/segments/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -62,6 +77,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_segments_tags',
check_str=base.ADMIN,
scope_types=['project'],
description='Get the segment tags',
operations=ACTION_GET_TAGS,
),
policy.DocumentedRuleDefault(
name='update_segment',
check_str=base.ADMIN,
@ -79,6 +101,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_segments_tags',
check_str=base.ADMIN,
scope_types=['project'],
description='Update the segment tags',
operations=ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_segment',
check_str=base.ADMIN,
@ -96,6 +125,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_segments_tags',
check_str=base.ADMIN,
scope_types=['project'],
description='Delete the segment tags',
operations=ACTION_DELETE_TAGS,
),
]

View File

@ -21,6 +21,8 @@ DEPRECATED_REASON = (
COLLECTION_PATH = '/subnets'
RESOURCE_PATH = '/subnets/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_POST = [
{'method': 'POST', 'path': COLLECTION_PATH},
@ -35,6 +37,18 @@ ACTION_GET = [
{'method': 'GET', 'path': COLLECTION_PATH},
{'method': 'GET', 'path': RESOURCE_PATH},
]
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -107,6 +121,16 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_subnets_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_READER,
'rule:shared'),
scope_types=['project'],
description='Get the subnet tags',
operations=ACTION_GET_TAGS,
),
policy.DocumentedRuleDefault(
name='update_subnet',
check_str=neutron_policy.policy_or(
@ -145,6 +169,15 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_subnets_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MEMBER),
scope_types=['project'],
description='Update the subnet tags',
operations=ACTION_PUT_TAGS,
),
policy.DocumentedRuleDefault(
name='delete_subnet',
check_str=neutron_policy.policy_or(
@ -159,6 +192,15 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_subnets_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MEMBER),
scope_types=['project'],
description='Delete the subnet tags',
operations=ACTION_DELETE_TAGS,
),
]

View File

@ -24,6 +24,21 @@ RESOURCE_PATH = '/subnetpools/{id}'
ONBOARD_PATH = '/subnetpools/{id}/onboard_network_subnets'
ADD_PREFIXES_PATH = '/subnetpools/{id}/add_prefixes'
REMOVE_PREFIXES_PATH = '/subnetpools/{id}/remove_prefixes'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
rules = [
@ -111,6 +126,16 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_subnetpools_tags',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_READER,
'rule:shared_subnetpools'
),
scope_types=['project'],
description='Get the subnetpool tags',
operations=ACTION_GET_TAGS
),
policy.DocumentedRuleDefault(
name='update_subnetpool',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -145,6 +170,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_subnetpools_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Update the subnetpool tags',
operations=ACTION_PUT_TAGS
),
policy.DocumentedRuleDefault(
name='delete_subnetpool',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -162,6 +194,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_subnetpools_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Delete the subnetpool tags',
operations=ACTION_DELETE_TAGS
),
policy.DocumentedRuleDefault(
name='onboard_network_subnets',
check_str=base.ADMIN_OR_PROJECT_MEMBER,

View File

@ -19,6 +19,21 @@ from neutron.conf.policies import base
COLLECTION_PATH = '/trunks'
RESOURCE_PATH = '/trunks/{id}'
TAGS_PATH = RESOURCE_PATH + '/tags'
TAG_PATH = RESOURCE_PATH + '/tags/{tag_id}'
ACTION_GET_TAGS = [
{'method': 'GET', 'path': TAGS_PATH},
{'method': 'GET', 'path': TAG_PATH},
]
ACTION_PUT_TAGS = [
{'method': 'PUT', 'path': TAGS_PATH},
{'method': 'PUT', 'path': TAG_PATH},
]
ACTION_DELETE_TAGS = [
{'method': 'DELETE', 'path': TAGS_PATH},
{'method': 'DELETE', 'path': TAG_PATH},
]
DEPRECATED_REASON = (
"The trunks API now supports system scope and default roles.")
@ -63,6 +78,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='get_trunks_tags',
check_str=base.ADMIN_OR_PROJECT_READER,
scope_types=['project'],
description='Get the trunk tags',
operations=ACTION_GET_TAGS
),
policy.DocumentedRuleDefault(
name='update_trunk',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -80,6 +102,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='update_trunks_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Update the trunk tags',
operations=ACTION_PUT_TAGS
),
policy.DocumentedRuleDefault(
name='delete_trunk',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
@ -97,6 +126,13 @@ rules = [
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY)
),
policy.DocumentedRuleDefault(
name='delete_trunks_tags',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
scope_types=['project'],
description='Delete a trunk',
operations=ACTION_DELETE_TAGS
),
policy.DocumentedRuleDefault(
name='get_subports',
check_str=base.ADMIN_OR_PROJECT_READER,

View File

@ -28,6 +28,8 @@ import webob.exc
from neutron._i18n import _
from neutron.api import extensions
from neutron.api.v2 import resource as api_resource
from neutron.objects import subnet
from neutron import policy
TAG = 'tag'
@ -55,6 +57,7 @@ TAG_ATTRIBUTE_MAP_PORTS[TAGS] = {
'validate': {'type:list_of_unique_strings': MAX_TAG_LEN},
'default': [], 'is_visible': True, 'is_filter': True
}
RESOURCES_AND_PARENTS = {'subnets': ('network', subnet.Subnet.get_network_id)}
class TagResourceNotFound(exceptions.NotFound):
@ -95,75 +98,113 @@ class TaggingController(object):
self.plugin = directory.get_plugin(TAG_PLUGIN_TYPE)
self.supported_resources = TAG_SUPPORTED_RESOURCES
def _get_parent_resource_and_id(self, kwargs):
@staticmethod
def _get_target(ctx, res_id, p_res, p_res_id, tag_id=None):
target = {'id': res_id,
'tenant_id': ctx.project_id,
'project_id': ctx.project_id}
if p_res:
target[p_res + '_id'] = p_res_id
if tag_id:
target['tag_id'] = tag_id
return target
@staticmethod
def _get_pparent_resource_and_id(context, resource, resource_id):
"""Retrieve the parent of the resource and ID (e.g.: subnet->net)"""
parent, getter_id = RESOURCES_AND_PARENTS[resource]
parent_id = getter_id(context.elevated(), resource_id)
return parent, parent_id
def _get_parent_resource_and_id(self, context, kwargs):
parent, parent_id = None, None
for key in kwargs:
for resource in self.supported_resources:
if key == self.supported_resources[resource] + '_id':
return resource, kwargs[key]
return None, None
if resource in RESOURCES_AND_PARENTS.keys():
parent, parent_id = self._get_pparent_resource_and_id(
context, resource, kwargs[key])
return resource, kwargs[key], parent, parent_id
return None, None, None, None
def index(self, request, **kwargs):
# GET /v2.0/networks/{network_id}/tags
parent, parent_id = self._get_parent_resource_and_id(kwargs)
return self.plugin.get_tags(request.context, parent, parent_id)
# GET /v2.0/{parent_resource}/{parent_resource_id}/tags
ctx = request.context
res, res_id, p_res, p_res_id = self._get_parent_resource_and_id(
ctx, kwargs)
target = self._get_target(ctx, res_id, p_res, p_res_id)
policy.enforce(ctx, 'get_%s_%s' % (res, TAGS), target)
return self.plugin.get_tags(ctx, res, res_id)
def show(self, request, id, **kwargs):
# GET /v2.0/networks/{network_id}/tags/{tag}
# GET /v2.0/{parent_resource}/{parent_resource_id}/tags/{tag}
# id == tag
validate_tag(id)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
return self.plugin.get_tag(request.context, parent, parent_id, id)
ctx = request.context
res, res_id, p_res, p_res_id = self._get_parent_resource_and_id(
ctx, kwargs)
target = self._get_target(ctx, res_id, p_res, p_res_id, tag_id=id)
policy.enforce(ctx, 'get_%s_%s' % (res, TAGS), target)
return self.plugin.get_tag(ctx, res, res_id, id)
def create(self, request, **kwargs):
# not supported
# POST /v2.0/networks/{network_id}/tags
# POST /v2.0/{parent_resource}/{parent_resource_id}/tags
raise webob.exc.HTTPNotFound("not supported")
def update(self, request, id, **kwargs):
# PUT /v2.0/networks/{network_id}/tags/{tag}
# PUT /v2.0/{parent_resource}/{parent_resource_id}/tags/{tag}
# id == tag
validate_tag(id)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'create.start',
parent, parent_id, [id])
result = self.plugin.update_tag(request.context, parent, parent_id, id)
notify_tag_action(request.context, 'create.end',
parent, parent_id, [id])
ctx = request.context
res, res_id, p_res, p_res_id = self._get_parent_resource_and_id(
ctx, kwargs)
target = self._get_target(ctx, res_id, p_res, p_res_id, tag_id=id)
policy.enforce(ctx, 'update_%s_%s' % (res, TAGS), target)
notify_tag_action(ctx, 'create.start', res, res_id, [id])
result = self.plugin.update_tag(ctx, res, res_id, id)
notify_tag_action(ctx, 'create.end', res, res_id, [id])
return result
def update_all(self, request, body, **kwargs):
# PUT /v2.0/networks/{network_id}/tags
# PUT /v2.0/{parent_resource}/{parent_resource_id}/tags
# body: {"tags": ["aaa", "bbb"]}
validate_tags(body)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'update.start',
parent, parent_id, body['tags'])
result = self.plugin.update_tags(request.context, parent,
parent_id, body)
notify_tag_action(request.context, 'update.end',
parent, parent_id, body['tags'])
ctx = request.context
res, res_id, p_res, p_res_id = self._get_parent_resource_and_id(
ctx, kwargs)
target = self._get_target(ctx, res_id, p_res, p_res_id)
policy.enforce(ctx, 'update_%s_%s' % (res, TAGS), target)
notify_tag_action(ctx, 'update.start', res, res_id, body['tags'])
result = self.plugin.update_tags(ctx, res, res_id, body)
notify_tag_action(ctx, 'update.end', res, res_id,
body['tags'])
return result
def delete(self, request, id, **kwargs):
# DELETE /v2.0/networks/{network_id}/tags/{tag}
# DELETE /v2.0/{parent_resource}/{parent_resource_id}/tags/{tag}
# id == tag
validate_tag(id)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'delete.start',
parent, parent_id, [id])
result = self.plugin.delete_tag(request.context, parent, parent_id, id)
notify_tag_action(request.context, 'delete.end',
parent, parent_id, [id])
ctx = request.context
res, res_id, p_res, p_res_id = self._get_parent_resource_and_id(
ctx, kwargs)
target = self._get_target(ctx, res_id, p_res, p_res_id, tag_id=id)
policy.enforce(ctx, 'delete_%s_%s' % (res, TAGS), target)
notify_tag_action(ctx, 'delete.start', res, res_id, [id])
result = self.plugin.delete_tag(ctx, res, res_id, id)
notify_tag_action(ctx, 'delete.end', res, res_id, [id])
return result
def delete_all(self, request, **kwargs):
# DELETE /v2.0/networks/{network_id}/tags
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'delete_all.start',
parent, parent_id)
result = self.plugin.delete_tags(request.context, parent, parent_id)
notify_tag_action(request.context, 'delete_all.end',
parent, parent_id)
# DELETE /v2.0/{parent_resource}/{parent_resource_id}/tags
ctx = request.context
res, res_id, p_res, p_res_id = self._get_parent_resource_and_id(
ctx, kwargs)
target = self._get_target(ctx, res_id, p_res, p_res_id)
policy.enforce(ctx, 'delete_%s_%s' % (res, TAGS), target)
notify_tag_action(ctx, 'delete_all.start', res, res_id)
result = self.plugin.delete_tags(ctx, res, res_id)
notify_tag_action(ctx, 'delete_all.end', res, res_id)
return result

View File

@ -13,6 +13,7 @@
import netaddr
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib.db import api as db_api
from neutron_lib.db import model_query
from neutron_lib.objects import common_types
from neutron_lib.utils import net as net_utils
@ -22,6 +23,7 @@ from oslo_utils import versionutils
from oslo_versionedobjects import fields as obj_fields
from sqlalchemy import and_, or_
from sqlalchemy import orm
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.sql import exists
from neutron.db.models import dns as dns_models
@ -545,6 +547,15 @@ class Subnet(base.NeutronDbObject):
return [segment_id for (segment_id,) in query.all()]
@classmethod
@db_api.CONTEXT_READER
def get_network_id(cls, context, subnet_id):
try:
return context.session.query(cls.db_model.network_id).filter(
cls.db_model.id == subnet_id).one()[0]
except orm_exc.NoResultFound:
return None
@base.NeutronObjectRegistry.register
class NetworkSubnetLock(base.NeutronDbObject):

View File

@ -69,6 +69,16 @@ class SystemAdminTests(FloatingIPAPITestCase):
policy.enforce,
self.context, "get_floatingip", self.alt_target)
def test_get_floatingips_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, "get_floatingips_tags", self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, "get_floatingips_tags", self.alt_target)
def test_update_floatingip(self):
self.assertRaises(
base_policy.InvalidScope,
@ -79,6 +89,16 @@ class SystemAdminTests(FloatingIPAPITestCase):
policy.enforce,
self.context, "update_floatingip", self.alt_target)
def test_update_floatingips_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, "update_floatingips_tags", self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, "update_floatingips_tags", self.alt_target)
def test_delete_floatingip(self):
self.assertRaises(
base_policy.InvalidScope,
@ -132,12 +152,27 @@ class AdminTests(FloatingIPAPITestCase):
self.assertTrue(
policy.enforce(self.context, "get_floatingip", self.alt_target))
def test_get_floatingips_tags(self):
self.assertTrue(
policy.enforce(self.context, "get_floatingips_tags", self.target))
self.assertTrue(
policy.enforce(self.context, "get_floatingips_tags",
self.alt_target))
def test_update_floatingip(self):
self.assertTrue(
policy.enforce(self.context, "update_floatingip", self.target))
self.assertTrue(
policy.enforce(self.context, "update_floatingip", self.alt_target))
def test_update_floatingips_tags(self):
self.assertTrue(
policy.enforce(self.context, "update_floatingips_tags",
self.target))
self.assertTrue(
policy.enforce(self.context, "update_floatingips_tags",
self.alt_target))
def test_delete_floatingip(self):
self.assertTrue(
policy.enforce(self.context, "delete_floatingip", self.target))
@ -178,6 +213,14 @@ class ProjectMemberTests(AdminTests):
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "get_floatingip", self.alt_target)
def test_get_floatingips_tags(self):
self.assertTrue(
policy.enforce(self.context, "get_floatingips_tags", self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "get_floatingips_tags",
self.alt_target)
def test_update_floatingip(self):
self.assertTrue(
policy.enforce(self.context, "update_floatingip", self.target))
@ -185,6 +228,15 @@ class ProjectMemberTests(AdminTests):
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "update_floatingip", self.alt_target)
def test_update_floatingips_tags(self):
self.assertTrue(
policy.enforce(self.context, "update_floatingips_tags",
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "update_floatingips_tags",
self.alt_target)
def test_delete_floatingip(self):
self.assertTrue(
policy.enforce(self.context, "delete_floatingip", self.target))
@ -219,6 +271,16 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, "update_floatingip", self.alt_target)
def test_update_floatingips_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "update_floatingips_tags", self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "update_floatingips_tags", self.alt_target)
def test_delete_floatingip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,

View File

@ -184,6 +184,18 @@ class SystemAdminTests(NetworkAPITestCase):
self.context, 'get_network:provider:segmentation_id',
self.alt_target)
def test_get_networks_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_networks_tags',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_networks_tags',
self.alt_target)
def test_update_network(self):
self.assertRaises(
base_policy.InvalidScope,
@ -279,6 +291,15 @@ class SystemAdminTests(NetworkAPITestCase):
self.context, 'update_network:port_security_enabled',
self.alt_target)
def test_update_networks_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_networks_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_networks_tags',
self.alt_target)
def test_delete_network(self):
self.assertRaises(
base_policy.InvalidScope,
@ -287,6 +308,15 @@ class SystemAdminTests(NetworkAPITestCase):
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_network', self.alt_target)
def test_delete_networks_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_networks_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_networks_tags',
self.alt_target)
class SystemMemberTests(SystemAdminTests):
@ -421,6 +451,12 @@ class AdminTests(NetworkAPITestCase):
'get_network:provider:segmentation_id',
self.alt_target))
def test_get_networks_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_networks_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'get_networks_tags', self.alt_target))
def test_update_network(self):
self.assertTrue(
policy.enforce(self.context, 'update_network', self.target))
@ -498,12 +534,26 @@ class AdminTests(NetworkAPITestCase):
'update_network:port_security_enabled',
self.alt_target))
def test_update_networks_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_networks_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'update_networks_tags',
self.alt_target))
def test_delete_network(self):
self.assertTrue(
policy.enforce(self.context, 'delete_network', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_network', self.alt_target))
def test_delete_networks_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_networks_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_networks_tags',
self.alt_target))
class ProjectMemberTests(AdminTests):
@ -657,6 +707,14 @@ class ProjectMemberTests(AdminTests):
self.context, 'get_network:provider:segmentation_id',
self.alt_target)
def test_get_networks_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_networks_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_networks_tags', self.alt_target)
def test_update_network(self):
self.assertTrue(
policy.enforce(self.context, 'update_network', self.target))
@ -751,6 +809,14 @@ class ProjectMemberTests(AdminTests):
self.context, 'update_network:port_security_enabled',
self.alt_target)
def test_update_networks_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_networks_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_networks_tags', self.alt_target)
def test_delete_network(self):
self.assertTrue(
policy.enforce(self.context, 'delete_network', self.target))
@ -759,6 +825,14 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'delete_network', self.alt_target)
def test_delete_networks_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_networks_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_networks_tags', self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
@ -806,6 +880,15 @@ class ProjectReaderTests(ProjectMemberTests):
self.context, 'update_network:port_security_enabled',
self.alt_target)
def test_update_networks_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_networks_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_networks_tags',
self.alt_target)
def test_delete_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -814,6 +897,15 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_network', self.alt_target)
def test_delete_networks_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_networks_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_networks_tags',
self.alt_target)
class ServiceRoleTests(NetworkAPITestCase):

View File

@ -44,18 +44,36 @@ class SystemAdminTests(NetworkSegmentRangeAPITestCase):
policy.enforce,
self.context, 'get_network_segment_range', self.target)
def test_get_network_segment_ranges_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_network_segment_ranges_tags', self.target)
def test_update_network_segment_range(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_network_segment_range', self.target)
def test_update_network_segment_ranges_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_network_segment_ranges_tags', self.target)
def test_delete_network_segment_range(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_network_segment_range', self.target)
def test_delete_network_segment_ranges_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_network_segment_ranges_tags', self.target)
class SystemMemberTests(SystemAdminTests):
@ -87,16 +105,31 @@ class AdminTests(NetworkSegmentRangeAPITestCase):
policy.enforce(self.context,
'get_network_segment_range', self.target))
def test_get_network_segment_ranges_tags(self):
self.assertTrue(
policy.enforce(self.context,
'get_network_segment_ranges_tags', self.target))
def test_update_network_segment_range(self):
self.assertTrue(
policy.enforce(self.context,
'update_network_segment_range', self.target))
def test_update_network_segment_ranges_tags(self):
self.assertTrue(
policy.enforce(self.context,
'update_network_segment_ranges_tags', self.target))
def test_delete_network_segment_range(self):
self.assertTrue(
policy.enforce(self.context,
'delete_network_segment_range', self.target))
def test_delete_network_segment_ranges_tags(self):
self.assertTrue(
policy.enforce(self.context,
'delete_network_segment_ranges_tags', self.target))
class ProjectMemberTests(AdminTests):
@ -116,18 +149,36 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'get_network_segment_range', self.target)
def test_get_network_segment_ranges_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network_segment_ranges_tags', self.target)
def test_update_network_segment_range(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network_segment_range', self.target)
def test_update_network_segment_ranges_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network_segment_ranges_tags', self.target)
def test_delete_network_segment_range(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_network_segment_range', self.target)
def test_delete_network_segment_ranges_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_network_segment_ranges_tags', self.target)
class ProjectReaderTests(ProjectMemberTests):

View File

@ -246,6 +246,14 @@ class SystemAdminTests(PortAPITestCase):
policy.enforce, self.context, 'get_port:resource_request',
self.alt_target)
def test_get_ports_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_ports_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_ports_tags', self.alt_target)
def test_update_port(self):
self.assertRaises(
base_policy.InvalidScope,
@ -596,6 +604,12 @@ class AdminTests(PortAPITestCase):
policy.enforce(
self.context, 'get_port:hints', self.alt_target))
def test_get_ports_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_ports_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'get_ports_tags', self.alt_target))
def test_update_port(self):
self.assertTrue(
policy.enforce(self.context, 'update_port', self.target))
@ -957,6 +971,13 @@ class ProjectMemberTests(AdminTests):
policy.enforce, self.context, 'get_port:hints',
self.alt_target)
def test_get_ports_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_ports_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_ports_tags', self.alt_target)
def test_update_port(self):
self.assertTrue(
policy.enforce(self.context, 'update_port', self.target))
@ -1113,6 +1134,13 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'update_port:hints', self.alt_target)
def test_update_ports_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_ports_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ports_tags', self.alt_target)
def test_delete_port(self):
self.assertTrue(
policy.enforce(self.context, 'delete_port', self.target))
@ -1163,6 +1191,14 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce, self.context, 'update_port:binding:vnic_type',
self.alt_target)
def test_update_ports_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ports_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ports_tags', self.alt_target)
def test_delete_port(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,

View File

@ -168,6 +168,16 @@ class SystemAdminTests(RouterAPITestCase):
policy.enforce,
self.context, 'get_router:ha', self.alt_target)
def test_get_routers_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_routers_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_routers_tags', self.alt_target)
def test_update_router(self):
self.assertRaises(
base_policy.InvalidScope,
@ -272,6 +282,16 @@ class SystemAdminTests(RouterAPITestCase):
self.context, 'update_router:enable_default_route_ecmp',
self.alt_target)
def test_update_routers_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_routers_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_routers_tags', self.alt_target)
def test_delete_router(self):
self.assertRaises(
base_policy.InvalidScope,
@ -282,6 +302,16 @@ class SystemAdminTests(RouterAPITestCase):
policy.enforce,
self.context, 'delete_router', self.alt_target)
def test_delete_routers_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_routers_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_routers_tags', self.alt_target)
def test_add_router_interface(self):
self.assertRaises(
base_policy.InvalidScope,
@ -429,6 +459,12 @@ class AdminTests(RouterAPITestCase):
self.assertTrue(
policy.enforce(self.context, 'get_router:ha', self.alt_target))
def test_get_routers_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_routers_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'get_routers_tags', self.alt_target))
def test_update_router(self):
self.assertTrue(
policy.enforce(self.context, 'update_router', self.target))
@ -491,12 +527,26 @@ class AdminTests(RouterAPITestCase):
'update_router:external_gateway_info:external_fixed_ips',
self.alt_target))
def test_update_routers_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_routers_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'update_routers_tags',
self.alt_target))
def test_delete_router(self):
self.assertTrue(
policy.enforce(self.context, 'delete_router', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_router', self.alt_target))
def test_delete_routers_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_routers_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_routers_tags',
self.alt_target))
def test_add_router_interface(self):
self.assertTrue(
policy.enforce(self.context,
@ -652,6 +702,14 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'get_router:ha', self.alt_target)
def test_get_routers_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_routers_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_routers_tags', self.alt_target)
def test_update_router(self):
self.assertTrue(
policy.enforce(self.context, 'update_router', self.target))
@ -728,6 +786,14 @@ class ProjectMemberTests(AdminTests):
'update_router:external_gateway_info:external_fixed_ips',
self.alt_target)
def test_update_routers_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_routers_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_routers_tags', self.alt_target)
def test_delete_router(self):
self.assertTrue(
policy.enforce(self.context, 'delete_router', self.target))
@ -736,6 +802,14 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'delete_router', self.alt_target)
def test_delete_routers_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_routers_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_routers_tags', self.alt_target)
def test_add_router_interface(self):
self.assertTrue(
policy.enforce(self.context,
@ -829,6 +903,16 @@ class ProjectReaderTests(ProjectMemberTests):
self.context, 'update_router:external_gateway_info:network_id',
self.alt_target)
def test_update_routers_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_routers_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_routers_tags', self.alt_target)
def test_delete_router(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -839,6 +923,16 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'delete_router', self.alt_target)
def test_delete_routers_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_routers_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_routers_tags', self.alt_target)
def test_add_router_interface(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,

View File

@ -56,6 +56,16 @@ class SystemAdminSecurityGroupTests(SecurityGroupAPITestCase):
policy.enforce,
self.context, 'get_security_group', self.alt_target)
def test_get_security_groups_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_security_groups_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_security_groups_tags', self.alt_target)
def test_update_security_group(self):
self.assertRaises(
base_policy.InvalidScope,
@ -66,6 +76,16 @@ class SystemAdminSecurityGroupTests(SecurityGroupAPITestCase):
policy.enforce,
self.context, 'update_security_group', self.alt_target)
def test_update_security_groups_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_security_groups_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_security_groups_tags', self.alt_target)
def test_delete_security_group(self):
self.assertRaises(
base_policy.InvalidScope,
@ -76,6 +96,16 @@ class SystemAdminSecurityGroupTests(SecurityGroupAPITestCase):
policy.enforce,
self.context, 'delete_security_group', self.alt_target)
def test_delete_security_groups_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_security_groups_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_security_groups_tags', self.alt_target)
class SystemMemberSecurityGroupTests(SystemAdminSecurityGroupTests):
@ -111,6 +141,14 @@ class AdminSecurityGroupTests(SecurityGroupAPITestCase):
policy.enforce(
self.context, 'get_security_group', self.alt_target))
def test_get_security_groups_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_security_groups_tags',
self.target))
self.assertTrue(
policy.enforce(self.context, 'get_security_groups_tags',
self.alt_target))
def test_update_security_group(self):
self.assertTrue(
policy.enforce(self.context, 'update_security_group', self.target))
@ -118,6 +156,14 @@ class AdminSecurityGroupTests(SecurityGroupAPITestCase):
policy.enforce(
self.context, 'update_security_group', self.alt_target))
def test_update_security_groups_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_security_groups_tags',
self.target))
self.assertTrue(
policy.enforce(self.context, 'update_security_groups_tags',
self.alt_target))
def test_delete_security_group(self):
self.assertTrue(
policy.enforce(self.context, 'delete_security_group', self.target))
@ -125,6 +171,14 @@ class AdminSecurityGroupTests(SecurityGroupAPITestCase):
policy.enforce(
self.context, 'delete_security_group', self.alt_target))
def test_delete_security_groups_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_security_groups_tags',
self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_security_groups_tags',
self.alt_target))
class ProjectMemberSecurityGroupTests(AdminSecurityGroupTests):
@ -148,6 +202,14 @@ class ProjectMemberSecurityGroupTests(AdminSecurityGroupTests):
policy.enforce,
self.context, 'get_security_group', self.alt_target)
def test_get_security_groups_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_security_groups_tags',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized, policy.enforce,
self.context, 'get_security_groups_tags', self.alt_target)
def test_update_security_group(self):
self.assertTrue(
policy.enforce(self.context, 'update_security_group', self.target))
@ -156,6 +218,15 @@ class ProjectMemberSecurityGroupTests(AdminSecurityGroupTests):
policy.enforce,
self.context, 'update_security_group', self.alt_target)
def test_update_security_groups_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_security_groups_tags',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_security_groups_tags',
self.alt_target)
def test_delete_security_group(self):
self.assertTrue(
policy.enforce(self.context, 'delete_security_group', self.target))
@ -164,6 +235,14 @@ class ProjectMemberSecurityGroupTests(AdminSecurityGroupTests):
policy.enforce,
self.context, 'delete_security_group', self.alt_target)
def test_delete_security_groups_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_security_groups_tags',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized, policy.enforce,
self.context, 'delete_security_groups_tags', self.alt_target)
class ProjectReaderSecurityGroupTests(ProjectMemberSecurityGroupTests):
@ -191,6 +270,16 @@ class ProjectReaderSecurityGroupTests(ProjectMemberSecurityGroupTests):
policy.enforce,
self.context, 'update_security_group', self.alt_target)
def test_update_security_groups_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_security_groups_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_security_groups_tags', self.alt_target)
def test_delete_security_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -201,6 +290,16 @@ class ProjectReaderSecurityGroupTests(ProjectMemberSecurityGroupTests):
policy.enforce,
self.context, 'delete_security_group', self.alt_target)
def test_delete_security_groups_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_groups_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_groups_tags', self.alt_target)
class ServiceRoleSecurityGroupTests(SecurityGroupAPITestCase):
@ -410,17 +509,6 @@ class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
policy.enforce,
self.context, 'get_security_group_rule', self.alt_target)
# Owner of the security group can get rule which belongs to that group,
# even if security group rule belongs to someone else
sg_owner_target = {
'project_id': 'some-other-project',
'security_group:tenant_id': self.project_id,
'security_group_id': self.sg['id'],
'ext_parent_security_group_id': self.sg['id']}
self.assertTrue(
policy.enforce(self.context,
'get_security_group_rule', sg_owner_target))
def test_delete_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,

View File

@ -44,18 +44,36 @@ class SystemAdminTests(SegmentAPITestCase):
policy.enforce,
self.context, 'get_segment', self.target)
def test_get_segments_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_segments_tags', self.target)
def test_update_segment(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_segment', self.target)
def test_update_segments_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_segments_tags', self.target)
def test_delete_segment(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_segment', self.target)
def test_delete_segments_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_segments_tags', self.target)
class SystemMemberTests(SystemAdminTests):
@ -85,14 +103,26 @@ class AdminTests(SegmentAPITestCase):
self.assertTrue(
policy.enforce(self.context, 'get_segment', self.target))
def test_get_segments_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_segments_tags', self.target))
def test_update_segment(self):
self.assertTrue(
policy.enforce(self.context, 'update_segment', self.target))
def test_update_segments_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_segments_tags', self.target))
def test_delete_segment(self):
self.assertTrue(
policy.enforce(self.context, 'delete_segment', self.target))
def test_delete_segments_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_segments_tags', self.target))
class ProjectMemberTests(AdminTests):
@ -112,18 +142,36 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'get_segment', self.target)
def test_get_segments_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_segments_tags', self.target)
def test_update_segment(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_segment', self.target)
def test_update_segments_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_segments_tags', self.target)
def test_delete_segment(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_segment', self.target)
def test_delete_segments_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_segments_tags', self.target)
class ProjectReaderTests(ProjectMemberTests):

View File

@ -146,6 +146,20 @@ class SystemAdminTests(SubnetAPITestCase):
policy.enforce,
self.context, 'get_subnet:segment_id', self.alt_target)
def test_get_subnets_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_subnets_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_subnets_tags', self.target_net_alt_target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_subnets_tags', self.alt_target)
def test_update_subnet(self):
self.assertRaises(
base_policy.InvalidScope,
@ -190,6 +204,20 @@ class SystemAdminTests(SubnetAPITestCase):
policy.enforce,
self.context, 'update_subnet:service_types', self.alt_target)
def test_update_subnets_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_subnets_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_subnets_tags', self.target_net_alt_target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_subnets_tags', self.alt_target)
def test_delete_subnet(self):
self.assertRaises(
base_policy.InvalidScope,
@ -204,6 +232,20 @@ class SystemAdminTests(SubnetAPITestCase):
policy.enforce,
self.context, 'delete_subnet', self.alt_target)
def test_delete_subnets_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_subnets_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_subnets_tags', self.target_net_alt_target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_subnets_tags', self.alt_target)
class SystemMemberTests(SystemAdminTests):
@ -277,6 +319,15 @@ class AdminTests(SubnetAPITestCase):
policy.enforce(
self.context, 'get_subnet:segment_id', self.alt_target))
def test_get_subnets_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_subnets_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'get_subnets_tags',
self.target_net_alt_target))
self.assertTrue(
policy.enforce(self.context, 'get_subnets_tags', self.alt_target))
def test_update_subnet(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnet', self.target))
@ -310,6 +361,16 @@ class AdminTests(SubnetAPITestCase):
policy.enforce(
self.context, 'update_subnet:service_types', self.alt_target))
def test_update_subnets_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnets_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'update_subnets_tags',
self.target_net_alt_target))
self.assertTrue(
policy.enforce(self.context, 'update_subnets_tags',
self.alt_target))
def test_delete_subnet(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnet', self.target))
@ -319,6 +380,16 @@ class AdminTests(SubnetAPITestCase):
self.assertTrue(
policy.enforce(self.context, 'delete_subnet', self.alt_target))
def test_delete_subnets_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnets_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_subnets_tags',
self.target_net_alt_target))
self.assertTrue(
policy.enforce(self.context, 'delete_subnets_tags',
self.alt_target))
class ProjectMemberTests(AdminTests):
@ -393,6 +464,17 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'get_subnet:segment_id', self.alt_target)
def test_get_subnets_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_subnets_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'get_subnets_tags',
self.target_net_alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subnets_tags', self.alt_target)
def test_update_subnet(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnet', self.target))
@ -434,6 +516,17 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'update_subnet:service_types', self.alt_target)
def test_update_subnets_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnets_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'update_subnets_tags',
self.target_net_alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnets_tags', self.alt_target)
def test_delete_subnet(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnet', self.target))
@ -445,6 +538,17 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'delete_subnet', self.alt_target)
def test_delete_subnets_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnets_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_subnets_tags',
self.target_net_alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnets_tags', self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
@ -480,6 +584,20 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'update_subnet', self.alt_target)
def test_update_subnets_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnets_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnets_tags', self.target_net_alt_target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnets_tags', self.alt_target)
def test_delete_subnet(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -494,6 +612,20 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'delete_subnet', self.alt_target)
def test_delete_subnets_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnets_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnets_tags', self.target_net_alt_target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnets_tags', self.alt_target)
class ServiceRoleTests(SubnetAPITestCase):

View File

@ -73,6 +73,16 @@ class SystemAdminTests(SubnetpoolAPITestCase):
policy.enforce,
self.context, 'get_subnetpool', self.alt_target)
def test_get_subnetpools_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_subnetpools_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_subnetpools_tags', self.alt_target)
def test_update_subnetpool(self):
self.assertRaises(
base_policy.InvalidScope,
@ -93,6 +103,16 @@ class SystemAdminTests(SubnetpoolAPITestCase):
policy.enforce,
self.context, 'update_subnetpool:is_default', self.alt_target)
def test_update_subnetpools_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_subnetpools_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_subnetpools_tags', self.alt_target)
def test_delete_subnetpool(self):
self.assertRaises(
base_policy.InvalidScope,
@ -103,6 +123,16 @@ class SystemAdminTests(SubnetpoolAPITestCase):
policy.enforce,
self.context, 'delete_subnetpool', self.alt_target)
def test_delete_subnetpools_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_subnetpools_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_subnetpools_tags', self.alt_target)
def test_onboard_network_subnets(self):
self.assertRaises(
base_policy.InvalidScope,
@ -182,6 +212,13 @@ class AdminTests(SubnetpoolAPITestCase):
self.assertTrue(
policy.enforce(self.context, 'get_subnetpool', self.alt_target))
def test_get_subnetpools_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_subnetpools_tags', self.target))
self.assertTrue(
policy.enforce(self.context, 'get_subnetpools_tags',
self.alt_target))
def test_update_subnetpool(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnetpool', self.target))
@ -196,12 +233,28 @@ class AdminTests(SubnetpoolAPITestCase):
policy.enforce(
self.context, 'update_subnetpool:default', self.alt_target))
def test_update_subnetpools_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnetpools_tags',
self.target))
self.assertTrue(
policy.enforce(self.context, 'update_subnetpools_tags',
self.alt_target))
def test_delete_subnetpool(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnetpool', self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_subnetpool', self.alt_target))
def test_delete_subnetpools_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnetpools_tags',
self.target))
self.assertTrue(
policy.enforce(self.context, 'delete_subnetpools_tags',
self.alt_target))
def test_onboard_network_subnets(self):
self.assertTrue(
policy.enforce(self.context,
@ -265,6 +318,14 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'get_subnetpool', self.alt_target)
def test_get_subnetpools_tags(self):
self.assertTrue(
policy.enforce(self.context, 'get_subnetpools_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subnetpools_tags', self.alt_target)
def test_update_subnetpool(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnetpool', self.target))
@ -283,6 +344,15 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'update_subnetpool:is_default', self.alt_target)
def test_update_subnetpools_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_subnetpools_tags',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnetpools_tags', self.alt_target)
def test_delete_subnetpool(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnetpool', self.target))
@ -291,6 +361,15 @@ class ProjectMemberTests(AdminTests):
policy.enforce,
self.context, 'delete_subnetpool', self.alt_target)
def test_delete_subnetpools_tags(self):
self.assertTrue(
policy.enforce(self.context, 'delete_subnetpools_tags',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnetpools_tags', self.alt_target)
def test_onboard_network_subnets(self):
self.assertTrue(
policy.enforce(self.context,
@ -343,6 +422,16 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'update_subnetpool', self.alt_target)
def test_update_subnetpools_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnetpools_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnetpools_tags', self.alt_target)
def test_delete_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -353,6 +442,16 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'delete_subnetpool', self.alt_target)
def test_delete_subnetpools_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnetpools_tags', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnetpools_tags', self.alt_target)
def test_onboard_network_subnets(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -414,6 +513,12 @@ class ServiceRoleTests(SubnetpoolAPITestCase):
policy.enforce,
self.context, 'get_subnetpool', self.target)
def test_get_subnetpools_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subnetpools_tags', self.target)
def test_update_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -426,12 +531,24 @@ class ServiceRoleTests(SubnetpoolAPITestCase):
policy.enforce,
self.context, 'update_subnetpool:is_default', self.target)
def test_update_subnetpools_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnetpools_tags', self.target)
def test_delete_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnetpool', self.target)
def test_delete_subnetpools_tags(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnetpools_tags', self.target)
def test_onboard_network_subnets(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,

View File

@ -53,6 +53,16 @@ class SystemAdminTests(TrunkAPITestCase):
policy.enforce,
self.context, 'get_trunk', self.alt_target)
def test_get_trunks_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_trunks_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_trunks_tags', self.alt_target)
def test_update_trunk(self):
self.assertRaises(
base_policy.InvalidScope,
@ -63,6 +73,16 @@ class SystemAdminTests(TrunkAPITestCase):
policy.enforce,
self.context, 'update_trunk', self.alt_target)
def test_update_trunks_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_trunks_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'update_trunks_tags', self.alt_target)
def test_delete_trunk(self):
self.assertRaises(
base_policy.InvalidScope,
@ -73,6 +93,16 @@ class SystemAdminTests(TrunkAPITestCase):
policy.enforce,
self.context, 'delete_trunk', self.alt_target)
def test_delete_trunks_tags(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_trunks_tags', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_trunks_tags', self.alt_target)
def test_get_subports(self):
self.assertRaises(
base_policy.InvalidScope,

View File

@ -0,0 +1,7 @@
---
features:
- |
Added the tags policies for the following resources: network, subnet, port,
router, floating IP, network segment, network segment range, security
group and security group rule. The policies control the creation, the
update and the deletion of the resource tags.