Merge "Remove get_admin_roles and associated logic"

This commit is contained in:
Jenkins 2015-06-09 19:57:30 +00:00 committed by Gerrit Code Review
commit 909f7fe68b
11 changed files with 81 additions and 145 deletions

View File

@ -130,8 +130,7 @@ class RequestContextSerializer(om_serializer.Serializer):
tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
if not tenant_id:
tenant_id = rpc_ctxt_dict.pop('project_id', None)
return context.Context(user_id, tenant_id,
load_admin_roles=False, **rpc_ctxt_dict)
return context.Context(user_id, tenant_id, **rpc_ctxt_dict)
class Service(service.Service):

View File

@ -18,6 +18,7 @@
import copy
import datetime
from debtcollector import removals
from oslo_context import context as oslo_context
from oslo_log import log as logging
@ -36,9 +37,8 @@ class ContextBase(oslo_context.RequestContext):
"""
def __init__(self, user_id, tenant_id, is_admin=None, read_deleted="no",
roles=None, timestamp=None, load_admin_roles=True,
request_id=None, tenant_name=None, user_name=None,
overwrite=True, auth_token=None, **kwargs):
roles=None, timestamp=None, request_id=None, tenant_name=None,
user_name=None, overwrite=True, auth_token=None, **kwargs):
"""Object initialization.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
@ -68,11 +68,6 @@ class ContextBase(oslo_context.RequestContext):
self.is_advsvc = self.is_admin or policy.check_is_advsvc(self)
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
elif self.is_admin and load_admin_roles:
# Ensure context is populated with admin roles
admin_roles = policy.get_admin_roles()
if admin_roles:
self.roles = list(set(self.roles) | set(admin_roles))
@property
def project_id(self):
@ -150,12 +145,12 @@ class Context(ContextBase):
return self._session
@removals.removed_kwarg('load_admin_roles')
def get_admin_context(read_deleted="no", load_admin_roles=True):
return Context(user_id=None,
tenant_id=None,
is_admin=True,
read_deleted=read_deleted,
load_admin_roles=load_admin_roles,
overwrite=False)

View File

@ -384,6 +384,10 @@ def check(context, action, target, plugin=None, might_not_exist=False,
:return: Returns True if access is permitted else False.
"""
# If we already know the context has admin rights do not perform an
# additional check and authorize the operation
if context.is_admin:
return True
if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
match_rule, target, credentials = _prepare_check(context,
@ -417,6 +421,10 @@ def enforce(context, action, target, plugin=None, pluralized=None):
:raises neutron.openstack.common.policy.PolicyNotAuthorized:
if verification fails.
"""
# If we already know the context has admin rights do not perform an
# additional check and authorize the operation
if context.is_admin:
return True
rule, target, credentials = _prepare_check(context,
action,
target,
@ -459,25 +467,3 @@ def _extract_roles(rule, roles):
elif hasattr(rule, 'rules'):
for rule in rule.rules:
_extract_roles(rule, roles)
def get_admin_roles():
"""Return a list of roles which are granted admin rights according
to policy settings.
"""
# NOTE(salvatore-orlando): This function provides a solution for
# populating implicit contexts with the appropriate roles so that
# they correctly pass policy checks, and will become superseded
# once all explicit policy checks are removed from db logic and
# plugin modules. For backward compatibility it returns the literal
# admin if ADMIN_CTX_POLICY is not defined
init()
if not _ENFORCER.rules or ADMIN_CTX_POLICY not in _ENFORCER.rules:
return ['admin']
try:
admin_ctx_rule = _ENFORCER.rules[ADMIN_CTX_POLICY]
except (KeyError, TypeError):
return
roles = []
_extract_roles(admin_ctx_rule, roles)
return roles

View File

@ -37,7 +37,6 @@ def get_admin_test_context(db_url):
tenant_id=None,
is_admin=True,
read_deleted="no",
load_admin_roles=True,
overwrite=False)
facade = session.EngineFacade(db_url, mysql_sql_mode='STRICT_ALL_TABLES')
ctx._session = facade.get_session(autocommit=False, expire_on_commit=True)

View File

@ -437,12 +437,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True,
dns_nameservers=None, host_routes=None, shared=None,
ipv6_ra_mode=None, ipv6_address_mode=None):
ipv6_ra_mode=None, ipv6_address_mode=None,
tenant_id=None, set_context=False):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
gateway_ip=gateway,
tenant_id=network['network']['tenant_id'],
tenant_id=(tenant_id or
network['network']['tenant_id']),
allocation_pools=allocation_pools,
ip_version=ip_version,
enable_dhcp=enable_dhcp,
@ -450,7 +452,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
host_routes=host_routes,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
ipv6_address_mode=ipv6_address_mode,
set_context=set_context)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
@ -584,7 +587,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
host_routes=None,
shared=None,
ipv6_ra_mode=None,
ipv6_address_mode=None):
ipv6_address_mode=None,
tenant_id=None,
set_context=False):
with optional_ctx(network, self.network) as network_to_use:
subnet = self._make_subnet(fmt or self.fmt,
network_to_use,
@ -597,7 +602,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
host_routes,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
ipv6_address_mode=ipv6_address_mode,
tenant_id=tenant_id,
set_context=set_context)
yield subnet
@contextlib.contextmanager
@ -3665,7 +3672,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def _test_validate_subnet_ipv6_modes(self, cur_subnet=None,
expect_success=True, **modes):
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context(load_admin_roles=False)
ctx = context.get_admin_context()
new_subnet = {'ip_version': 6,
'cidr': 'fe80::/64',
'enable_dhcp': True,
@ -4580,8 +4587,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
plugin = manager.NeutronManager.get_plugin()
e = self.assertRaises(exception,
plugin._validate_subnet,
context.get_admin_context(
load_admin_roles=False),
context.get_admin_context(),
subnet)
self.assertThat(
str(e),

View File

@ -1120,34 +1120,28 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
expected_code=error_code)
def test_router_add_interface_subnet_with_bad_tenant_returns_404(self):
with mock.patch('neutron.context.Context.to_dict') as tdict:
tenant_id = _uuid()
admin_context = {'roles': ['admin']}
tenant_context = {'tenant_id': 'bad_tenant',
'roles': []}
tdict.return_value = admin_context
with self.router(tenant_id=tenant_id) as r:
with self.network(tenant_id=tenant_id) as n:
with self.subnet(network=n) as s:
tdict.return_value = tenant_context
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
err_code)
tdict.return_value = admin_context
body = self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self.assertIn('port_id', body)
tdict.return_value = tenant_context
self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None,
err_code)
tenant_id = _uuid()
with self.router(tenant_id=tenant_id, set_context=True) as r:
with self.network(tenant_id=tenant_id, set_context=True) as n:
with self.subnet(network=n, set_context=True) as s:
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=err_code,
tenant_id='bad_tenant')
body = self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self.assertIn('port_id', body)
self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=err_code,
tenant_id='bad_tenant')
def test_router_add_interface_subnet_with_port_from_other_tenant(self):
tenant_id = _uuid()
@ -1270,33 +1264,33 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
HTTPBadRequest.code)
def test_router_add_interface_port_bad_tenant_returns_404(self):
with mock.patch('neutron.context.Context.to_dict') as tdict:
admin_context = {'roles': ['admin']}
tenant_context = {'tenant_id': 'bad_tenant',
'roles': []}
tdict.return_value = admin_context
with self.router() as r:
with self.port() as p:
tdict.return_value = tenant_context
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
err_code)
tdict.return_value = admin_context
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
tenant_id = _uuid()
with self.router(tenant_id=tenant_id, set_context=True) as r:
with self.network(tenant_id=tenant_id, set_context=True) as n:
with self.subnet(tenant_id=tenant_id, network=n,
set_context=True) as s:
with self.port(tenant_id=tenant_id, subnet=s,
set_context=True) as p:
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
expected_code=err_code,
tenant_id='bad_tenant')
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
tenant_id=tenant_id)
tdict.return_value = tenant_context
# clean-up
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'],
err_code)
# clean-up should fail as well
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'],
expected_code=err_code,
tenant_id='bad_tenant')
def test_router_add_interface_dup_subnet1_returns_400(self):
with self.router() as r:

View File

@ -322,7 +322,7 @@ class QuotaExtensionDbTestCase(QuotaExtensionTestCase):
tenant_id = 'tenant_id1'
self.assertRaises(exceptions.QuotaResourceUnknown,
quota.QUOTAS.limit_check,
context.get_admin_context(load_admin_roles=False),
context.get_admin_context(),
tenant_id,
foobar=1)

View File

@ -231,8 +231,7 @@ class TestContrailSubnetsV2(test_plugin.TestSubnetsV2,
'nexthop': '1.2.3.4'}]}
error = self.assertRaises(exception,
FAKE_SERVER._validate_subnet,
neutron_context.get_admin_context(
load_admin_roles=False),
neutron_context.get_admin_context(),
subnet)
self.assertThat(
str(error),

View File

@ -105,14 +105,6 @@ class TestNeutronContext(base.BaseTestCase):
self.assertIsNone(ctx_dict['auth_token'])
self.assertFalse(hasattr(ctx, 'session'))
def test_neutron_context_with_load_roles_true(self):
ctx = context.get_admin_context()
self.assertIn('admin', ctx.roles)
def test_neutron_context_with_load_roles_false(self):
ctx = context.get_admin_context(load_admin_roles=False)
self.assertFalse(ctx.roles)
def test_neutron_context_elevated_retains_request_id(self):
ctx = context.Context('user_id', 'tenant_id')
self.assertFalse(ctx.is_admin)

View File

@ -359,6 +359,9 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_check_is_admin_with_admin_context_succeeds(self):
admin_context = context.get_admin_context()
# explicitly set roles as this test verifies user credentials
# with the policy engine
admin_context.roles = ['admin']
self.assertTrue(policy.check_is_admin(admin_context))
def test_check_is_admin_with_user_context_fails(self):
@ -559,44 +562,6 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_enforce_tenant_id_check_invalid_parent_resource_raises(self):
self._test_enforce_tenant_id_raises('tenant_id:%(foobaz_tenant_id)s')
def test_get_roles_context_is_admin_rule_missing(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"some_other_rule": "role:admin",
}.items())
policy.set_rules(common_policy.Rules(rules))
# 'admin' role is expected for bw compatibility
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_role_check(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "role:admin",
}.items())
policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_rule_check(self):
rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "rule:some_other_rule",
"some_other_rule": "role:admin",
}.items())
policy.set_rules(common_policy.Rules(rules))
self.assertEqual(['admin'], policy.get_admin_roles())
def test_get_roles_with_or_check(self):
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "rule:rule1 or rule:rule2",
"rule1": "role:admin_1",
"rule2": "role:admin_2"
}.items())
self.assertEqual(['admin_1', 'admin_2'],
policy.get_admin_roles())
def test_get_roles_with_other_rules(self):
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
policy.ADMIN_CTX_POLICY: "role:xxx or other:value",
}.items())
self.assertEqual(['xxx'], policy.get_admin_roles())
def _test_set_rules_with_deprecated_policy(self, input_rules,
expected_rules):
policy.set_rules(input_rules.copy())

View File

@ -6,6 +6,7 @@ pbr>=0.11,<2.0
Paste
PasteDeploy>=1.5.0
Routes>=1.12.3,!=2.0
debtcollector>=0.3.0 # Apache-2.0
eventlet>=0.17.3
greenlet>=0.3.2
httplib2>=0.7.5