Security update for monasca-log-api
Created middleware to authorize access to log api. Only configured roles (i.e. default) can access the api. Also middleware detect if the request would come from monasca-log-agent. Summary: - middleware added (logic + tests) - extended documentation Additionaly: - added better tox processing - added minimum coverage Change-Id: Ic848bfa3a8552887661f8223078efe3a4bca5c37
This commit is contained in:
parent
1feaa74013
commit
33e9d7aaa3
|
@ -0,0 +1,13 @@
|
|||
monasca_log_api.middleware package
|
||||
==================================
|
||||
|
||||
Submodules
|
||||
----------
|
||||
|
||||
monasca_log_api.middleware.role_middleware module
|
||||
-------------------------------------------------
|
||||
|
||||
.. automodule:: monasca_log_api.middleware.role_middleware
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
|
@ -8,6 +8,7 @@ Subpackages
|
|||
|
||||
monasca_log_api.api
|
||||
monasca_log_api.v2
|
||||
monasca_log_api.middleware
|
||||
|
||||
Submodules
|
||||
----------
|
||||
|
|
|
@ -28,3 +28,8 @@ cafile =
|
|||
certfile =
|
||||
keyfile =
|
||||
insecure = false
|
||||
|
||||
[roles_middleware]
|
||||
path = /v2.0/log
|
||||
default_roles = monasca-user
|
||||
agent_roles = monasca-log-agent
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
name = monasca_log_api
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = auth api
|
||||
pipeline = auth roles api
|
||||
|
||||
[app:api]
|
||||
paste.app_factory = monasca_log_api.server:launch
|
||||
|
@ -10,6 +10,9 @@ paste.app_factory = monasca_log_api.server:launch
|
|||
[filter:auth]
|
||||
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
|
||||
|
||||
[filter:roles]
|
||||
paste.filter_factory = monasca_log_api.middleware.role_middleware:RoleMiddleware.factory
|
||||
|
||||
[server:main]
|
||||
use = egg:gunicorn#main
|
||||
host = 127.0.0.1
|
||||
|
|
|
@ -41,7 +41,7 @@ import monasca.log.api.infrastructure.servlet.PreAuthenticationFilter.ErrorCaptu
|
|||
public class PostAuthenticationFilter implements Filter {
|
||||
static final String CONFIRMED_STATUS = "CONFIRMED";
|
||||
static final String X_ROLES_ATTRIBUTE = "X-ROLES";
|
||||
static final String X_MONASCA_AGENT = "X-MONASCA_AGENT";
|
||||
static final String X_MONASCA_LOG_AGENT = "X-MONASCA-LOG-AGENT";
|
||||
static final String X_IDENTITY_STATUS_ATTRIBUTE = "X-IDENTITY-STATUS";
|
||||
private static final String X_TENANT_ID_ATTRIBUTE = "X-PROJECT-ID";
|
||||
static final String X_TENANT_ID_HEADER = "X-Tenant-Id";
|
||||
|
@ -136,7 +136,7 @@ public class PostAuthenticationFilter implements Filter {
|
|||
}
|
||||
}
|
||||
if (agentUser) {
|
||||
request.setAttribute(X_MONASCA_AGENT, true);
|
||||
request.setAttribute(X_MONASCA_LOG_AGENT, true);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
|
|||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.core.Context;
|
||||
|
||||
import static monasca.log.api.infrastructure.servlet.PostAuthenticationFilter.X_MONASCA_AGENT;
|
||||
import static monasca.log.api.infrastructure.servlet.PostAuthenticationFilter.X_MONASCA_LOG_AGENT;
|
||||
|
||||
public class RoleAuthorizationFilter implements ContainerRequestFilter {
|
||||
private static final Logger logger = LoggerFactory.getLogger
|
||||
|
@ -39,10 +39,10 @@ public class RoleAuthorizationFilter implements ContainerRequestFilter {
|
|||
@Override
|
||||
public ContainerRequest filter(ContainerRequest containerRequest) {
|
||||
String method = containerRequest.getMethod();
|
||||
Object isAgent = httpServletRequest.getAttribute(X_MONASCA_AGENT);
|
||||
Object isAgent = httpServletRequest.getAttribute(X_MONASCA_LOG_AGENT);
|
||||
String pathInfo = httpServletRequest.getPathInfo();
|
||||
|
||||
// X_MONASCA_AGENT is only set if the only valid role for this user is an agent role
|
||||
// X_MONASCA_LOG_AGENT is only set if the only valid role for this user is an agent role
|
||||
if (isAgent != null) {
|
||||
if (!(method.equals("POST") && validPath(pathInfo, VALID_MONASCA_AGENT_POST_PATHS)) &&
|
||||
!(method.equals("GET") && validPath(pathInfo, VALID_MONASCA_AGENT_GET_PATHS))) {
|
||||
|
|
|
@ -43,4 +43,4 @@ class LogsApi(object):
|
|||
:param res: current response
|
||||
|
||||
"""
|
||||
res.status = falcon.HTTP_501
|
||||
res.status = falcon.HTTP_501 # pragma: no cover
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_middleware import base as om
|
||||
from webob import response
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
role_m_opts = [
|
||||
cfg.ListOpt(name='path',
|
||||
default='/',
|
||||
help='List of paths where middleware applies to'),
|
||||
cfg.ListOpt(name='default_roles',
|
||||
default=None,
|
||||
help='List of roles allowed to enter api'),
|
||||
cfg.ListOpt(name='agent_roles',
|
||||
default=None,
|
||||
help=('List of roles, that if set, mean that request '
|
||||
'comes from agent, thus is authorized in the same '
|
||||
'time'))
|
||||
]
|
||||
role_m_group = cfg.OptGroup(name='roles_middleware', title='roles_middleware')
|
||||
|
||||
CONF.register_group(role_m_group)
|
||||
CONF.register_opts(role_m_opts, role_m_group)
|
||||
|
||||
_X_IDENTITY_STATUS = 'X-Identity-Status'
|
||||
_X_ROLES = 'X-Roles'
|
||||
_X_MONASCA_LOG_AGENT = 'X-MONASCA-LOG-AGENT'
|
||||
_CONFIRMED_STATUS = 'Confirmed'
|
||||
|
||||
|
||||
def _ensure_lower_roles(roles):
|
||||
if not roles:
|
||||
return []
|
||||
return [role.strip().lower() for role in roles]
|
||||
|
||||
|
||||
def _intersect(a, b):
|
||||
return list(set(a) & set(b))
|
||||
|
||||
|
||||
class RoleMiddleware(om.Middleware):
|
||||
"""Authorization middleware for X-Roles header.
|
||||
|
||||
RoleMiddleware is responsible for authorizing user's
|
||||
access against **X-Roles** header. Middleware
|
||||
expects authentication to be completed (i.e. keystone middleware
|
||||
has been already called).
|
||||
|
||||
If tenant is authenticated and authorized middleware
|
||||
exits silently (that is considered a success). Otherwise
|
||||
middleware produces JSON response according to following schema
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
{
|
||||
'title': u'Unauthorized',
|
||||
'message': explanation (str)
|
||||
}
|
||||
|
||||
Configuration example
|
||||
|
||||
.. code-block:: cfg
|
||||
|
||||
[roles_middleware]
|
||||
path = /v2.0/log
|
||||
default_roles = monasca-user
|
||||
agent_roles = monasca-log-agent
|
||||
|
||||
Configuration explained:
|
||||
|
||||
* path (list) - path (or list of paths) middleware should be applied
|
||||
* agent_roles (list) - list of roles that identifies tenant as an agent
|
||||
* default_roles (list) - list of roles that should be authorized
|
||||
|
||||
Note:
|
||||
Being an agent means that tenant is automatically authorized.
|
||||
Note:
|
||||
Middleware works only for configured paths and for all
|
||||
requests apart from HTTP method **OPTIONS**.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, application):
|
||||
super(RoleMiddleware, self).__init__(application)
|
||||
middleware = CONF.roles_middleware
|
||||
|
||||
self._path = middleware.path
|
||||
self._default_roles = _ensure_lower_roles(middleware.default_roles)
|
||||
self._agent_roles = _ensure_lower_roles(middleware.agent_roles)
|
||||
|
||||
LOG.debug('RolesMiddleware initialized for paths=%s', self._path)
|
||||
|
||||
def process_request(self, req):
|
||||
if not self._can_apply_middleware(req):
|
||||
LOG.debug('%s skipped in role middleware', req.path)
|
||||
return None
|
||||
|
||||
is_authenticated = self._is_authenticated(req)
|
||||
is_authorized, is_agent = self._is_authorized(req)
|
||||
tenant_id = req.headers.get('X-Tenant-Id')
|
||||
|
||||
req.environ[_X_MONASCA_LOG_AGENT] = is_agent
|
||||
|
||||
LOG.debug('%s is authenticated=%s, authorized=%s, log_agent=%s',
|
||||
tenant_id, is_authenticated, is_authorized, is_agent)
|
||||
|
||||
if is_authenticated and is_authorized:
|
||||
LOG.debug('%s has been authenticated and authorized', tenant_id)
|
||||
return # do return nothing to enter API internal
|
||||
|
||||
# whoops
|
||||
if is_authorized:
|
||||
explanation = u'Failed to authenticate request for %s' % tenant_id
|
||||
else:
|
||||
explanation = (u'Tenant %s is missing a required role to access '
|
||||
u'this service' % tenant_id)
|
||||
|
||||
if explanation is not None:
|
||||
LOG.error(explanation)
|
||||
json_body = {u'title': u'Unauthorized', u'message': explanation}
|
||||
return response.Response(status=401,
|
||||
json_body=json_body,
|
||||
content_type='application/json')
|
||||
|
||||
def _is_authorized(self, req):
|
||||
headers = req.headers
|
||||
roles = headers.get(_X_ROLES)
|
||||
|
||||
if not roles:
|
||||
LOG.warn('Couldn\'t locate %s header, or it was empty', _X_ROLES)
|
||||
return False, False
|
||||
else:
|
||||
roles = _ensure_lower_roles(roles.split(','))
|
||||
|
||||
is_agent = len(_intersect(roles, self._agent_roles)) > 0
|
||||
is_authorized = (len(_intersect(roles, self._default_roles)) > 0 or
|
||||
is_agent)
|
||||
|
||||
return is_authorized, is_agent
|
||||
|
||||
def _is_authenticated(self, req):
|
||||
headers = req.headers
|
||||
if _X_IDENTITY_STATUS in headers:
|
||||
status = req.headers.get(_X_IDENTITY_STATUS)
|
||||
return _CONFIRMED_STATUS == status
|
||||
return False
|
||||
|
||||
def _can_apply_middleware(self, req):
|
||||
path = req.path
|
||||
method = req.method
|
||||
|
||||
if method == 'OPTIONS':
|
||||
return False
|
||||
|
||||
if self._path:
|
||||
for p in self._path:
|
||||
if path.startswith(p):
|
||||
return True
|
||||
return False # if no configured paths, or nothing matches
|
|
@ -0,0 +1,327 @@
|
|||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
from webob import response
|
||||
|
||||
from monasca_log_api.middleware import role_middleware as rm
|
||||
|
||||
|
||||
class SideLogicTestEnsureLowerRoles(unittest.TestCase):
|
||||
def test_should_ensure_lower_roles(self):
|
||||
roles = ['CMM-Admin', ' CmM-User ']
|
||||
expected = ['cmm-admin', 'cmm-user']
|
||||
self.assertItemsEqual(expected, rm._ensure_lower_roles(roles))
|
||||
|
||||
def test_should_return_empty_array_for_falsy_input_1(self):
|
||||
roles = []
|
||||
expected = []
|
||||
self.assertItemsEqual(expected, rm._ensure_lower_roles(roles))
|
||||
|
||||
def test_should_return_empty_array_for_falsy_input_2(self):
|
||||
roles = None
|
||||
expected = []
|
||||
self.assertItemsEqual(expected, rm._ensure_lower_roles(roles))
|
||||
|
||||
|
||||
class SideLogicTestIntersect(unittest.TestCase):
|
||||
def test_should_intersect_seqs(self):
|
||||
seq_1 = [1, 2, 3]
|
||||
seq_2 = [2]
|
||||
|
||||
expected = [2]
|
||||
|
||||
self.assertItemsEqual(expected, rm._intersect(seq_1, seq_2))
|
||||
self.assertItemsEqual(expected, rm._intersect(seq_2, seq_1))
|
||||
|
||||
def test_should_intersect_empty(self):
|
||||
seq_1 = []
|
||||
seq_2 = []
|
||||
|
||||
expected = []
|
||||
|
||||
self.assertItemsEqual(expected, rm._intersect(seq_1, seq_2))
|
||||
self.assertItemsEqual(expected, rm._intersect(seq_2, seq_1))
|
||||
|
||||
def test_should_not_intersect_without_common_elements(self):
|
||||
seq_1 = [1, 2, 3]
|
||||
seq_2 = [4, 5, 6]
|
||||
|
||||
expected = []
|
||||
|
||||
self.assertItemsEqual(expected, rm._intersect(seq_1, seq_2))
|
||||
self.assertItemsEqual(expected, rm._intersect(seq_2, seq_1))
|
||||
|
||||
|
||||
class RolesMiddlewareSideLogicTest(unittest.TestCase):
|
||||
def test_should_apply_middleware_for_valid_path(self):
|
||||
paths = ['/', '/v2.0/', '/v2.0/log/']
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._path = paths
|
||||
|
||||
for p in paths:
|
||||
req = mock.Mock()
|
||||
req.method = 'GET'
|
||||
req.path = p
|
||||
self.assertTrue(instance._can_apply_middleware(req))
|
||||
|
||||
def test_should_apply_middleware_for_invalid_path(self):
|
||||
paths = ['/v2.0/', '/v2.0/log/']
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._path = paths
|
||||
|
||||
for p in paths:
|
||||
pp = 'test/%s' % p
|
||||
req = mock.Mock()
|
||||
req.method = 'GET'
|
||||
req.path = pp
|
||||
self.assertFalse(instance._can_apply_middleware(req))
|
||||
|
||||
def test_should_reject_OPTIONS_request(self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
req = mock.Mock()
|
||||
req.method = 'OPTIONS'
|
||||
req.path = '/'
|
||||
self.assertFalse(instance._can_apply_middleware(req))
|
||||
|
||||
def test_should_return_true_if_authenticated(self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_IDENTITY_STATUS: rm._CONFIRMED_STATUS}
|
||||
|
||||
self.assertTrue(instance._is_authenticated(req))
|
||||
|
||||
def test_should_return_false_if_not_authenticated(self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_IDENTITY_STATUS: 'Some_Other_Status'}
|
||||
|
||||
self.assertFalse(instance._is_authenticated(req))
|
||||
|
||||
def test_should_return_false_if_identity_status_not_found(self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {}
|
||||
|
||||
self.assertFalse(instance._is_authenticated(req))
|
||||
|
||||
def test_should_return_true_if_authorized_no_agent(self):
|
||||
roles = 'cmm-admin,cmm-user'
|
||||
roles_array = roles.split(',')
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._default_roles = roles_array
|
||||
instance._agent_roles = []
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_ROLES: roles}
|
||||
|
||||
is_authorized, is_agent = instance._is_authorized(req)
|
||||
|
||||
self.assertFalse(is_agent)
|
||||
self.assertTrue(is_authorized)
|
||||
|
||||
def test_should_return_true_if_authorized_with_agent(self):
|
||||
roles = 'cmm-admin,cmm-user'
|
||||
roles_array = roles.split(',')
|
||||
|
||||
default_roles = [roles_array[0]]
|
||||
admin_roles = [roles_array[1]]
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._default_roles = default_roles
|
||||
instance._agent_roles = admin_roles
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_ROLES: roles}
|
||||
|
||||
is_authorized, is_agent = instance._is_authorized(req)
|
||||
|
||||
self.assertTrue(is_agent)
|
||||
self.assertTrue(is_authorized)
|
||||
|
||||
def test_should_return_not_authorized_no_x_roles(self):
|
||||
roles = 'cmm-admin,cmm-user'
|
||||
roles_array = roles.split(',')
|
||||
|
||||
default_roles = [roles_array[0]]
|
||||
admin_roles = [roles_array[1]]
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._default_roles = default_roles
|
||||
instance._agent_roles = admin_roles
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {}
|
||||
|
||||
is_authorized, is_agent = instance._is_authorized(req)
|
||||
|
||||
self.assertFalse(is_agent)
|
||||
self.assertFalse(is_authorized)
|
||||
|
||||
def test_should_return_authorized_if_at_least_agent_true(self):
|
||||
roles = 'cmm-admin,cmm-user'
|
||||
roles_array = roles.split(',')
|
||||
|
||||
default_roles = ['different_role']
|
||||
admin_roles = [roles_array[1]]
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._default_roles = default_roles
|
||||
instance._agent_roles = admin_roles
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_ROLES: roles}
|
||||
|
||||
is_authorized, is_agent = instance._is_authorized(req)
|
||||
|
||||
self.assertTrue(is_agent)
|
||||
self.assertTrue(is_authorized)
|
||||
|
||||
|
||||
class RolesMiddlewareLogicTest(unittest.TestCase):
|
||||
def test_not_process_further_if_cannot_apply_path(self):
|
||||
roles = 'cmm-admin,cmm-user'
|
||||
roles_array = roles.split(',')
|
||||
|
||||
default_roles = [roles_array[0]]
|
||||
admin_roles = [roles_array[1]]
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._default_roles = default_roles
|
||||
instance._agent_roles = admin_roles
|
||||
instance._path = ['/test']
|
||||
|
||||
# spying
|
||||
instance._is_authenticated = mock.Mock()
|
||||
instance._is_authorized = mock.Mock()
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_ROLES: roles}
|
||||
req.path = '/different/test'
|
||||
|
||||
instance.process_request(req=req)
|
||||
|
||||
self.assertFalse(instance._is_authenticated.called)
|
||||
self.assertFalse(instance._is_authorized.called)
|
||||
|
||||
def test_not_process_further_if_cannot_apply_method(self):
|
||||
roles = 'cmm-admin,cmm-user'
|
||||
roles_array = roles.split(',')
|
||||
|
||||
default_roles = [roles_array[0]]
|
||||
admin_roles = [roles_array[1]]
|
||||
|
||||
instance = rm.RoleMiddleware(None)
|
||||
instance._default_roles = default_roles
|
||||
instance._agent_roles = admin_roles
|
||||
instance._path = ['/test']
|
||||
|
||||
# spying
|
||||
instance._is_authenticated = mock.Mock()
|
||||
instance._is_authorized = mock.Mock()
|
||||
|
||||
req = mock.Mock()
|
||||
req.headers = {rm._X_ROLES: roles}
|
||||
req.path = '/test'
|
||||
req.method = 'OPTIONS'
|
||||
|
||||
instance.process_request(req=req)
|
||||
|
||||
self.assertFalse(instance._is_authenticated.called)
|
||||
self.assertFalse(instance._is_authorized.called)
|
||||
|
||||
def test_should_return_None_if_authenticated_authorized(self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
is_authorized = True
|
||||
is_agent = True
|
||||
|
||||
instance._can_apply_middleware = mock.Mock(return_value=True)
|
||||
instance._is_authorized = mock.Mock(return_value=(is_authorized,
|
||||
is_agent))
|
||||
instance._is_authenticated = mock.Mock(return_value=True)
|
||||
|
||||
req = mock.Mock()
|
||||
req.environ = {}
|
||||
|
||||
result = instance.process_request(req=req)
|
||||
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_should_produce_json_response_if_not_authorized_but_authenticated(
|
||||
self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
is_authorized = False
|
||||
is_agent = False
|
||||
is_authenticated = True
|
||||
|
||||
instance._can_apply_middleware = mock.Mock(return_value=True)
|
||||
instance._is_authorized = mock.Mock(return_value=(is_authorized,
|
||||
is_agent))
|
||||
instance._is_authenticated = mock.Mock(return_value=is_authenticated)
|
||||
|
||||
req = mock.Mock()
|
||||
req.environ = {}
|
||||
req.headers = {
|
||||
'X-Tenant-Id': '11111111'
|
||||
}
|
||||
|
||||
result = instance.process_request(req=req)
|
||||
|
||||
self.assertIsNotNone(result)
|
||||
self.assertTrue(isinstance(result, response.Response))
|
||||
|
||||
status = result.status_code
|
||||
json_body = result.json_body
|
||||
message = json_body.get('message')
|
||||
|
||||
self.assertIn('is missing a required role to access', message)
|
||||
self.assertEqual(401, status)
|
||||
|
||||
def test_should_produce_json_response_if_not_authenticated_but_authorized(
|
||||
self):
|
||||
instance = rm.RoleMiddleware(None)
|
||||
is_authorized = True
|
||||
is_agent = True
|
||||
is_authenticated = False
|
||||
|
||||
instance._can_apply_middleware = mock.Mock(return_value=True)
|
||||
instance._is_authorized = mock.Mock(return_value=(is_authorized,
|
||||
is_agent))
|
||||
instance._is_authenticated = mock.Mock(return_value=is_authenticated)
|
||||
|
||||
req = mock.Mock()
|
||||
req.environ = {}
|
||||
req.headers = {
|
||||
'X-Tenant-Id': '11111111'
|
||||
}
|
||||
|
||||
result = instance.process_request(req=req)
|
||||
|
||||
self.assertIsNotNone(result)
|
||||
self.assertTrue(isinstance(result, response.Response))
|
||||
|
||||
status = result.status_code
|
||||
json_body = result.json_body
|
||||
message = json_body.get('message')
|
||||
|
||||
self.assertIn('Failed to authenticate request for', message)
|
||||
self.assertEqual(401, status)
|
|
@ -3,10 +3,10 @@ falcon==0.3.0
|
|||
gunicorn>=19.2.0
|
||||
keystonemiddleware
|
||||
oslo.config>=1.2.1
|
||||
oslo.middleware
|
||||
oslo.log
|
||||
oslo.serialization
|
||||
oslo.utils
|
||||
oslo.middleware<=1.0.0
|
||||
oslo.log<=1.0.0
|
||||
oslo.serialization<=1.6.0
|
||||
oslo.utils<=1.6.0
|
||||
pastedeploy>=1.3.3
|
||||
pbr>=1.6.0,<2.0
|
||||
six>=1.9.0
|
||||
|
|
Loading…
Reference in New Issue