Merge "Don't forget to check if authorization fails"

This commit is contained in:
Zuul 2023-06-26 14:22:56 +00:00 committed by Gerrit Code Review
commit 409bc26d70
3 changed files with 83 additions and 5 deletions

View File

@ -351,9 +351,11 @@ class AuthContextMiddleware(provider_api.ProviderAPIMixin,
if not context_env.get('is_admin', False):
resp = super(AuthContextMiddleware, self).process_request(request)
if resp:
return resp
if request.token_auth.has_user_token and \
not request.user_token_valid:
raise exception.Unauthorized(_('Not authorized.'))
if request.token_auth.user is not None:
request.set_user_headers(request.token_auth.user)

View File

@ -1468,9 +1468,11 @@ class CADFNotificationsDataTestCase(test_v3.RestfulTestCase):
observer = None
resource_type = EXP_RESOURCE_TYPE
ref = unit.new_service_ref()
ref['type'] = 'identity'
PROVIDERS.catalog_api.create_service(ref['id'], ref.copy())
ref = getattr(self, 'service', None)
if ref is None or ref['type'] != 'identity':
ref = unit.new_service_ref()
ref['type'] = 'identity'
PROVIDERS.catalog_api.create_service(ref['id'], ref.copy())
action = CREATED_OPERATION + '.' + resource_type
initiator = notifications._get_request_audit_info(self.user_id)

View File

@ -273,7 +273,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
self.region_id = self.region['id']
PROVIDERS.catalog_api.create_region(self.region)
self.service = unit.new_service_ref()
self.service = unit.new_service_ref(type='identity')
self.service_id = self.service['id']
PROVIDERS.catalog_api.create_service(
self.service_id, self.service.copy()
@ -436,6 +436,24 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
})
return r.headers.get('X-Subject-Token')
def get_application_credentials_token(self, app_cred_id, app_cred_secret):
"""Convenience method for requesting application credentials token."""
r = self.admin_request(
method='POST',
path='/v3/auth/tokens',
body={
'auth': {
'identity': {
'methods': ['application_credential'],
'application_credential': {
'id': app_cred_id,
'secret': app_cred_secret
}
}
}
})
return r.headers.get('X-Subject-Token')
def get_requested_token(self, auth):
"""Request the specific token we want."""
r = self.v3_create_token(auth)
@ -1253,6 +1271,32 @@ class VersionTestCase(RestfulTestCase):
# because we need the token
class AuthContextMiddlewareTestCase(RestfulTestCase):
def load_fixtures(self, fixtures):
self.load_sample_data()
app_cred_api = PROVIDERS.application_credential_api
access_rules = [
{
'id': uuid.uuid4().hex,
'service': self.service['type'],
'method': 'GET',
'path': '/v3/users/*',
}
]
app_cred = {
'id': uuid.uuid4().hex,
'name': 'appcredtest',
'secret': uuid.uuid4().hex,
'user_id': self.user['id'],
'project_id': self.project['id'],
'description': 'Test Application Credential',
'roles': [{'id': self.role_id}],
'access_rules': access_rules,
}
app_cred_ref = app_cred_api.create_application_credential(app_cred)
self.app_cred_r_id = app_cred_ref['id']
self.app_cred_r_secret = app_cred_ref['secret']
def _middleware_request(self, token, extra_environ=None):
def application(environ, start_response):
@ -1347,6 +1391,36 @@ class AuthContextMiddlewareTestCase(RestfulTestCase):
req_context.project_domain_id)
self.assertFalse(req_context.is_admin)
def test_auth_context_app_cred_with_rule(self):
#
# This is an open-coded _middleware_request(), which allows us to
# supply paths and verify failure. We can refactor later if needed.
#
def application(environ, start_response):
body = b'body'
headers = [('Content-Type', 'text/html; charset=utf8'),
('Content-Length', str(len(body)))]
start_response('200 OK', headers)
return [body]
token = self.get_application_credentials_token(self.app_cred_r_id,
self.app_cred_r_secret)
# Test to failure
app = webtest.TestApp(auth_context.AuthContextMiddleware(application))
resp = app.get('/v3/projects/e3a0883d15ff409e98e59d460f583a68',
headers={authorization.AUTH_TOKEN_HEADER: token},
status=401)
self.assertEqual('401 Unauthorized', resp.status)
# Test to success
app = webtest.TestApp(auth_context.AuthContextMiddleware(application))
resp = app.get('/v3/users/3879328537914be2b394ddf57a4fc73a',
headers={authorization.AUTH_TOKEN_HEADER: token})
self.assertEqual('200 OK', resp.status)
self.assertEqual(b'body', resp.body) # just to make sure it worked
class JsonHomeTestMixin(object):
"""JSON Home test.