Add support for v3 Keystone responses

Keystone master currently responds with v3 tokens. Preserve v2 support
for stable/newton and earlier Keystones. Note that stable/ocata simply
doesn't work.

Related-Change: I5d0c18121ba4bf8e33209daa48b9d87864951362
Change-Id: I747de516ab69a47622eecbf8ab3faa34444b3ad5
This commit is contained in:
Tim Burke 2017-01-13 02:39:39 +00:00
parent 1069659fbe
commit 807ed380f9
3 changed files with 237 additions and 16 deletions

View File

@ -73,6 +73,45 @@ KEYSTONE_AUTH_HEADERS = (
)
def parse_v2_response(token):
access_info = token['access']
headers = {
'X-Identity-Status': 'Confirmed',
'X-Roles': ','.join(r['name']
for r in access_info['user']['roles']),
'X-User-Id': access_info['user']['id'],
'X-User-Name': access_info['user']['name'],
'X-Tenant-Id': access_info['token']['tenant']['id'],
'X-Tenant-Name': access_info['token']['tenant']['name'],
'X-Project-Id': access_info['token']['tenant']['id'],
'X-Project-Name': access_info['token']['tenant']['name'],
}
return (
headers,
access_info['token'].get('id'),
access_info['token']['tenant'])
def parse_v3_response(token):
token = token['token']
headers = {
'X-Identity-Status': 'Confirmed',
'X-Roles': ','.join(r['name']
for r in token['roles']),
'X-User-Id': token['user']['id'],
'X-User-Name': token['user']['name'],
'X-User-Domain-Id': token['user']['domain']['id'],
'X-User-Domain-Name': token['user']['domain']['name'],
'X-Tenant-Id': token['project']['id'],
'X-Tenant-Name': token['project']['name'],
'X-Project-Id': token['project']['id'],
'X-Project-Name': token['project']['name'],
'X-Project-Domain-Id': token['project']['domain']['id'],
'X-Project-Domain-Name': token['project']['domain']['name'],
}
return headers, None, token['project']
class S3Token(object):
"""Middleware that handles S3 authentication."""
@ -244,27 +283,22 @@ class S3Token(object):
resp.status_code, resp.content)
try:
access_info = resp.json()['access']
token = resp.json()
if 'access' in token:
headers, token_id, tenant = parse_v2_response(token)
elif 'token' in token:
headers, token_id, tenant = parse_v3_response(token)
else:
raise ValueError
# Populate the environment similar to auth_token,
# so we don't have to contact Keystone again.
#
# Note that although the strings are unicode following json
# deserialization, Swift's HeaderEnvironProxy handles ensuring
# they're stored as native strings
req.headers.update({
'X-Identity-Status': 'Confirmed',
'X-Roles': ','.join(r['name']
for r in access_info['user']['roles']),
'X-User-Id': access_info['user']['id'],
'X-User-Name': access_info['user']['name'],
'X-Tenant-Id': access_info['token']['tenant']['id'],
'X-Tenant-Name': access_info['token']['tenant']['name'],
'X-Project-Id': access_info['token']['tenant']['id'],
'X-Project-Name': access_info['token']['tenant']['name'],
})
token_id = access_info['token'].get('id')
tenant = access_info['token']['tenant']
req.environ['keystone.token_info'] = resp.json()
req.headers.update(headers)
req.environ['keystone.token_info'] = token
except (ValueError, KeyError, TypeError):
if self._delay_auth_decision:
error = ('Error on keystone reply: %d %s - '

View File

@ -33,7 +33,8 @@ from keystoneauth1.access import AccessInfoV2
import swift3
from swift3.test.unit import Swift3TestCase
from swift3.test.unit.helpers import FakeSwift
from swift3.test.unit.test_s3_token_middleware import GOOD_RESPONSE
from swift3.test.unit.test_s3_token_middleware import \
GOOD_RESPONSE, GOOD_RESPONSE_V3
from swift3.request import SigV4Request, Request as S3Request
from swift3.etree import fromstring
from swift3.middleware import filter_factory, Swift3Middleware
@ -908,6 +909,32 @@ class TestSwift3Middleware(Swift3TestCase):
self.assertEqual(body, '')
self.assertEqual(1, mock_req.call_count)
def test_swift3_with_only_s3_token_v3(self):
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.s3_token = S3Token(
self.keystone_auth, {'auth_uri': 'https://fakehost/identity'})
self.swift3 = Swift3Middleware(self.s3_token, CONF)
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS access:signature',
'Date': self.get_date_header()})
self.swift.register('PUT', '/v1/AUTH_PROJECT_ID/bucket',
swob.HTTPCreated, {}, None)
self.swift.register('HEAD', '/v1/AUTH_PROJECT_ID',
swob.HTTPOk, {}, None)
with patch.object(self.s3_token, '_json_request') as mock_req:
mock_resp = requests.Response()
mock_resp._content = json.dumps(GOOD_RESPONSE_V3)
mock_resp.status_code = 200
mock_req.return_value = mock_resp
status, headers, body = self.call_swift3(req)
self.assertEqual(body, '')
self.assertEqual(1, mock_req.call_count)
def test_swift3_with_s3_token_and_auth_token(self):
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(

View File

@ -48,6 +48,28 @@ GOOD_RESPONSE = {'access': {
}
}
}}
GOOD_RESPONSE_V3 = {'token': {
'user': {
'domain': {
'name': 'Default',
'id': 'default',
},
'name': 'S3_USER',
'id': 'USER_ID',
},
'project': {
'domain': {
'name': 'PROJECT_DOMAIN_NAME',
'id': 'PROJECT_DOMAIN_ID',
},
'name': 'PROJECT_NAME',
'id': 'PROJECT_ID',
},
'roles': [
{'name': 'swift-user'},
{'name': '_member_'},
],
}}
class TestResponse(requests.Response):
@ -600,3 +622,141 @@ class S3TokenMiddlewareTestDeferredAuth(S3TokenMiddlewareTestBase):
200)
self.assertNotIn('X-Auth-Token', req.headers)
self.assertEqual(1, self.middleware._app.calls)
class S3TokenMiddlewareTestV3(S3TokenMiddlewareTestBase):
def setUp(self):
super(S3TokenMiddlewareTestV3, self).setUp()
self.requests_mock.post(self.TEST_URL,
status_code=200,
json=GOOD_RESPONSE_V3)
def _assert_authorized(self, req,
account_path='/v1/AUTH_PROJECT_ID/'):
self.assertTrue(req.path.startswith(account_path))
expected_headers = {
'X-Identity-Status': 'Confirmed',
'X-Roles': 'swift-user,_member_',
'X-User-Id': 'USER_ID',
'X-User-Name': 'S3_USER',
'X-User-Domain-Id': 'default',
'X-User-Domain-Name': 'Default',
'X-Tenant-Id': 'PROJECT_ID',
'X-Tenant-Name': 'PROJECT_NAME',
'X-Project-Id': 'PROJECT_ID',
'X-Project-Name': 'PROJECT_NAME',
'X-Project-Domain-Id': 'PROJECT_DOMAIN_ID',
'X-Project-Domain-Name': 'PROJECT_DOMAIN_NAME',
}
for header, value in expected_headers.items():
self.assertIn(header, req.headers)
self.assertEqual(value, req.headers[header])
# WSGI wants native strings for headers
self.assertIsInstance(req.headers[header], str)
self.assertNotIn('X-Auth-Token', req.headers)
self.assertEqual(1, self.middleware._app.calls)
def test_authorized(self):
req = Request.blank('/v1/AUTH_cfa/c/o')
req.environ['swift3.auth_details'] = {
'access_key': u'access',
'signature': u'signature',
'string_to_sign': u'token',
}
req.get_response(self.middleware)
self._assert_authorized(req)
def test_authorized_bytes(self):
req = Request.blank('/v1/AUTH_cfa/c/o')
req.environ['swift3.auth_details'] = {
'access_key': b'access',
'signature': b'signature',
'string_to_sign': b'token',
}
req.get_response(self.middleware)
self._assert_authorized(req)
def test_authorized_http(self):
protocol = 'http'
host = 'fakehost'
port = 35357
self.requests_mock.post(
'%s://%s:%s/v2.0/s3tokens' % (protocol, host, port),
status_code=201, json=GOOD_RESPONSE_V3)
self.middleware = (
s3_token.filter_factory({'auth_protocol': 'http',
'auth_host': host,
'auth_port': port})(self.app))
req = Request.blank('/v1/AUTH_cfa/c/o')
req.environ['swift3.auth_details'] = {
'access_key': u'access',
'signature': u'signature',
'string_to_sign': u'token',
}
req.get_response(self.middleware)
self._assert_authorized(req)
def test_authorized_trailing_slash(self):
self.middleware = s3_token.filter_factory({
'auth_uri': self.TEST_AUTH_URI + '/'})(self.app)
req = Request.blank('/v1/AUTH_cfa/c/o')
req.environ['swift3.auth_details'] = {
'access_key': u'access',
'signature': u'signature',
'string_to_sign': u'token',
}
req.get_response(self.middleware)
self._assert_authorized(req)
def test_authorization_nova_toconnect(self):
req = Request.blank('/v1/AUTH_swiftint/c/o')
req.environ['swift3.auth_details'] = {
'access_key': u'access:FORCED_TENANT_ID',
'signature': u'signature',
'string_to_sign': u'token',
}
req.get_response(self.middleware)
self._assert_authorized(req, account_path='/v1/AUTH_FORCED_TENANT_ID/')
def _test_bad_reply_missing_parts(self, *parts):
resp = copy.deepcopy(GOOD_RESPONSE_V3)
part_dict = resp
for part in parts[:-1]:
part_dict = part_dict[part]
del part_dict[parts[-1]]
self.requests_mock.post(self.TEST_URL,
status_code=201,
text=json.dumps(resp))
req = Request.blank('/v1/AUTH_cfa/c/o')
req.environ['swift3.auth_details'] = {
'access_key': u'access',
'signature': u'signature',
'string_to_sign': u'token',
}
resp = req.get_response(self.middleware)
s3_invalid_resp = self.middleware._deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_resp.body)
self.assertEqual(
resp.status_int, # pylint: disable-msg=E1101
s3_invalid_resp.status_int) # pylint: disable-msg=E1101
self.assertEqual(0, self.middleware._app.calls)
def test_bad_reply_missing_parts(self):
self._test_bad_reply_missing_parts('token', 'user', 'id')
self._test_bad_reply_missing_parts('token', 'user', 'name')
self._test_bad_reply_missing_parts('token', 'user', 'domain', 'id')
self._test_bad_reply_missing_parts('token', 'user', 'domain', 'name')
self._test_bad_reply_missing_parts('token', 'user', 'domain')
self._test_bad_reply_missing_parts('token', 'user')
self._test_bad_reply_missing_parts('token', 'project', 'id')
self._test_bad_reply_missing_parts('token', 'project', 'name')
self._test_bad_reply_missing_parts('token', 'project', 'domain', 'id')
self._test_bad_reply_missing_parts('token', 'project', 'domain',
'name')
self._test_bad_reply_missing_parts('token', 'project', 'domain')
self._test_bad_reply_missing_parts('token', 'project')
self._test_bad_reply_missing_parts('token', 'roles')