Fix RBACEnforcer get_member_from_driver mechanism

Correct an issue with the RBACEnforcer requiring 'member_name' instead
of 'member_key' for the inferred lookup. Due to how flask works and that
all views are instantiated on demand (and not accessible outside of
the active method without a lot of extra introspection), the provider
object now supports a "deferred" lookup mechanism. This mechanism
leverages the descriptor construct and does the lookup of the provider
api property and method at runtime. This, in essence, works like a
"@classproperty" would.

Change-Id: I264384dd521ea60ba6ee98652aaeb939f1a75521
Partial-Bug: #1776504
This commit is contained in:
Morgan Fainberg 2018-08-11 05:17:22 -07:00 committed by morgan fainberg
parent 0a641462cb
commit 22f5f7303f
6 changed files with 73 additions and 22 deletions

View File

@ -41,10 +41,8 @@ def _filter_endpoint(ref):
class EndpointResource(ks_flask.ResourceBase):
collection_key = 'endpoints'
member_key = 'endpoint'
def __init__(self):
super(EndpointResource, self).__init__()
self.get_member_from_driver = PROVIDERS.catalog_api.get_endpoint
get_member_from_driver = PROVIDERS.deferred_provider_lookup(
api='catalog_api', method='get_endpoint')
@staticmethod
def _validate_endpoint_region(endpoint):

View File

@ -34,10 +34,8 @@ PROVIDERS = provider_api.ProviderAPIs
class RoleResource(ks_flask.ResourceBase):
collection_key = 'roles'
member_key = 'role'
def __init__(self):
super(RoleResource, self).__init__()
self.get_member_from_driver = PROVIDERS.role_api.get_role
get_member_from_driver = PROVIDERS.deferred_provider_lookup(
api='role_api', method='get_role')
def _is_domain_role(self, role):
return bool(role.get('domain_id'))

View File

@ -72,6 +72,30 @@ class ProviderAPIRegistry(object):
# Use super to allow setting around class implementation of __setattr__
super(ProviderAPIRegistry, self).__setattr__('locked', True)
def deferred_provider_lookup(self, api, method):
"""Create descriptor that performs lookup of api and method on demand.
For specialized cases, such as the enforcer "get_member_from_driver"
which needs to be effectively a "classmethod", this method returns
a smart descriptor object that does the lookup at runtime instead of
at import time.
:param api: The api to use, e.g. "identity_api"
:type api: str
:param method: the method on the api to return
:type method: str
"""
class DeferredProviderLookup(object):
def __init__(self, api, method):
self.__api = api
self.__method = method
def __get__(self, instance, owner):
api = getattr(ProviderAPIs, self.__api)
return getattr(api, self.__method)
return DeferredProviderLookup(api, method)
class DuplicateProviderError(Exception):
"""Attempting to register a duplicate API provider."""

View File

@ -161,7 +161,7 @@ class RBACEnforcer(object):
# here.
resource = flask.current_app.view_functions[
flask.request.endpoint].view_class
member_name = getattr(resource, 'member_name', None)
member_name = getattr(resource, 'member_key', None)
func = getattr(
resource, 'get_member_from_driver', None)
if member_name is not None and callable(func):
@ -177,9 +177,8 @@ class RBACEnforcer(object):
# TODO(morgan): add (future) support for passing class
# instantiation args.
ret_dict['target'] = {
member_name: func(
resource(),
flask.request.view_args[key])[member_name]}
member_name: func(flask.request.view_args[key])
}
return ret_dict
@staticmethod
@ -333,6 +332,14 @@ class RBACEnforcer(object):
try:
policy_dict.update(cls._extract_member_target_data(
member_target_type, member_target))
except exception.NotFound:
# DEBUG LOG and bubble up the 404 error. This is expected
# behavior. This likely should be specific in each API. This
# should be revisited in the future and each API should make
# the explicit "existence" checks before enforcement.
LOG.debug('Extracting inferred target data resulted in '
'"NOT FOUND (404)".')
raise
except Exception as e: # nosec
# NOTE(morgan): Errors should never bubble up at this point,
# if there is an error getting the target, log it and move
@ -340,7 +347,7 @@ class RBACEnforcer(object):
LOG.warning('Unable to extract inferred target data during '
'enforcement')
LOG.debug(e, exc_info=True)
raise exception.ForbiddenAction(action)
raise exception.ForbiddenAction(action=action)
# Special Case, extract and add subject_token data.
subj_token_target_data = cls._extract_subject_token_target_data()

View File

@ -32,8 +32,27 @@ class TestProviderAPIRegistry(unit.BaseTestCase):
_provides_api = provides_api
driver_namespace = '_TEST_NOTHING'
def do_something(self):
return provides_api
return TestManager(driver_name=None)
def test_deferred_gettr(self):
api_name = '%s_api' % uuid.uuid4().hex
class TestClass(object):
descriptor = provider_api.ProviderAPIs.deferred_provider_lookup(
api=api_name, method='do_something')
test_instance = TestClass()
# Accessing the descriptor will raise the known "attribute" error
self.assertRaises(AttributeError, getattr, test_instance, 'descriptor')
self._create_manager_instance(provides_api=api_name)
# once the provider has been instantiated, we can call the descriptor
# which will return the method (callable) and we can check that the
# return value is as expected.
self.assertEqual(api_name, test_instance.descriptor())
def test_registry_lock(self):
provider_api.ProviderAPIs.lock_provider_registry()
self.assertRaises(RuntimeError, self._create_manager_instance)

View File

@ -94,19 +94,24 @@ class _TestRBACEnforcerBase(rest.RestfulTestCase):
self.flask_blueprint = blueprint
self.cleanup_instance('flask_blueprint', 'url_prefix')
def _driver_simulation_get_method(self, argument_id):
user = self.user_req_admin
return {'id': argument_id,
'value': 'TEST',
'owner_id': user['id']}
def _setup_flask_restful_api(self):
self.restful_api_url_prefix = '/_%s_TEST' % uuid.uuid4().hex
self.restful_api = flask_restful.Api(self.public_app.app,
self.restful_api_url_prefix)
user = self.user_req_admin
driver_simulation_method = self._driver_simulation_get_method
# Very Basic Restful Resource
class RestfulResource(flask_restful.Resource):
def get(self, argument_id):
return {'argument': {
'id': argument_id,
'value': 'TEST',
'owner_id': user['id']}}
return {'argument': driver_simulation_method(argument_id)}
self.restful_api_resource = RestfulResource
self.restful_api.add_resource(
@ -336,8 +341,8 @@ class TestRBACEnforcerRest(_TestRBACEnforcerBase):
# with current @protected (ease of use). For most cases the target
# should be explicitly passed to .enforce_call, but for ease of
# converting / use, the automatic population of data has been added.
self.restful_api_resource.member_name = 'argument'
member_from_driver = self.restful_api_resource.get
self.restful_api_resource.member_key = 'argument'
member_from_driver = self._driver_simulation_get_method
self.restful_api_resource.get_member_from_driver = member_from_driver
argument_id = uuid.uuid4().hex
@ -487,8 +492,8 @@ class TestRBACEnforcerRest(_TestRBACEnforcerBase):
# Check that inferred "get" works as expected for the member target
# setup the restful resource for an inferred "get"
self.restful_api_resource.member_name = 'argument'
member_from_driver = self.restful_api_resource.get
self.restful_api_resource.member_key = 'argument'
member_from_driver = self._driver_simulation_get_method
self.restful_api_resource.get_member_from_driver = member_from_driver
token_path = '/v3/auth/tokens'