swift/test/unit/common/middleware/s3api/test_request.py

766 lines
32 KiB
Python

# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import patch, MagicMock
import unittest
from swift.common import swob
from swift.common.swob import Request, HTTPNoContent
from swift.common.middleware.s3api.utils import mktime
from swift.common.middleware.s3api.acl_handlers import get_acl_handler
from swift.common.middleware.s3api.subresource import ACL, User, Owner, \
Grant, encode_acl
from test.unit.common.middleware.s3api.test_s3api import S3ApiTestCase
from swift.common.middleware.s3api.request import Request as S3_Request
from swift.common.middleware.s3api.request import S3AclRequest, SigV4Request, \
SIGV4_X_AMZ_DATE_FORMAT
from swift.common.middleware.s3api.response import InvalidArgument, \
NoSuchBucket, InternalError, \
AccessDenied, SignatureDoesNotMatch, RequestTimeTooSkewed
from test.unit import DebugLogger
Fake_ACL_MAP = {
# HEAD Bucket
('HEAD', 'HEAD', 'container'):
{'Resource': 'container',
'Permission': 'READ'},
# GET Bucket
('GET', 'GET', 'container'):
{'Resource': 'container',
'Permission': 'READ'},
# HEAD Object
('HEAD', 'HEAD', 'object'):
{'Resource': 'object',
'Permission': 'READ'},
# GET Object
('GET', 'GET', 'object'):
{'Resource': 'object',
'Permission': 'READ'},
}
def _gen_test_acl_header(owner, permission=None, grantee=None,
resource='container'):
if permission is None:
return ACL(owner, [])
if grantee is None:
grantee = User('test:tester')
return encode_acl(resource, ACL(owner, [Grant(grantee, permission)]))
class FakeResponse(object):
def __init__(self, s3_acl):
self.sysmeta_headers = {}
if s3_acl:
owner = Owner(id='test:tester', name='test:tester')
self.sysmeta_headers.update(
_gen_test_acl_header(owner, 'FULL_CONTROL',
resource='container'))
self.sysmeta_headers.update(
_gen_test_acl_header(owner, 'FULL_CONTROL',
resource='object'))
class FakeSwiftResponse(object):
def __init__(self):
self.environ = {
'PATH_INFO': '/v1/AUTH_test',
'HTTP_X_TENANT_NAME': 'test',
'HTTP_X_USER_NAME': 'tester',
'HTTP_X_AUTH_TOKEN': 'token',
}
class TestRequest(S3ApiTestCase):
def setUp(self):
super(TestRequest, self).setUp()
self.s3api.conf.s3_acl = True
self.swift.s3_acl = True
@patch('swift.common.middleware.s3api.acl_handlers.ACL_MAP', Fake_ACL_MAP)
@patch('swift.common.middleware.s3api.request.S3AclRequest.authenticate',
lambda x, y: None)
def _test_get_response(self, method, container='bucket', obj=None,
permission=None, skip_check=False,
req_klass=S3_Request, fake_swift_resp=None):
path = '/' + container + ('/' + obj if obj else '')
req = Request.blank(path,
environ={'REQUEST_METHOD': method},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
if issubclass(req_klass, S3AclRequest):
s3_req = req_klass(
req.environ, MagicMock(),
True, self.conf.storage_domain,
self.conf.location, self.conf.force_swift_request_proxy_log,
self.conf.dns_compliant_bucket_names,
self.conf.allow_multipart_uploads, self.conf.allow_no_owner)
else:
s3_req = req_klass(
req.environ, MagicMock(),
True, self.conf.storage_domain,
self.conf.location, self.conf.force_swift_request_proxy_log,
self.conf.dns_compliant_bucket_names,
self.conf.allow_multipart_uploads, self.conf.allow_no_owner)
s3_req.set_acl_handler(
get_acl_handler(s3_req.controller_name)(s3_req, DebugLogger()))
with patch('swift.common.middleware.s3api.request.Request.'
'_get_response') as mock_get_resp, \
patch('swift.common.middleware.s3api.subresource.ACL.'
'check_permission') as m_check_permission:
mock_get_resp.return_value = fake_swift_resp \
or FakeResponse(self.conf.s3_acl)
return mock_get_resp, m_check_permission,\
s3_req.get_response(self.s3api)
def test_get_response_without_s3_acl(self):
self.s3api.conf.s3_acl = False
self.swift.s3_acl = False
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('HEAD')
self.assertFalse(hasattr(s3_resp, 'bucket_acl'))
self.assertFalse(hasattr(s3_resp, 'object_acl'))
self.assertEqual(mock_get_resp.call_count, 1)
self.assertEqual(m_check_permission.call_count, 0)
def test_get_response_without_match_ACL_MAP(self):
with self.assertRaises(Exception) as e:
self._test_get_response('POST', req_klass=S3AclRequest)
self.assertEqual(e.exception.message,
'No permission to be checked exists')
def test_get_response_without_duplication_HEAD_request(self):
obj = 'object'
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('HEAD', obj=obj,
req_klass=S3AclRequest)
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 1)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertEqual(get_resp_obj, obj)
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ')
def test_get_response_with_check_object_permission(self):
obj = 'object'
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('GET', obj=obj,
req_klass=S3AclRequest)
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 2)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertEqual(get_resp_obj, obj)
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ')
def test_get_response_with_check_container_permission(self):
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('GET',
req_klass=S3AclRequest)
self.assertTrue(s3_resp.bucket_acl is not None)
self.assertTrue(s3_resp.object_acl is not None)
self.assertEqual(mock_get_resp.call_count, 2)
args, kargs = mock_get_resp.call_args_list[0]
get_resp_obj = args[3]
self.assertTrue(get_resp_obj is '')
self.assertEqual(m_check_permission.call_count, 1)
args, kargs = m_check_permission.call_args
permission = args[1]
self.assertEqual(permission, 'READ')
def test_get_validate_param(self):
def create_s3request_with_param(param, value):
req = Request.blank(
'/bucket?%s=%s' % (param, value),
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
return S3_Request(req.environ)
s3req = create_s3request_with_param('max-keys', '1')
# a param in the range
self.assertEqual(s3req.get_validated_param('max-keys', 1000, 1000), 1)
self.assertEqual(s3req.get_validated_param('max-keys', 0, 1), 1)
# a param in the out of the range
self.assertEqual(s3req.get_validated_param('max-keys', 0, 0), 0)
# a param in the out of the integer range
s3req = create_s3request_with_param('max-keys', '1' * 30)
with self.assertRaises(InvalidArgument) as result:
s3req.get_validated_param('max-keys', 1)
self.assertTrue(
'not an integer or within integer range' in result.exception.body)
self.assertEqual(
result.exception.headers['content-type'], 'application/xml')
# a param is negative integer
s3req = create_s3request_with_param('max-keys', '-1')
with self.assertRaises(InvalidArgument) as result:
s3req.get_validated_param('max-keys', 1)
self.assertTrue(
'must be an integer between 0 and' in result.exception.body)
self.assertEqual(
result.exception.headers['content-type'], 'application/xml')
# a param is not integer
s3req = create_s3request_with_param('max-keys', 'invalid')
with self.assertRaises(InvalidArgument) as result:
s3req.get_validated_param('max-keys', 1)
self.assertTrue(
'not an integer or within integer range' in result.exception.body)
self.assertEqual(
result.exception.headers['content-type'], 'application/xml')
def test_authenticate_delete_Authorization_from_s3req(self):
req = Request.blank('/bucket/obj',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
with patch.object(Request, 'get_response') as m_swift_resp, \
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(req.environ, MagicMock())
self.assertNotIn('swift3.auth_details', s3_req.environ)
self.assertNotIn('HTTP_AUTHORIZATION', s3_req.environ)
self.assertNotIn('Authorization', s3_req.headers)
self.assertEqual(s3_req.token, 'token')
def test_to_swift_req_Authorization_not_exist_in_swreq(self):
container = 'bucket'
obj = 'obj'
method = 'GET'
req = Request.blank('/%s/%s' % (container, obj),
environ={'REQUEST_METHOD': method},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
with patch.object(Request, 'get_response') as m_swift_resp, \
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(req.environ, MagicMock())
sw_req = s3_req.to_swift_req(method, container, obj)
self.assertNotIn('swift3.auth_details', sw_req.environ)
self.assertNotIn('HTTP_AUTHORIZATION', sw_req.environ)
self.assertNotIn('Authorization', sw_req.headers)
self.assertEqual(sw_req.headers['X-Auth-Token'], 'token')
def test_to_swift_req_subrequest_proxy_access_log(self):
container = 'bucket'
obj = 'obj'
method = 'GET'
# force_swift_request_proxy_log is True
req = Request.blank('/%s/%s' % (container, obj),
environ={'REQUEST_METHOD': method,
'swift.proxy_access_log_made': True},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
with patch.object(Request, 'get_response') as m_swift_resp, \
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(
req.environ, MagicMock(), force_request_log=True)
sw_req = s3_req.to_swift_req(method, container, obj)
self.assertFalse(sw_req.environ['swift.proxy_access_log_made'])
# force_swift_request_proxy_log is False
req = Request.blank('/%s/%s' % (container, obj),
environ={'REQUEST_METHOD': method,
'swift.proxy_access_log_made': True},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
with patch.object(Request, 'get_response') as m_swift_resp, \
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(
req.environ, MagicMock(), force_request_log=False)
sw_req = s3_req.to_swift_req(method, container, obj)
self.assertTrue(sw_req.environ['swift.proxy_access_log_made'])
def test_get_container_info(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket', HTTPNoContent,
{'x-container-read': 'foo',
'X-container-object-count': 5,
'X-container-meta-foo': 'bar'}, None)
req = Request.blank('/bucket', environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
s3_req = S3_Request(req.environ)
# first, call get_response('HEAD')
info = s3_req.get_container_info(self.app)
self.assertTrue('status' in info) # sanity
self.assertEqual(204, info['status']) # sanity
self.assertEqual('foo', info['read_acl']) # sanity
self.assertEqual('5', info['object_count']) # sanity
self.assertEqual({'foo': 'bar'}, info['meta']) # sanity
with patch('swift.common.middleware.s3api.request.get_container_info',
return_value={'status': 204}) as mock_info:
# Then all calls goes to get_container_info
for x in xrange(10):
info = s3_req.get_container_info(self.swift)
self.assertTrue('status' in info) # sanity
self.assertEqual(204, info['status']) # sanity
self.assertEqual(10, mock_info.call_count)
expected_errors = [(404, NoSuchBucket), (0, InternalError)]
for status, expected_error in expected_errors:
with patch('swift.common.middleware.s3api.request.'
'get_container_info',
return_value={'status': status}):
self.assertRaises(
expected_error, s3_req.get_container_info, MagicMock())
def test_date_header_missing(self):
self.swift.register('HEAD', '/v1/AUTH_test/nojunk', swob.HTTPNotFound,
{}, None)
req = Request.blank('/nojunk',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'Authorization': 'AWS test:tester:hmac'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '403')
self.assertEqual(body, '')
def test_date_header_expired(self):
self.swift.register('HEAD', '/v1/AUTH_test/nojunk', swob.HTTPNotFound,
{}, None)
req = Request.blank('/nojunk',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': 'Fri, 01 Apr 2014 12:00:00 GMT'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '403')
self.assertEqual(body, '')
def test_date_header_with_x_amz_date_valid(self):
self.swift.register('HEAD', '/v1/AUTH_test/nojunk', swob.HTTPNotFound,
{}, None)
req = Request.blank('/nojunk',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': 'Fri, 01 Apr 2014 12:00:00 GMT',
'x-amz-date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '404')
self.assertEqual(body, '')
def test_date_header_with_x_amz_date_expired(self):
self.swift.register('HEAD', '/v1/AUTH_test/nojunk', swob.HTTPNotFound,
{}, None)
req = Request.blank('/nojunk',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-date':
'Fri, 01 Apr 2014 12:00:00 GMT'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '403')
self.assertEqual(body, '')
def _test_request_timestamp_sigv4(self, date_header):
# signature v4 here
environ = {
'REQUEST_METHOD': 'GET'}
if 'X-Amz-Date' in date_header:
included_header = 'x-amz-date'
elif 'Date' in date_header:
included_header = 'date'
else:
self.fail('Invalid date header specified as test')
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=%s,'
'Signature=X' % ';'.join(sorted(['host', included_header])),
'X-Amz-Content-SHA256': '0123456789'}
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
if 'X-Amz-Date' in date_header:
timestamp = mktime(
date_header['X-Amz-Date'], SIGV4_X_AMZ_DATE_FORMAT)
elif 'Date' in date_header:
timestamp = mktime(date_header['Date'])
self.assertEqual(timestamp, int(sigv4_req.timestamp))
def test_request_timestamp_sigv4(self):
access_denied_message = \
'AWS authentication requires a valid Date or x-amz-date header'
# normal X-Amz-Date header
date_header = {'X-Amz-Date': self.get_v4_amz_date_header()}
self._test_request_timestamp_sigv4(date_header)
# normal Date header
date_header = {'Date': self.get_date_header()}
self._test_request_timestamp_sigv4(date_header)
# mangled X-Amz-Date header
date_header = {'X-Amz-Date': self.get_v4_amz_date_header()[:-1]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# mangled Date header
date_header = {'Date': self.get_date_header()[20:]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# Negative timestamp
date_header = {'X-Amz-Date': '00160523T054055Z'}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# far-past Date header
date_header = {'Date': 'Tue, 07 Jul 999 21:53:04 GMT'}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# far-future Date header
date_header = {'Date': 'Tue, 07 Jul 9999 21:53:04 GMT'}
with self.assertRaises(RequestTimeTooSkewed) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn('The difference between the request time and the '
'current time is too large.', cm.exception.body)
def _test_request_timestamp_sigv2(self, date_header):
# signature v4 here
environ = {
'REQUEST_METHOD': 'GET'}
headers = {'Authorization': 'AWS test:tester:hmac'}
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3_Request(req.environ)
if 'X-Amz-Date' in date_header:
timestamp = mktime(req.headers.get('X-Amz-Date'))
elif 'Date' in date_header:
timestamp = mktime(req.headers.get('Date'))
else:
self.fail('Invalid date header specified as test')
self.assertEqual(timestamp, int(sigv2_req.timestamp))
def test_request_timestamp_sigv2(self):
access_denied_message = \
'AWS authentication requires a valid Date or x-amz-date header'
# In v2 format, normal X-Amz-Date header is same
date_header = {'X-Amz-Date': self.get_date_header()}
self._test_request_timestamp_sigv2(date_header)
# normal Date header
date_header = {'Date': self.get_date_header()}
self._test_request_timestamp_sigv2(date_header)
# mangled X-Amz-Date header
date_header = {'X-Amz-Date': self.get_date_header()[:-20]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# mangled Date header
date_header = {'Date': self.get_date_header()[:-20]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# Negative timestamp
date_header = {'X-Amz-Date': '00160523T054055Z'}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# far-past Date header
date_header = {'Date': 'Tue, 07 Jul 999 21:53:04 GMT'}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# far-future Date header
date_header = {'Date': 'Tue, 07 Jul 9999 21:53:04 GMT'}
with self.assertRaises(RequestTimeTooSkewed) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn('The difference between the request time and the '
'current time is too large.', cm.exception.body)
def test_headers_to_sign_sigv4(self):
environ = {
'REQUEST_METHOD': 'GET'}
# host and x-amz-date
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(headers_to_sign, [
('host', 'localhost:80'),
('x-amz-content-sha256', '0123456789'),
('x-amz-date', x_amz_date)])
# no x-amz-date
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(headers_to_sign, [
('host', 'localhost:80'),
('x-amz-content-sha256', '0123456789')])
# SignedHeaders says, host and x-amz-date included but there is not
# X-Amz-Date header
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
with self.assertRaises(SignatureDoesNotMatch):
sigv4_req = SigV4Request(req.environ)
sigv4_req._headers_to_sign()
def test_canonical_uri_sigv2(self):
environ = {
'HTTP_HOST': 'bucket1.s3.test.com',
'REQUEST_METHOD': 'GET'}
headers = {'Authorization': 'AWS test:tester:hmac',
'X-Amz-Date': self.get_date_header()}
# Virtual hosted-style
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3_Request(
req.environ, storage_domain='s3.test.com')
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/bucket1/')
self.assertEqual(req.environ['PATH_INFO'], '/')
req = Request.blank('/obj1', environ=environ, headers=headers)
sigv2_req = S3_Request(
req.environ, storage_domain='s3.test.com')
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/bucket1/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/obj1')
environ = {
'HTTP_HOST': 's3.test.com',
'REQUEST_METHOD': 'GET'}
# Path-style
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3_Request(req.environ, storage_domain='')
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/')
self.assertEqual(req.environ['PATH_INFO'], '/')
req = Request.blank('/bucket1/obj1',
environ=environ,
headers=headers)
sigv2_req = S3_Request(req.environ, storage_domain='')
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/bucket1/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/bucket1/obj1')
def test_canonical_uri_sigv4(self):
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'GET'}
# host and x-amz-date
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
# Virtual hosted-style
self.conf.storage_domain = 's3.test.com'
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, '/')
self.assertEqual(req.environ['PATH_INFO'], '/')
req = Request.blank('/obj1', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, '/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/obj1')
environ = {
'HTTP_HOST': 's3.test.com',
'REQUEST_METHOD': 'GET'}
# Path-style
self.conf.storage_domain = ''
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, '/')
self.assertEqual(req.environ['PATH_INFO'], '/')
req = Request.blank('/bucket/obj1',
environ=environ,
headers=headers)
sigv4_req = SigV4Request(req.environ)
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, '/bucket/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/bucket/obj1')
@patch.object(S3_Request, '_validate_headers', lambda *a: None)
def test_check_signature_sigv2(self):
# See https://web.archive.org/web/20151226025049/http://
# docs.aws.amazon.com//AmazonS3/latest/dev/RESTAuthentication.html
req = Request.blank('/photos/puppy.jpg', headers={
'Host': 'johnsmith.s3.amazonaws.com',
'Date': 'Tue, 27 Mar 2007 19:36:42 +0000',
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'bWq2s1WEIj+Ydj0vQ697zp+IXMU='),
})
sigv2_req = S3_Request(req.environ, storage_domain='s3.amazonaws.com')
expected_sts = '\n'.join([
'GET',
'',
'',
'Tue, 27 Mar 2007 19:36:42 +0000',
'/johnsmith/photos/puppy.jpg',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(
'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
req = Request.blank('/photos/puppy.jpg', method='PUT', headers={
'Content-Type': 'image/jpeg',
'Content-Length': '94328',
'Host': 'johnsmith.s3.amazonaws.com',
'Date': 'Tue, 27 Mar 2007 21:15:45 +0000',
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'MyyxeRY7whkBe+bq8fHCL/2kKUg='),
})
sigv2_req = S3_Request(req.environ, storage_domain='s3.amazonaws.com')
expected_sts = '\n'.join([
'PUT',
'',
'image/jpeg',
'Tue, 27 Mar 2007 21:15:45 +0000',
'/johnsmith/photos/puppy.jpg',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(
'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
req = Request.blank(
'/?prefix=photos&max-keys=50&marker=puppy',
headers={
'User-Agent': 'Mozilla/5.0',
'Host': 'johnsmith.s3.amazonaws.com',
'Date': 'Tue, 27 Mar 2007 19:42:41 +0000',
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'htDYFYduRNen8P9ZfE/s9SuKy0U='),
})
sigv2_req = S3_Request(req.environ, storage_domain='s3.amazonaws.com')
expected_sts = '\n'.join([
'GET',
'',
'',
'Tue, 27 Mar 2007 19:42:41 +0000',
'/johnsmith/',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(
'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
if __name__ == '__main__':
unittest.main()