Merge "Add check_signature function to swift3.auth_details"

This commit is contained in:
Jenkins 2017-03-30 02:55:34 +00:00 committed by Gerrit Code Review
commit ad6ff91ca4
3 changed files with 136 additions and 27 deletions

View File

@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from email.header import Header
from hashlib import sha256, md5
from hashlib import sha1, sha256, md5
import hmac
import re
import six
import string
@ -70,6 +72,7 @@ ALLOWED_SUB_RESOURCES = sorted([
MAX_32BIT_INT = 2147483647
SIGV2_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S'
SIGV4_X_AMZ_DATE_FORMAT = '%Y%m%dT%H%M%SZ'
SERVICE = 's3' # useful for mocking out in tests
def _header_strip(value):
@ -108,6 +111,16 @@ class SigV4Mixin(object):
A request class mixin to provide S3 signature v4 functionality
"""
def check_signature(self, secret):
user_signature = self.signature
derived_secret = 'AWS4' + secret
for scope_piece in self.scope:
derived_secret = hmac.new(
derived_secret, scope_piece, sha256).digest()
valid_signature = hmac.new(
derived_secret, self.string_to_sign, sha256).hexdigest()
return user_signature == valid_signature
@property
def _is_query_auth(self):
return 'X-Amz-Credential' in self.params
@ -336,17 +349,19 @@ class SigV4Mixin(object):
cr.append(hashed_payload)
return '\n'.join(cr).encode('utf-8')
@property
def scope(self):
return [self.timestamp.amz_date_format.split('T')[0],
CONF.location, SERVICE, 'aws4_request']
def _string_to_sign(self):
"""
Create 'StringToSign' value in Amazon terminology for v4.
"""
scope = (self.timestamp.amz_date_format.split('T')[0] +
'/' + CONF.location + '/s3/aws4_request')
return ('AWS4-HMAC-SHA256' + '\n'
+ self.timestamp.amz_date_format + '\n'
+ scope + '\n'
+ sha256(self._canonical_request()).hexdigest())
return '\n'.join(['AWS4-HMAC-SHA256',
self.timestamp.amz_date_format,
'/'.join(self.scope),
sha256(self._canonical_request()).hexdigest()])
def get_request_class(env):
@ -381,14 +396,17 @@ class Request(swob.Request):
# NOTE: app is not used by this class, need for compatibility of S3acl
swob.Request.__init__(self, env)
self._timestamp = None
self.access_key, signature = self._parse_auth_info()
self.access_key, self.signature = self._parse_auth_info()
self.bucket_in_host = self._parse_host()
self.container_name, self.object_name = self._parse_uri()
self._validate_headers()
# Lock in string-to-sign now, before we start messing with query params
self.string_to_sign = self._string_to_sign()
self.environ['swift3.auth_details'] = {
'access_key': self.access_key,
'signature': signature,
'string_to_sign': self._string_to_sign(),
'signature': self.signature,
'string_to_sign': self.string_to_sign,
'check_signature': self.check_signature,
}
self.token = None
self.account = None
@ -403,11 +421,17 @@ class Request(swob.Request):
# b626a3ca86e467fc7564eac236b9ee2efd49bdcc, the s3token is in swift3
# repo so probably we need to change s3token to support v4 format.
self.headers['Authorization'] = 'AWS %s:%s' % (
self.access_key, signature)
self.access_key, self.signature)
# Avoids that swift.swob.Response replaces Location header value
# by full URL when absolute path given. See swift.swob for more detail.
self.environ['swift.leave_relative_location'] = True
def check_signature(self, secret):
user_signature = self.signature
valid_signature = base64.b64encode(hmac.new(
secret, self.string_to_sign, sha1).digest()).strip()
return user_signature == valid_signature
@property
def timestamp(self):
"""

View File

@ -18,6 +18,7 @@ from mock import patch, MagicMock
from contextlib import nested
from datetime import datetime
import hashlib
import mock
import requests
import json
import copy
@ -382,14 +383,16 @@ class TestSwift3Middleware(Swift3TestCase):
req.headers['Authorization'] = 'AWS test:tester:hmac'
date_header = self.get_date_header()
req.headers['Date'] = date_header
status, headers, body = self.call_swift3(req)
with mock.patch('swift3.request.Request.check_signature') as mock_cs:
status, headers, body = self.call_swift3(req)
_, _, headers = self.swift.calls_with_headers[-1]
self.assertEqual(req.environ['swift3.auth_details'], {
'access_key': 'test:tester',
'signature': 'hmac',
'string_to_sign': '\n'.join([
'PUT', '', '', date_header,
'/bucket/object?partNumber=1&uploadId=123456789abcdef'])})
'/bucket/object?partNumber=1&uploadId=123456789abcdef']),
'check_signature': mock_cs})
def test_invalid_uri(self):
req = Request.blank('/bucket/invalid\xffname',
@ -712,16 +715,20 @@ class TestSwift3Middleware(Swift3TestCase):
req = SigV4Request(env)
return req
def string_to_sign(path, environ):
return _get_req(path, environ)._string_to_sign()
def canonical_string(path, environ):
return _get_req(path, environ)._canonical_request()
def verify(hash_val, path, environ):
s = string_to_sign(path, environ)
s = s.split('\n')[3]
self.assertEqual(hash_val, s)
# See http://docs.aws.amazon.com/general/latest/gr
# /signature-v4-test-suite.html for where location, service, and
# signing key came from
with patch.object(CONF, 'location', 'us-east-1'), \
patch.object(swift3.request, 'SERVICE', 'host'):
req = _get_req(path, environ)
hash_in_sts = req._string_to_sign().split('\n')[3]
self.assertEqual(hash_val, hash_in_sts)
self.assertTrue(req.check_signature(
'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY'))
# all next data got from aws4_testsuite from Amazon
# http://docs.aws.amazon.com/general/latest/gr/samples
@ -734,7 +741,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host, Signature=X'),
'SignedHeaders=date;host, '
'Signature=b27ccfbfa7df52a200ff74193ca6e32d'
'4b48b8856fab7ebf1c595d0670a7e470'),
'HTTP_HOST': 'host.foo.com'}
verify('366b91fb121d72a00f46bbe8d395f53a'
'102b06dfb7e79636515208ed3fa606b1',
@ -746,7 +755,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;p, Signature=X'),
'SignedHeaders=date;host;p, '
'Signature=debf546796015d6f6ded8626f5ce9859'
'7c33b47b9164cf6b17b4642036fcb592'),
'HTTP_HOST': 'host.foo.com',
'HTTP_P': 'phfft'}
verify('dddd1902add08da1ac94782b05f9278c'
@ -758,7 +769,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host, Signature=X'),
'SignedHeaders=date;host, '
'Signature=8d6634c189aa8c75c2e51e106b6b5121'
'bed103fdb351f7d7d4381c738823af74'),
'HTTP_HOST': 'host.foo.com',
'RAW_PATH_INFO': '/%E1%88%B4'}
@ -780,7 +793,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host, Signature=X'),
'SignedHeaders=date;host, '
'Signature=0dc122f3b28b831ab48ba65cb47300de'
'53fbe91b577fe113edac383730254a3b'),
'HTTP_HOST': 'host.foo.com'}
verify('2f23d14fe13caebf6dfda346285c6d9c'
'14f49eaca8f5ec55c627dd7404f7a727',
@ -792,7 +807,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;zoo, Signature=X'),
'SignedHeaders=date;host;zoo, '
'Signature=273313af9d0c265c531e11db70bbd653'
'f3ba074c1009239e8559d3987039cad7'),
'HTTP_HOST': 'host.foo.com',
'HTTP_ZOO': 'ZOOBAR'}
verify('3aae6d8274b8c03e2cc96fc7d6bda4b9'
@ -805,7 +822,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;content-type, Signature=X'),
'SignedHeaders=date;host;content-type, '
'Signature=b105eb10c6d318d2294de9d49dd8b031'
'b55e3c3fe139f2e637da70511e9e7b71'),
'HTTP_HOST': 'host.foo.com',
'HTTP_X_AMZ_CONTENT_SHA256':
'3ba8907e7a252327488df390ed517c45'
@ -822,7 +841,9 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;content-type, Signature=X'),
'SignedHeaders=date;host;content-type, '
'Signature=5a15b22cf462f047318703b92e6f4f38'
'884e4a7ab7b1d6426ca46a8bd1c26cbc'),
'HTTP_HOST': 'host.foo.com',
'HTTP_X_AMZ_CONTENT_SHA256':
'3ba8907e7a252327488df390ed517c45'

View File

@ -690,5 +690,69 @@ class TestRequest(Swift3TestCase):
self.assertEqual(uri, '/bucket/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/bucket/obj1')
@patch.object(CONF, 'storage_domain', 's3.amazonaws.com')
@patch.object(S3_Request, '_validate_headers', lambda *a: None)
def test_check_signature_sigv2(self):
# See https://web.archive.org/web/20151226025049/http://
# docs.aws.amazon.com//AmazonS3/latest/dev/RESTAuthentication.html
req = Request.blank('/photos/puppy.jpg', headers={
'Host': 'johnsmith.s3.amazonaws.com',
'Date': 'Tue, 27 Mar 2007 19:36:42 +0000',
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'bWq2s1WEIj+Ydj0vQ697zp+IXMU='),
})
sigv2_req = S3_Request(req.environ)
expected_sts = '\n'.join([
'GET',
'',
'',
'Tue, 27 Mar 2007 19:36:42 +0000',
'/johnsmith/photos/puppy.jpg',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(
'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
req = Request.blank('/photos/puppy.jpg', method='PUT', headers={
'Content-Type': 'image/jpeg',
'Content-Length': '94328',
'Host': 'johnsmith.s3.amazonaws.com',
'Date': 'Tue, 27 Mar 2007 21:15:45 +0000',
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'MyyxeRY7whkBe+bq8fHCL/2kKUg='),
})
sigv2_req = S3_Request(req.environ)
expected_sts = '\n'.join([
'PUT',
'',
'image/jpeg',
'Tue, 27 Mar 2007 21:15:45 +0000',
'/johnsmith/photos/puppy.jpg',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(
'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
req = Request.blank(
'/?prefix=photos&max-keys=50&marker=puppy',
headers={
'User-Agent': 'Mozilla/5.0',
'Host': 'johnsmith.s3.amazonaws.com',
'Date': 'Tue, 27 Mar 2007 19:42:41 +0000',
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'htDYFYduRNen8P9ZfE/s9SuKy0U='),
})
sigv2_req = S3_Request(req.environ)
expected_sts = '\n'.join([
'GET',
'',
'',
'Tue, 27 Mar 2007 19:42:41 +0000',
'/johnsmith/',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(
'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
if __name__ == '__main__':
unittest.main()