Only make one request to Keystone per S3 request

Previously, s3token would make one request to Keystone to validate the
request signature provided by the user, then use the response to add an
X-Auth-Token header to the request environment. This would get picked up
by the authtoken middleware which would make *another* request to
validate the token *we just got*.

Now, we'll populate the request environment with the following headers:

   * X-Identity-Status
   * X-Roles
   * X-User-Id
   * X-User-Name
   * X-Tenant-Id
   * X-Tenant-Name
   * X-Project-Id
   * X-Project-Name

This allows Swift's keystoneauth middleware to function without needing
the authtoken middleware at all.

UpgradeImpact
-------------
The recommended pipeline ordering has changed. Whereas authoken previously
had to be between s3token and keystoneauth like

  ... swift3 s3token authtoken keystoneauth ...

it should now be placed before swift3, as in

  ... authtoken swift3 s3token keystoneauth ...

Alternatively, if Keystone users should only ever access Swift through
the S3 API, the authtoken middleware may be removed entirely. Note that
the old pipeline ordering will continue to work, but still requires two
Keystone requests per client request as before.

To upgrade an existing cluster to take advantage of this change
operators should, for each proxy server:

   1. Upgrade swift3
   2. Optionally, restart proxy-server
   3. Update proxy-server.conf with the new pipeline
   4. Restart proxy-server

Updating proxy-server.conf *before* upgrading swift3 will prevent the
proxy from starting if there is an unexpected reboot before the upgrade.

Closes-Bug: #1653017
Change-Id: I21e38884a2aefbb94b76c76deccd815f01db7362
This commit is contained in:
Tim Burke 2016-12-03 02:33:12 +00:00
parent 1d8d305b57
commit 6ffcc294e3
9 changed files with 331 additions and 79 deletions

View File

@ -143,7 +143,6 @@ class Swift3Middleware(object):
elif 'keystoneauth' in auth_pipeline:
check_filter_order(auth_pipeline,
['s3token',
'authtoken',
'keystoneauth'])
LOGGER.debug('Use keystone middleware.')
elif len(auth_pipeline):

View File

@ -1189,7 +1189,7 @@ class S3AclRequest(Request):
self.user_id = "%s:%s" % (sw_resp.environ['HTTP_X_TENANT_NAME'],
sw_resp.environ['HTTP_X_USER_NAME'])
self.user_id = utf8encode(self.user_id)
self.token = sw_resp.environ['HTTP_X_AUTH_TOKEN']
self.token = sw_resp.environ.get('HTTP_X_AUTH_TOKEN')
# Need to skip S3 authorization since authtoken middleware
# overwrites account in PATH_INFO
del self.headers['Authorization']

View File

@ -37,7 +37,8 @@ import logging
import requests
import six
from swift.common.swob import Request, Response
from swift.common.swob import Request, HTTPBadRequest, HTTPUnauthorized, \
HTTPException
from swift.common.utils import config_true_value, split_path
from swift.common.wsgi import ConfigFileError
@ -46,9 +47,29 @@ from swift3.utils import is_valid_ipv6
PROTOCOL_NAME = 'S3 Token Authentication'
class ServiceError(Exception):
pass
# Headers to purge if they came from (or may have come from) the client
KEYSTONE_AUTH_HEADERS = (
'X-Identity-Status', 'X-Service-Identity-Status',
'X-Domain-Id', 'X-Service-Domain-Id',
'X-Domain-Name', 'X-Service-Domain-Name',
'X-Project-Id', 'X-Service-Project-Id',
'X-Project-Name', 'X-Service-Project-Name',
'X-Project-Domain-Id', 'X-Service-Project-Domain-Id',
'X-Project-Domain-Name', 'X-Service-Project-Domain-Name',
'X-User-Id', 'X-Service-User-Id',
'X-User-Name', 'X-Service-User-Name',
'X-User-Domain-Id', 'X-Service-User-Domain-Id',
'X-User-Domain-Name', 'X-Service-User-Domain-Name',
'X-Roles', 'X-Service-Roles',
'X-Is-Admin-Project',
'X-Service-Catalog',
# Deprecated headers, too...
'X-Tenant-Id',
'X-Tenant-Name',
'X-Tenant',
'X-User',
'X-Role',
)
class S3Token(object):
@ -104,16 +125,16 @@ class S3Token(object):
self._verify = None
def _deny_request(self, code):
error_table = {
'AccessDenied': (401, 'Access denied'),
'InvalidURI': (400, 'Could not parse the specified URI'),
}
resp = Response(content_type='text/xml')
resp.status = error_table[code][0]
error_cls, message = {
'AccessDenied': (HTTPUnauthorized, 'Access denied'),
'InvalidURI': (HTTPBadRequest,
'Could not parse the specified URI'),
}[code]
resp = error_cls(content_type='text/xml')
error_msg = ('<?xml version="1.0" encoding="UTF-8"?>\r\n'
'<Error>\r\n <Code>%s</Code>\r\n '
'<Message>%s</Message>\r\n</Error>\r\n' %
(code, error_table[code][1]))
(code, message))
if six.PY3:
error_msg = error_msg.encode()
resp.body = error_msg
@ -128,14 +149,12 @@ class S3Token(object):
timeout=self._timeout)
except requests.exceptions.RequestException as e:
self._logger.info('HTTP connection exception: %s', e)
resp = self._deny_request('InvalidURI')
raise ServiceError(resp)
raise self._deny_request('InvalidURI')
if response.status_code < 200 or response.status_code >= 300:
self._logger.debug('Keystone reply error: status=%s reason=%s',
response.status_code, response.reason)
resp = self._deny_request('AccessDenied')
raise ServiceError(resp)
raise self._deny_request('AccessDenied')
return response
@ -144,6 +163,10 @@ class S3Token(object):
req = Request(environ)
self._logger.debug('Calling S3Token middleware.')
# Always drop auth headers if we're first in the pipeline
if 'keystone.token_info' not in req.environ:
req.headers.update({h: None for h in KEYSTONE_AUTH_HEADERS})
try:
parts = split_path(req.path, 1, 4, True)
version, account, container, obj = parts
@ -210,26 +233,45 @@ class S3Token(object):
# identified and not doing a second query and just
# pass it through to swiftauth in this case.
try:
# NB: requests.Response, not swob.Response
resp = self._json_request(creds_json)
except ServiceError as e:
resp = e.args[0] # NB: swob.Response, not requests.Response
except HTTPException as e_resp:
if self._delay_auth_decision:
msg = 'Received error, deferring rejection based on error: %s'
self._logger.debug(msg, resp.status)
self._logger.debug(msg, e_resp.status)
return self._app(environ, start_response)
else:
msg = 'Received error, rejecting request with error: %s'
self._logger.debug(msg, resp.status)
return resp(environ, start_response)
self._logger.debug(msg, e_resp.status)
# NB: swob.Response, not requests.Response
return e_resp(environ, start_response)
self._logger.debug('Keystone Reply: Status: %d, Output: %s',
resp.status_code, resp.content)
try:
identity_info = resp.json()
token_id = str(identity_info['access']['token']['id'])
tenant = identity_info['access']['token']['tenant']
except (ValueError, KeyError):
access_info = resp.json()['access']
# Populate the environment similar to auth_token,
# so we don't have to contact Keystone again.
#
# Note that although the strings are unicode following json
# deserialization, Swift's HeaderEnvironProxy handles ensuring
# they're stored as native strings
req.headers.update({
'X-Identity-Status': 'Confirmed',
'X-Roles': ','.join(r['name']
for r in access_info['user']['roles']),
'X-User-Id': access_info['user']['id'],
'X-User-Name': access_info['user']['name'],
'X-Tenant-Id': access_info['token']['tenant']['id'],
'X-Tenant-Name': access_info['token']['tenant']['name'],
'X-Project-Id': access_info['token']['tenant']['id'],
'X-Project-Name': access_info['token']['tenant']['name'],
})
token_id = access_info['token'].get('id')
tenant = access_info['token']['tenant']
req.environ['keystone.token_info'] = resp.json()
except (ValueError, KeyError, TypeError):
if self._delay_auth_decision:
error = ('Error on keystone reply: %d %s - '
'deferring rejection downstream')

View File

@ -7,8 +7,6 @@ ceph_s3:
s3tests.functional.test_headers.test_object_create_bad_authorization_none: {status: KNOWN}
s3tests.functional.test_headers.test_object_create_bad_date_after_end_aws2: {status: KNOWN}
s3tests.functional.test_s3.test_100_continue: {status: KNOWN}
s3tests.functional.test_s3.test_abort_multipart_upload: {status: KNOWN}
s3tests.functional.test_s3.test_abort_multipart_upload_not_found: {status: KNOWN}
s3tests.functional.test_s3.test_atomic_conditional_write_1mb: {status: KNOWN}
s3tests.functional.test_s3.test_atomic_dual_conditional_write_1mb: {status: KNOWN}
s3tests.functional.test_s3.test_bucket_acl_default: {status: KNOWN}
@ -36,20 +34,8 @@ ceph_s3:
s3tests.functional.test_s3.test_cors_origin_wildcard: {status: KNOWN}
s3tests.functional.test_s3.test_list_buckets_anonymous: {status: KNOWN}
s3tests.functional.test_s3.test_list_buckets_invalid_auth: {status: KNOWN}
s3tests.functional.test_s3.test_list_multipart_upload: {status: KNOWN}
s3tests.functional.test_s3.test_logging_toggle: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_copy_multiple_sizes: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_copy_small: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_resend_first_finishes_last: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_contents: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_empty: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_incorrect_etag: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_missing_part: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_multiple_sizes: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_resend_part: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_size_too_small: {status: KNOWN}
s3tests.functional.test_s3.test_multipart_upload_small: {status: KNOWN}
s3tests.functional.test_s3.test_object_acl_full_control_verify_owner: {status: KNOWN}
s3tests.functional.test_s3.test_object_acl_xml: {status: KNOWN}
s3tests.functional.test_s3.test_object_acl_xml_read: {status: KNOWN}

View File

@ -53,15 +53,6 @@ use = egg:swift#memcache
use = egg:swift3#s3token
auth_uri = http://localhost:35357/
[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
identity_uri = http://localhost:35357/
auth_uri = http://localhost:5000/
admin_tenant_name = service
admin_user = swift
admin_password = password
cache = swift.cache
[filter:keystoneauth]
use = egg:swift#keystoneauth
operator_roles = admin, swiftoperator

View File

@ -27,7 +27,7 @@ mkdir -p ${TEST_DIR}/certs ${TEST_DIR}/private
# create config files
if [ "$AUTH" == 'keystone' ]; then
MIDDLEWARE="s3token authtoken keystoneauth"
MIDDLEWARE="s3token keystoneauth"
elif [ "$AUTH" == 'tempauth' ]; then
MIDDLEWARE="tempauth"
else

View File

@ -14,21 +14,31 @@
# limitations under the License.
import unittest
from mock import patch
from mock import patch, MagicMock
from contextlib import nested
from datetime import datetime
import hashlib
import base64
import requests
import json
import copy
from urllib import unquote, quote
from swift.common.middleware.keystoneauth import KeystoneAuth
from swift.common import swob, utils
from swift.common.swob import Request
from keystonemiddleware.auth_token import AuthProtocol
from keystoneauth1.access import AccessInfoV2
import swift3
from swift3.test.unit import Swift3TestCase
from swift3.test.unit.helpers import FakeSwift
from swift3.test.unit.test_s3_token_middleware import GOOD_RESPONSE
from swift3.request import SigV4Request, Request as S3Request
from swift3.etree import fromstring
from swift3.middleware import filter_factory
from swift3.middleware import filter_factory, Swift3Middleware
from swift3.s3_token_middleware import S3Token
from swift3.cfg import CONF
@ -521,19 +531,24 @@ class TestSwift3Middleware(Swift3TestCase):
pipeline.return_value = 'swift3 tempauth proxy-server'
self.swift3.check_pipeline(conf)
# This *should* still work; authtoken will remove our auth details,
# but the X-Auth-Token we drop in will remain
# if we found one in the response
pipeline.return_value = 'swift3 s3token authtoken keystoneauth ' \
'proxy-server'
self.swift3.check_pipeline(conf)
# This should work now; no more doubled-up requests to keystone!
pipeline.return_value = 'swift3 s3token keystoneauth proxy-server'
self.swift3.check_pipeline(conf)
pipeline.return_value = 'swift3 swauth proxy-server'
self.swift3.check_pipeline(conf)
# Note that authtoken would need to have delay_auth_decision=True
pipeline.return_value = 'swift3 authtoken s3token keystoneauth ' \
'proxy-server'
with self.assertRaises(ValueError) as cm:
self.swift3.check_pipeline(conf)
self.assertIn('expected filter s3token before authtoken before '
'keystoneauth', cm.exception.message)
self.swift3.check_pipeline(conf)
pipeline.return_value = 'swift3 proxy-server'
with self.assertRaises(ValueError) as cm:
@ -861,6 +876,120 @@ class TestSwift3Middleware(Swift3TestCase):
status, headers, body = self.call_swift3(req)
self.assertEqual(status.split()[0], '403', body)
def test_swift3_with_only_s3_token(self):
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.s3_token = S3Token(
self.keystone_auth, {'auth_uri': 'https://fakehost/identity'})
self.swift3 = Swift3Middleware(self.s3_token, CONF)
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS access:signature',
'Date': self.get_date_header()})
self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
swob.HTTPCreated, {}, None)
self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
swob.HTTPOk, {}, None)
with patch.object(self.s3_token, '_json_request') as mock_req:
mock_resp = requests.Response()
mock_resp._content = json.dumps(GOOD_RESPONSE)
mock_resp.status_code = 201
mock_req.return_value = mock_resp
status, headers, body = self.call_swift3(req)
self.assertEqual(body, '')
self.assertEqual(1, mock_req.call_count)
def test_swift3_with_s3_token_and_auth_token(self):
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.auth_token = AuthProtocol(
self.keystone_auth, {'delay_auth_decision': 'True'})
self.s3_token = S3Token(
self.auth_token, {'auth_uri': 'https://fakehost/identity'})
self.swift3 = Swift3Middleware(self.s3_token, CONF)
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS access:signature',
'Date': self.get_date_header()})
self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
swob.HTTPCreated, {}, None)
self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
swob.HTTPOk, {}, None)
with patch.object(self.s3_token, '_json_request') as mock_req:
with patch.object(self.auth_token,
'_do_fetch_token') as mock_fetch:
mock_resp = requests.Response()
mock_resp._content = json.dumps(GOOD_RESPONSE)
mock_resp.status_code = 201
mock_req.return_value = mock_resp
mock_access_info = AccessInfoV2(GOOD_RESPONSE)
mock_access_info.will_expire_soon = \
lambda stale_duration: False
mock_fetch.return_value = (MagicMock(), mock_access_info)
status, headers, body = self.call_swift3(req)
self.assertEqual(body, '')
self.assertEqual(1, mock_req.call_count)
# With X-Auth-Token, auth_token will call _do_fetch_token to
# connect to keystone in auth_token, again
self.assertEqual(1, mock_fetch.call_count)
def test_swift3_with_s3_token_no_pass_token_to_auth_token(self):
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.auth_token = AuthProtocol(
self.keystone_auth, {'delay_auth_decision': 'True'})
self.s3_token = S3Token(
self.auth_token, {'auth_uri': 'https://fakehost/identity'})
self.swift3 = Swift3Middleware(self.s3_token, CONF)
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS access:signature',
'Date': self.get_date_header()})
self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
swob.HTTPCreated, {}, None)
self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
swob.HTTPOk, {}, None)
with patch.object(self.s3_token, '_json_request') as mock_req:
with patch.object(self.auth_token,
'_do_fetch_token') as mock_fetch:
mock_resp = requests.Response()
no_token_id_good_resp = copy.deepcopy(GOOD_RESPONSE)
# delete token id
del no_token_id_good_resp['access']['token']['id']
mock_resp._content = json.dumps(no_token_id_good_resp)
mock_resp.status_code = 201
mock_req.return_value = mock_resp
mock_access_info = AccessInfoV2(GOOD_RESPONSE)
mock_access_info.will_expire_soon = \
lambda stale_duration: False
mock_fetch.return_value = (MagicMock(), mock_access_info)
status, headers, body = self.call_swift3(req)
# No token provided from keystone result in 401 Unauthorized
# at `swift.common.middleware.keystoneauth` because auth_token
# will remove all auth headers including 'X-Identity-Status'[1]
# and then, set X-Identity-Status: Invalid at [2]
#
# 1: https://github.com/openstack/keystonemiddleware/blob/
# master/keystonemiddleware/auth_token/__init__.py#L620
# 2: https://github.com/openstack/keystonemiddleware/blob/
# master/keystonemiddleware/auth_token/__init__.py#L627-L629
self.assertEqual('403 Forbidden', status)
self.assertEqual(1, mock_req.call_count)
# if no token provided from keystone, we can skip the call to
# fetch the token
self.assertEqual(0, mock_fetch.call_count)
if __name__ == '__main__':
unittest.main()

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import logging
import time
@ -28,8 +29,24 @@ from swift3 import s3_token_middleware as s3_token
from swift.common.swob import Request, Response
from swift.common.wsgi import ConfigFileError
GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
'tenant': {'id': 'TENANT_ID'}}}}
GOOD_RESPONSE = {'access': {
'user': {
'username': 'S3_USER',
'name': 'S3_USER',
'id': 'USER_ID',
'roles': [
{'name': 'swift-user'},
{'name': '_member_'},
],
},
'token': {
'id': 'TOKEN_ID',
'tenant': {
'id': 'TENANT_ID',
'name': 'TENANT_NAME'
}
}
}}
class TestResponse(requests.Response):
@ -138,20 +155,66 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
def test_nukes_auth_headers(self):
client_env = {
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_ROLES': 'admin,_member_,swift-user',
'HTTP_X_TENANT_ID': 'cfa'
}
req = Request.blank('/v1/AUTH_cfa/c/o', environ=client_env)
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
for key in client_env:
self.assertNotIn(key, req.environ)
def test_without_auth_storage_token(self):
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS badboy'
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
def _assert_authorized(self, req, expect_token=True):
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
expected_headers = {
'X-Identity-Status': 'Confirmed',
'X-Roles': 'swift-user,_member_',
'X-User-Id': 'USER_ID',
'X-User-Name': 'S3_USER',
'X-Tenant-Id': 'TENANT_ID',
'X-Tenant-Name': 'TENANT_NAME',
'X-Project-Id': 'TENANT_ID',
'X-Project-Name': 'TENANT_NAME',
'X-Auth-Token': 'TOKEN_ID',
}
for header, value in expected_headers.items():
if header == 'X-Auth-Token' and not expect_token:
self.assertNotIn(header, req.headers)
continue
self.assertIn(header, req.headers)
self.assertEqual(value, req.headers[header])
# WSGI wants native strings for headers
self.assertIsInstance(req.headers[header], str)
self.assertEqual(1, self.middleware._app.calls)
def test_authorized(self):
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
self.assertEqual(1, self.middleware._app.calls)
self._assert_authorized(req)
def test_tolerate_missing_token_id(self):
resp = copy.deepcopy(GOOD_RESPONSE)
del resp['access']['token']['id']
self.requests_mock.post(self.TEST_URL,
status_code=201,
json=resp)
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self._assert_authorized(req, expect_token=False)
def test_authorized_http(self):
protocol = 'http'
@ -169,8 +232,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
self._assert_authorized(req)
def test_authorized_trailing_slash(self):
self.middleware = s3_token.filter_factory({
@ -179,8 +241,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
self._assert_authorized(req)
def test_authorization_nova_toconnect(self):
req = Request.blank('/v1/AUTH_swiftint/c/o')
@ -311,6 +372,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self._assert_authorized(req)
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
@ -337,44 +399,87 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
self.assertEqual(resp.status_int, 400) # pylint: disable-msg=E1101
s3_invalid_req = self.middleware._deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_req.body)
s3_invalid_resp = self.middleware._deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
s3_invalid_req.status_int) # pylint: disable-msg=E1101
s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
def test_fail_to_connect_to_keystone(self):
with mock.patch.object(self.middleware, '_json_request') as o:
s3_invalid_req = self.middleware._deny_request('InvalidURI')
o.side_effect = s3_token.ServiceError(s3_invalid_req)
s3_invalid_resp = self.middleware._deny_request('InvalidURI')
o.side_effect = s3_invalid_resp
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
self.assertEqual(resp.body, s3_invalid_req.body)
self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
s3_invalid_req.status_int) # pylint: disable-msg=E1101
s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
def test_bad_reply(self):
def _test_bad_reply(self, response_body):
self.requests_mock.post(self.TEST_URL,
status_code=201,
text="<badreply>")
text=response_body)
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
s3_invalid_req = self.middleware._deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_req.body)
s3_invalid_resp = self.middleware._deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
s3_invalid_req.status_int) # pylint: disable-msg=E1101
s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
def test_bad_reply_not_json(self):
self._test_bad_reply('<badreply>')
def _test_bad_reply_missing_parts(self, *parts):
resp = copy.deepcopy(GOOD_RESPONSE)
part_dict = resp
for part in parts[:-1]:
part_dict = part_dict[part]
del part_dict[parts[-1]]
self._test_bad_reply(json.dumps(resp))
def test_bad_reply_missing_token_dict(self):
self._test_bad_reply_missing_parts('access', 'token')
def test_bad_reply_missing_user_dict(self):
self._test_bad_reply_missing_parts('access', 'user')
def test_bad_reply_missing_user_roles(self):
self._test_bad_reply_missing_parts('access', 'user', 'roles')
def test_bad_reply_missing_user_name(self):
self._test_bad_reply_missing_parts('access', 'user', 'name')
def test_bad_reply_missing_user_id(self):
self._test_bad_reply_missing_parts('access', 'user', 'id')
def test_bad_reply_missing_tenant_dict(self):
self._test_bad_reply_missing_parts('access', 'token', 'tenant')
def test_bad_reply_missing_tenant_id(self):
self._test_bad_reply_missing_parts('access', 'token', 'tenant', 'id')
def test_bad_reply_missing_tenant_name(self):
self._test_bad_reply_missing_parts('access', 'token', 'tenant', 'name')
def test_bad_reply_valid_but_bad_json(self):
self._test_bad_reply('{}')
self._test_bad_reply('[]')
self._test_bad_reply('null')
self._test_bad_reply('"foo"')
self._test_bad_reply('1')
self._test_bad_reply('true')
class S3TokenMiddlewareTestDeferredAuth(S3TokenMiddlewareTestBase):
def setUp(self):
@ -411,8 +516,7 @@ class S3TokenMiddlewareTestDeferredAuth(S3TokenMiddlewareTestBase):
def test_fail_to_connect_to_keystone(self):
with mock.patch.object(self.middleware, '_json_request') as o:
s3_invalid_req = self.middleware._deny_request('InvalidURI')
o.side_effect = s3_token.ServiceError(s3_invalid_req)
o.side_effect = self.middleware._deny_request('InvalidURI')
req = Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'AWS access:signature'

View File

@ -9,3 +9,4 @@ python-openstackclient
boto
requests-mock>=0.7.0 # Apache-2.0
fixtures<2.0,>=1.3.1 # Apache-2.0/BSD
keystonemiddleware