Remove keystoneclient.middleware

The code has been moved to the new keystonemiddleware project and
keystone.middleware was deprecated since Juno. It's time to drop it in
Mitaka.

Remove the directory keystoneclient/middleware/.

Remove test_auth_token_middleware.py, test_memcache_crypt.py and
test_s3_token_middleware.py in keystoneclient/tests/unit/.

Remove the create_middleware_cert shell function from
examples/pki/gen_pki.sh. And remove the call from
examples/pki/run_all.sh.

Remove netaddr, pycrypto and WebOb test dependencies, only needed to
test the removed middleware.

Closes-Bug: #1449066
Change-Id: Iedd6887dcde62177d37e1e1988ed72bcb59c05f6
This commit is contained in:
Victor Stinner 2015-04-27 10:37:01 +02:00 committed by Steve Martinelli
parent 7c1a67aa60
commit 646350c1d6
12 changed files with 10 additions and 4429 deletions

View File

@ -191,11 +191,6 @@ function issue_certs {
check_error $?
}
function create_middleware_cert {
cp $CERTS_DIR/ssl_cert.pem $CERTS_DIR/middleware.pem
cat $PRIVATE_DIR/ssl_key.pem >> $CERTS_DIR/middleware.pem
}
function check_openssl {
echo 'Checking openssl availability ...'
which openssl

View File

@ -26,6 +26,5 @@ generate_ca
ssl_cert_req
cms_signing_cert_req
issue_certs
create_middleware_cert
gen_sample_cms
cleanup

File diff suppressed because it is too large Load Diff

View File

@ -1,209 +0,0 @@
# Copyright 2010-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utilities for memcache encryption and integrity check.
Data should be serialized before entering these functions. Encryption
has a dependency on the pycrypto. If pycrypto is not available,
CryptoUnavailableError will be raised.
This module will not be called unless signing or encryption is enabled
in the config. It will always validate signatures, and will decrypt
data if encryption is enabled. It is not valid to mix protection
modes.
"""
import base64
import functools
import hashlib
import hmac
import math
import os
import sys
import six
# make sure pycrypto is available
try:
from Crypto.Cipher import AES
except ImportError:
AES = None
HASH_FUNCTION = hashlib.sha384
DIGEST_LENGTH = HASH_FUNCTION().digest_size
DIGEST_SPLIT = DIGEST_LENGTH // 3
DIGEST_LENGTH_B64 = 4 * int(math.ceil(DIGEST_LENGTH / 3.0))
class InvalidMacError(Exception):
"""raise when unable to verify MACed data.
This usually indicates that data had been expectedly modified in memcache.
"""
pass
class DecryptError(Exception):
"""raise when unable to decrypt encrypted data.
"""
pass
class CryptoUnavailableError(Exception):
"""raise when Python Crypto module is not available.
"""
pass
def assert_crypto_availability(f):
"""Ensure Crypto module is available."""
@functools.wraps(f)
def wrapper(*args, **kwds):
if AES is None:
raise CryptoUnavailableError()
return f(*args, **kwds)
return wrapper
if sys.version_info >= (3, 3):
constant_time_compare = hmac.compare_digest
else:
def constant_time_compare(first, second):
"""Returns True if both string inputs are equal, otherwise False.
This function should take a constant amount of time regardless of
how many characters in the strings match.
"""
if len(first) != len(second):
return False
result = 0
if six.PY3 and isinstance(first, bytes) and isinstance(second, bytes):
for x, y in zip(first, second):
result |= x ^ y
else:
for x, y in zip(first, second):
result |= ord(x) ^ ord(y)
return result == 0
def derive_keys(token, secret, strategy):
"""Derives keys for MAC and ENCRYPTION from the user-provided
secret. The resulting keys should be passed to the protect and
unprotect functions.
As suggested by NIST Special Publication 800-108, this uses the
first 128 bits from the sha384 KDF for the obscured cache key
value, the second 128 bits for the message authentication key and
the remaining 128 bits for the encryption key.
This approach is faster than computing a separate hmac as the KDF
for each desired key.
"""
digest = hmac.new(secret, token + strategy, HASH_FUNCTION).digest()
return {'CACHE_KEY': digest[:DIGEST_SPLIT],
'MAC': digest[DIGEST_SPLIT: 2 * DIGEST_SPLIT],
'ENCRYPTION': digest[2 * DIGEST_SPLIT:],
'strategy': strategy}
def sign_data(key, data):
"""Sign the data using the defined function and the derived key."""
mac = hmac.new(key, data, HASH_FUNCTION).digest()
return base64.b64encode(mac)
@assert_crypto_availability
def encrypt_data(key, data):
"""Encrypt the data with the given secret key.
Padding is n bytes of the value n, where 1 <= n <= blocksize.
"""
iv = os.urandom(16)
cipher = AES.new(key, AES.MODE_CBC, iv)
padding = 16 - len(data) % 16
return iv + cipher.encrypt(data + six.int2byte(padding) * padding)
@assert_crypto_availability
def decrypt_data(key, data):
"""Decrypt the data with the given secret key."""
iv = data[:16]
cipher = AES.new(key, AES.MODE_CBC, iv)
try:
result = cipher.decrypt(data[16:])
except Exception:
raise DecryptError('Encrypted data appears to be corrupted.')
# Strip the last n padding bytes where n is the last value in
# the plaintext
return result[:-1 * six.byte2int([result[-1]])]
def protect_data(keys, data):
"""Given keys and serialized data, returns an appropriately
protected string suitable for storage in the cache.
"""
if keys['strategy'] == b'ENCRYPT':
data = encrypt_data(keys['ENCRYPTION'], data)
encoded_data = base64.b64encode(data)
signature = sign_data(keys['MAC'], encoded_data)
return signature + encoded_data
def unprotect_data(keys, signed_data):
"""Given keys and cached string data, verifies the signature,
decrypts if necessary, and returns the original serialized data.
"""
# cache backends return None when no data is found. We don't mind
# that this particular special value is unsigned.
if signed_data is None:
return None
# First we calculate the signature
provided_mac = signed_data[:DIGEST_LENGTH_B64]
calculated_mac = sign_data(
keys['MAC'],
signed_data[DIGEST_LENGTH_B64:])
# Then verify that it matches the provided value
if not constant_time_compare(provided_mac, calculated_mac):
raise InvalidMacError('Invalid MAC; data appears to be corrupted.')
data = base64.b64decode(signed_data[DIGEST_LENGTH_B64:])
# then if necessary decrypt the data
if keys['strategy'] == b'ENCRYPT':
data = decrypt_data(keys['ENCRYPTION'], data)
return data
def get_cache_key(keys):
"""Given keys generated by derive_keys(), returns a base64
encoded value suitable for use as a cache key in memcached.
"""
return base64.b64encode(keys['CACHE_KEY'])

View File

@ -1,274 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011,2012 Akira YOSHIYAMA <akirayoshiyama@gmail.com>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This source code is based ./auth_token.py and ./ec2_token.py.
# See them for their copyright.
"""
S3 TOKEN MIDDLEWARE
.. warning::
This module is DEPRECATED and may be removed in the 2.0.0 release. The
s3_token middleware has been moved to the `keystonemiddleware repository
<http://docs.openstack.org/developer/keystonemiddleware/>`_.
This WSGI component:
* Get a request from the swift3 middleware with an S3 Authorization
access key.
* Validate s3 token in Keystone.
* Transform the account name to AUTH_%(tenant_name).
"""
import logging
from oslo_serialization import jsonutils
from oslo_utils import strutils
import requests
import six
from six.moves import urllib
import webob
PROTOCOL_NAME = 'S3 Token Authentication'
# TODO(kun): remove it after oslo merge this.
def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
"""Validate and split the given HTTP request path.
**Examples**::
['a'] = split_path('/a')
['a', None] = split_path('/a', 1, 2)
['a', 'c'] = split_path('/a/c', 1, 2)
['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)
:param path: HTTP Request path to be split
:param minsegs: Minimum number of segments to be extracted
:param maxsegs: Maximum number of segments to be extracted
:param rest_with_last: If True, trailing data will be returned as part
of last segment. If False, and there is
trailing data, raises ValueError.
:returns: list of segments with a length of maxsegs (non-existent
segments will return as None)
:raises: ValueError if given an invalid path
"""
if not maxsegs:
maxsegs = minsegs
if minsegs > maxsegs:
raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
if rest_with_last:
segs = path.split('/', maxsegs)
minsegs += 1
maxsegs += 1
count = len(segs)
if (segs[0] or count < minsegs or count > maxsegs or
'' in segs[1:minsegs]):
raise ValueError('Invalid path: %s' % urllib.parse.quote(path))
else:
minsegs += 1
maxsegs += 1
segs = path.split('/', maxsegs)
count = len(segs)
if (segs[0] or count < minsegs or count > maxsegs + 1 or
'' in segs[1:minsegs] or
(count == maxsegs + 1 and segs[maxsegs])):
raise ValueError('Invalid path: %s' % urllib.parse.quote(path))
segs = segs[1:maxsegs]
segs.extend([None] * (maxsegs - 1 - len(segs)))
return segs
class ServiceError(Exception):
pass
class S3Token(object):
"""Auth Middleware that handles S3 authenticating client calls."""
def __init__(self, app, conf):
"""Common initialization code."""
self.app = app
self.logger = logging.getLogger(conf.get('log_name', __name__))
self.logger.debug('Starting the %s component', PROTOCOL_NAME)
self.logger.warning(
'This middleware module is deprecated as of v0.11.0 in favor of '
'keystonemiddleware.s3_token - please update your WSGI pipeline '
'to reference the new middleware package.')
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_')
# where to find the auth service (we use this to validate tokens)
auth_host = conf.get('auth_host')
auth_port = int(conf.get('auth_port', 35357))
auth_protocol = conf.get('auth_protocol', 'https')
self.request_uri = '%s://%s:%s' % (auth_protocol, auth_host, auth_port)
# SSL
insecure = strutils.bool_from_string(conf.get('insecure', False))
cert_file = conf.get('certfile')
key_file = conf.get('keyfile')
if insecure:
self.verify = False
elif cert_file and key_file:
self.verify = (cert_file, key_file)
elif cert_file:
self.verify = cert_file
else:
self.verify = None
def deny_request(self, code):
error_table = {
'AccessDenied': (401, 'Access denied'),
'InvalidURI': (400, 'Could not parse the specified URI'),
}
resp = webob.Response(content_type='text/xml')
resp.status = error_table[code][0]
error_msg = ('<?xml version="1.0" encoding="UTF-8"?>\r\n'
'<Error>\r\n <Code>%s</Code>\r\n '
'<Message>%s</Message>\r\n</Error>\r\n' %
(code, error_table[code][1]))
if six.PY3:
error_msg = error_msg.encode()
resp.body = error_msg
return resp
def _json_request(self, creds_json):
headers = {'Content-Type': 'application/json'}
try:
response = requests.post('%s/v2.0/s3tokens' % self.request_uri,
headers=headers, data=creds_json,
verify=self.verify)
except requests.exceptions.RequestException as e:
self.logger.info('HTTP connection exception: %s', e)
resp = self.deny_request('InvalidURI')
raise ServiceError(resp)
if response.status_code < 200 or response.status_code >= 300:
self.logger.debug('Keystone reply error: status=%s reason=%s',
response.status_code, response.reason)
resp = self.deny_request('AccessDenied')
raise ServiceError(resp)
return response
def __call__(self, environ, start_response):
"""Handle incoming request. authenticate and send downstream."""
req = webob.Request(environ)
self.logger.debug('Calling S3Token middleware.')
try:
parts = split_path(req.path, 1, 4, True)
version, account, container, obj = parts
except ValueError:
msg = 'Not a path query, skipping.'
self.logger.debug(msg)
return self.app(environ, start_response)
# Read request signature and access id.
if 'Authorization' not in req.headers:
msg = 'No Authorization header. skipping.'
self.logger.debug(msg)
return self.app(environ, start_response)
token = req.headers.get('X-Auth-Token',
req.headers.get('X-Storage-Token'))
if not token:
msg = 'You did not specify an auth or a storage token. skipping.'
self.logger.debug(msg)
return self.app(environ, start_response)
auth_header = req.headers['Authorization']
try:
access, signature = auth_header.split(' ')[-1].rsplit(':', 1)
except ValueError:
msg = 'You have an invalid Authorization header: %s'
self.logger.debug(msg, auth_header)
return self.deny_request('InvalidURI')(environ, start_response)
# NOTE(chmou): This is to handle the special case with nova
# when we have the option s3_affix_tenant. We will force it to
# connect to another account than the one
# authenticated. Before people start getting worried about
# security, I should point that we are connecting with
# username/token specified by the user but instead of
# connecting to its own account we will force it to go to an
# another account. In a normal scenario if that user don't
# have the reseller right it will just fail but since the
# reseller account can connect to every account it is allowed
# by the swift_auth middleware.
force_tenant = None
if ':' in access:
access, force_tenant = access.split(':')
# Authenticate request.
creds = {'credentials': {'access': access,
'token': token,
'signature': signature}}
creds_json = jsonutils.dumps(creds)
self.logger.debug('Connecting to Keystone sending this JSON: %s',
creds_json)
# NOTE(vish): We could save a call to keystone by having
# keystone return token, tenant, user, and roles
# from this call.
#
# NOTE(chmou): We still have the same problem we would need to
# change token_auth to detect if we already
# identified and not doing a second query and just
# pass it through to swiftauth in this case.
try:
resp = self._json_request(creds_json)
except ServiceError as e:
resp = e.args[0]
msg = 'Received error, exiting middleware with error: %s'
self.logger.debug(msg, resp.status_code)
return resp(environ, start_response)
self.logger.debug('Keystone Reply: Status: %d, Output: %s',
resp.status_code, resp.content)
try:
identity_info = resp.json()
token_id = str(identity_info['access']['token']['id'])
tenant = identity_info['access']['token']['tenant']
except (ValueError, KeyError):
error = 'Error on keystone reply: %d %s'
self.logger.debug(error, resp.status_code, resp.content)
return self.deny_request('InvalidURI')(environ, start_response)
req.headers['X-Auth-Token'] = token_id
tenant_to_connect = force_tenant or tenant['id']
self.logger.debug('Connecting with tenant: %s', tenant_to_connect)
new_tenant_name = '%s%s' % (self.reseller_prefix, tenant_to_connect)
environ['PATH_INFO'] = environ['PATH_INFO'].replace(account,
new_tenant_name)
return self.app(environ, start_response)
def filter_factory(global_conf, **local_conf):
"""Returns a WSGI filter app for use with paste.deploy."""
conf = global_conf.copy()
conf.update(local_conf)
def auth_filter(app):
return S3Token(app, conf)
return auth_filter

File diff suppressed because it is too large Load Diff

View File

@ -1,102 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import testtools
from keystoneclient.middleware import memcache_crypt
from keystoneclient.tests.unit import client_fixtures
class MemcacheCryptPositiveTests(testtools.TestCase):
def setUp(self):
super(MemcacheCryptPositiveTests, self).setUp()
self.useFixture(client_fixtures.Deprecations())
def _setup_keys(self, strategy):
return memcache_crypt.derive_keys(b'token', b'secret', strategy)
def test_constant_time_compare(self):
# make sure it works as a compare, the "constant time" aspect
# isn't appropriate to test in unittests
ctc = memcache_crypt.constant_time_compare
self.assertTrue(ctc('abcd', 'abcd'))
self.assertTrue(ctc('', ''))
self.assertFalse(ctc('abcd', 'efgh'))
self.assertFalse(ctc('abc', 'abcd'))
self.assertFalse(ctc('abc', 'abc\x00'))
self.assertFalse(ctc('', 'abc'))
# For Python 3, we want to test these functions with both str and bytes
# as input.
if six.PY3:
self.assertTrue(ctc(b'abcd', b'abcd'))
self.assertTrue(ctc(b'', b''))
self.assertFalse(ctc(b'abcd', b'efgh'))
self.assertFalse(ctc(b'abc', b'abcd'))
self.assertFalse(ctc(b'abc', b'abc\x00'))
self.assertFalse(ctc(b'', b'abc'))
def test_derive_keys(self):
keys = self._setup_keys(b'strategy')
self.assertEqual(len(keys['ENCRYPTION']),
len(keys['CACHE_KEY']))
self.assertEqual(len(keys['CACHE_KEY']),
len(keys['MAC']))
self.assertNotEqual(keys['ENCRYPTION'],
keys['MAC'])
self.assertIn('strategy', keys)
def test_key_strategy_diff(self):
k1 = self._setup_keys(b'MAC')
k2 = self._setup_keys(b'ENCRYPT')
self.assertNotEqual(k1, k2)
def test_sign_data(self):
keys = self._setup_keys(b'MAC')
sig = memcache_crypt.sign_data(keys['MAC'], b'data')
self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64)
def test_encryption(self):
keys = self._setup_keys(b'ENCRYPT')
# what you put in is what you get out
for data in [b'data', b'1234567890123456', b'\x00\xFF' * 13
] + [six.int2byte(x % 256) * x for x in range(768)]:
crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data)
decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt)
self.assertEqual(data, decrypt)
self.assertRaises(memcache_crypt.DecryptError,
memcache_crypt.decrypt_data,
keys['ENCRYPTION'], crypt[:-1])
def test_protect_wrappers(self):
data = b'My Pretty Little Data'
for strategy in [b'MAC', b'ENCRYPT']:
keys = self._setup_keys(strategy)
protected = memcache_crypt.protect_data(keys, data)
self.assertNotEqual(protected, data)
if strategy == b'ENCRYPT':
self.assertNotIn(data, protected)
unprotected = memcache_crypt.unprotect_data(keys, protected)
self.assertEqual(data, unprotected)
self.assertRaises(memcache_crypt.InvalidMacError,
memcache_crypt.unprotect_data,
keys, protected[:-1])
self.assertIsNone(memcache_crypt.unprotect_data(keys, None))
def test_no_pycrypt(self):
aes = memcache_crypt.AES
memcache_crypt.AES = None
self.assertRaises(memcache_crypt.CryptoUnavailableError,
memcache_crypt.encrypt_data, 'token', 'secret',
'data')
memcache_crypt.AES = aes

View File

@ -1,265 +0,0 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_serialization import jsonutils
import requests
import six
import testtools
import webob
from keystoneclient.middleware import s3_token
from keystoneclient.tests.unit import client_fixtures
from keystoneclient.tests.unit import utils
GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
'tenant': {'id': 'TENANT_ID'}}}}
class FakeApp(object):
"""This represents a WSGI app protected by the auth_token middleware."""
def __call__(self, env, start_response):
resp = webob.Response()
resp.environ = env
return resp(env, start_response)
class S3TokenMiddlewareTestBase(utils.TestCase):
TEST_PROTOCOL = 'https'
TEST_HOST = 'fakehost'
TEST_PORT = 35357
TEST_URL = '%s://%s:%d/v2.0/s3tokens' % (TEST_PROTOCOL,
TEST_HOST,
TEST_PORT)
def setUp(self):
super(S3TokenMiddlewareTestBase, self).setUp()
self.useFixture(client_fixtures.Deprecations())
self.conf = {
'auth_host': self.TEST_HOST,
'auth_port': self.TEST_PORT,
'auth_protocol': self.TEST_PROTOCOL,
}
def start_fake_response(self, status, headers):
self.response_status = int(status.split(' ', 1)[0])
self.response_headers = dict(headers)
class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
def setUp(self):
super(S3TokenMiddlewareTestGood, self).setUp()
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
self.requests_mock.post(self.TEST_URL,
status_code=201,
json=GOOD_RESPONSE)
# Ignore the request and pass to the next middleware in the
# pipeline if no path has been specified.
def test_no_path_request(self):
req = webob.Request.blank('/')
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
# Ignore the request and pass to the next middleware in the
# pipeline if no Authorization header has been specified
def test_without_authorization(self):
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
def test_without_auth_storage_token(self):
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'badboy'
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
def test_authorized(self):
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
def test_authorized_http(self):
TEST_URL = 'http://%s:%d/v2.0/s3tokens' % (self.TEST_HOST,
self.TEST_PORT)
self.requests_mock.post(TEST_URL, status_code=201, json=GOOD_RESPONSE)
self.middleware = (
s3_token.filter_factory({'auth_protocol': 'http',
'auth_host': self.TEST_HOST,
'auth_port': self.TEST_PORT})(FakeApp()))
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
def test_authorization_nova_toconnect(self):
req = webob.Request.blank('/v1/AUTH_swiftint/c/o')
req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
path = req.environ['PATH_INFO']
self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID'))
@mock.patch.object(requests, 'post')
def test_insecure(self, MOCK_REQUEST):
self.middleware = (
s3_token.filter_factory({'insecure': 'True'})(FakeApp()))
text_return_value = jsonutils.dumps(GOOD_RESPONSE)
if six.PY3:
text_return_value = text_return_value.encode()
MOCK_REQUEST.return_value = utils.TestResponse({
'status_code': 201,
'text': text_return_value})
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
req.get_response(self.middleware)
self.assertTrue(MOCK_REQUEST.called)
mock_args, mock_kwargs = MOCK_REQUEST.call_args
self.assertIs(mock_kwargs['verify'], False)
def test_insecure_option(self):
# insecure is passed as a string.
# Some non-secure values.
true_values = ['true', 'True', '1', 'yes']
for val in true_values:
config = {'insecure': val, 'certfile': 'false_ind'}
middleware = s3_token.filter_factory(config)(FakeApp())
self.assertIs(False, middleware.verify)
# Some "secure" values, including unexpected value.
false_values = ['false', 'False', '0', 'no', 'someweirdvalue']
for val in false_values:
config = {'insecure': val, 'certfile': 'false_ind'}
middleware = s3_token.filter_factory(config)(FakeApp())
self.assertEqual('false_ind', middleware.verify)
# Default is secure.
config = {'certfile': 'false_ind'}
middleware = s3_token.filter_factory(config)(FakeApp())
self.assertIs('false_ind', middleware.verify)
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
def setUp(self):
super(S3TokenMiddlewareTestBad, self).setUp()
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
def test_unauthorized_token(self):
ret = {"error":
{"message": "EC2 access key not found.",
"code": 401,
"title": "Unauthorized"}}
self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
s3_denied_req = self.middleware.deny_request('AccessDenied')
self.assertEqual(resp.body, s3_denied_req.body)
self.assertEqual(resp.status_int, s3_denied_req.status_int)
def test_bogus_authorization(self):
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'badboy'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
self.assertEqual(resp.status_int, 400)
s3_invalid_req = self.middleware.deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_req.body)
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
def test_fail_to_connect_to_keystone(self):
with mock.patch.object(self.middleware, '_json_request') as o:
s3_invalid_req = self.middleware.deny_request('InvalidURI')
o.side_effect = s3_token.ServiceError(s3_invalid_req)
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
self.assertEqual(resp.body, s3_invalid_req.body)
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
def test_bad_reply(self):
self.requests_mock.post(self.TEST_URL,
status_code=201,
text="<badreply>")
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
resp = req.get_response(self.middleware)
s3_invalid_req = self.middleware.deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_req.body)
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
class S3TokenMiddlewareTestUtil(testtools.TestCase):
def setUp(self):
super(S3TokenMiddlewareTestUtil, self).setUp()
self.useFixture(client_fixtures.Deprecations())
def test_split_path_failed(self):
self.assertRaises(ValueError, s3_token.split_path, '')
self.assertRaises(ValueError, s3_token.split_path, '/')
self.assertRaises(ValueError, s3_token.split_path, '//')
self.assertRaises(ValueError, s3_token.split_path, '//a')
self.assertRaises(ValueError, s3_token.split_path, '/a/c')
self.assertRaises(ValueError, s3_token.split_path, '//c')
self.assertRaises(ValueError, s3_token.split_path, '/a/c/')
self.assertRaises(ValueError, s3_token.split_path, '/a//')
self.assertRaises(ValueError, s3_token.split_path, '/a', 2)
self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3)
self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3, True)
self.assertRaises(ValueError, s3_token.split_path, '/a/c/o/r', 3, 3)
self.assertRaises(ValueError, s3_token.split_path, '/a', 5, 4)
def test_split_path_success(self):
self.assertEqual(s3_token.split_path('/a'), ['a'])
self.assertEqual(s3_token.split_path('/a/'), ['a'])
self.assertEqual(s3_token.split_path('/a/c', 2), ['a', 'c'])
self.assertEqual(s3_token.split_path('/a/c/o', 3), ['a', 'c', 'o'])
self.assertEqual(s3_token.split_path('/a/c/o/r', 3, 3, True),
['a', 'c', 'o/r'])
self.assertEqual(s3_token.split_path('/a/c', 2, 3, True),
['a', 'c', None])
self.assertEqual(s3_token.split_path('/a/c/', 2), ['a', 'c'])
self.assertEqual(s3_token.split_path('/a/c/', 2, 3), ['a', 'c', ''])
def test_split_path_invalid_path(self):
try:
s3_token.split_path('o\nn e', 2)
except ValueError as err:
self.assertEqual(str(err), 'Invalid path: o%0An%20e')
try:
s3_token.split_path('o\nn e', 2, 3, True)
except ValueError as err:
self.assertEqual(str(err), 'Invalid path: o%0An%20e')

View File

@ -0,0 +1,10 @@
---
prelude: >
keystoneclient.middleware has been removed.
critical:
- >
[`bug 1449066 <https://bugs.launchpad.net/python-keystoneclient/+bug/1449066>`_]
The module `keystoneclient.middleware` has been removed in favor of the
keystonemiddleware library. The aforementioned module has been depreacted
since the v0.10.0 of keystoneclient which was inclued in the Juno release
of OpenStack.

View File

@ -9,7 +9,6 @@ Babel>=1.3
iso8601>=0.1.9
debtcollector>=0.3.0 # Apache-2.0
keystoneauth1>=2.1.0
netaddr!=0.7.16,>=0.7.12
oslo.config>=2.7.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0

View File

@ -13,7 +13,6 @@ mock>=1.2
oauthlib>=0.6
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
pycrypto>=2.6
reno>=0.1.1 # Apache2
requests-mock>=0.7.0 # Apache-2.0
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2