Re:implement AWS signature v4

New algorithm that supports s3v4 was added.

What I did in this patch in detail:

- Implements v4 related code into mix-in class to provide some methods
  for authentication algorithms (e.g. string_to_sign)

- S3Timestamp everywhere. Old code take a lot of complicated timestamp
  translation from/to datetime, time, date header format (str). This
  patch gathers the translation into "timestamp" property method which
  should be actually handled in the validatation.

- Run functional tests for both v2/v4 authentication in the same
  environment at the same time which shows evidence that we have complete
  backword compatibilities and we can adopt v4 w/o anything broken.

*Bonus*
- Fix some minger bugs for singed urls (almostly expired timestamp),
  for header/query mixture and for unit test case mistake.

The reason I implemented this from Andrey's original patch is the
signature v4 stuff is too complicated if we mixes the process/routine
into same class because of a bunch of if/elif/else statements for header
handling. (e.g. if 'X-Amz-Date' in req.headers) Note that it is not his
issue, just AWS is getting complicated algorithms. However, for
maintainansibility, we need more clear code to find easily which statement
is supported on v2/v4 to prevent merge buggy code into master. That is why
I tried to do this. Hopefully this code fits the original author's intention.

NOTE for operators:
- Signature V4 is supported only for keystone auth.
- Set the same value of "region" configuration in keystone to "location" in
  swift3 conf file to enable SigV4.
- Sigv2 and SigV4 can be used at the same cluster configuration.
- This stuff has been supported since Keystone 9.0.0.0b1. (We probably
  need to bump the minimum version for keystone in requirements)

Change-Id: I386abd4ead40f55855657e354fd8ef3fd0d13aa7
Co-Authored-By: Andrey Pavlov <andrey-mp@yandex.ru>
Closes-Bug: #1411078
This commit is contained in:
Kota Tsuyuzaki 2015-08-12 12:43:37 +03:00 committed by Tim Burke
parent 02d1e2c7cb
commit a1cc181bd8
22 changed files with 1316 additions and 180 deletions

View File

@ -37,8 +37,8 @@ use = egg:swift3#swift3
# allow_no_owner = false
#
# Set a region name of your Swift cluster. Note that Swift3 doesn't choose a
# region of the newly created bucket actually. This value is used only for the
# GET Bucket location API.
# region of the newly created bucket actually. This value is used for the
# GET Bucket location API and v4 signatures calculation.
# location = US
#
# Set whether to enforce DNS-compliant bucket names. Note that S3 enforces

View File

@ -50,6 +50,8 @@ from swift.common.swob import Range
from swift.common.utils import json
from swift.common.db import utf8encode
from six.moves.urllib.parse import urlparse # pylint: disable=F0401
from swift3.controllers.base import Controller, bucket_operation, \
object_operation, check_container_existence
from swift3.response import InvalidArgument, ErrorResponse, MalformedXML, \
@ -604,7 +606,20 @@ class UploadController(Controller):
req.get_response(self.app, 'DELETE', container, obj)
result_elem = Element('CompleteMultipartUploadResult')
SubElement(result_elem, 'Location').text = req.host_url + req.path
# NOTE: boto with sig v4 appends port to HTTP_HOST value at the
# request header when the port is non default value and it makes
# req.host_url like as http://localhost:8080:8080/path
# that obviously invalid. Probably it should be resolved at
# swift.common.swob though, tentatively we are parsing and
# reconstructing the correct host_url info here.
# in detail, https://github.com/boto/boto/pull/3513
parsed_url = urlparse(req.host_url)
host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname)
if parsed_url.port:
host_url += ':%s' % parsed_url.port
SubElement(result_elem, 'Location').text = host_url + req.path
SubElement(result_elem, 'Bucket').text = req.container_name
SubElement(result_elem, 'Key').text = req.object_name
SubElement(result_elem, 'ETag').text = resp.etag

View File

@ -58,7 +58,7 @@ from swift.common.wsgi import PipelineWrapper, loadcontext
from swift3 import __version__ as swift3_version
from swift3.exception import NotS3Request
from swift3.request import Request, S3AclRequest
from swift3.request import get_request_class
from swift3.response import ErrorResponse, InternalError, MethodNotAllowed, \
ResponseBase
from swift3.cfg import CONF
@ -75,10 +75,8 @@ class Swift3Middleware(object):
def __call__(self, env, start_response):
try:
if CONF.s3_acl:
req = S3AclRequest(env, self.app, self.slo_enabled)
else:
req = Request(env, self.slo_enabled)
req_class = get_request_class(env)
req = req_class(env, self.app, self.slo_enabled)
resp = self.handle_request(req)
except NotS3Request:
resp = self.app

View File

@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import md5
from urllib import quote, unquote
import base64
import email.utils
from email.header import Header
import datetime
from hashlib import sha256
import md5
import re
import six
import string
from urllib import quote, unquote
from swift.common.utils import split_path
from swift.common import swob
@ -45,15 +45,18 @@ from swift3.response import AccessDenied, InvalidArgument, InvalidDigest, \
BucketAlreadyExists, BucketNotEmpty, EntityTooLarge, \
InternalError, NoSuchBucket, NoSuchKey, PreconditionFailed, InvalidRange, \
MissingContentLength, InvalidStorageClass, S3NotImplemented, InvalidURI, \
MalformedXML, InvalidRequest, RequestTimeout, InvalidBucketName, BadDigest
MalformedXML, InvalidRequest, RequestTimeout, InvalidBucketName, \
BadDigest, AuthorizationHeaderMalformed
from swift3.exception import NotS3Request, BadSwiftRequest
from swift3.utils import utf8encode, LOGGER, check_path_header
from swift3.utils import utf8encode, LOGGER, check_path_header, S3Timestamp, \
mktime
from swift3.cfg import CONF
from swift3.subresource import decode_acl, encode_acl
from swift3.utils import sysmeta_header, validate_bucket_name
from swift3.acl_utils import handle_acl_header
from swift3.acl_handlers import get_acl_handler
# List of sub-resources that must be maintained as part of the HMAC
# signature string.
ALLOWED_SUB_RESOURCES = sorted([
@ -65,7 +68,10 @@ ALLOWED_SUB_RESOURCES = sorted([
'response-content-type', 'response-expires', 'cors', 'tagging', 'restore'
])
MAX_32BIT_INT = 2147483647
SIGV2_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S'
SIGV4_X_AMZ_DATE_FORMAT = '%Y%m%dT%H%M%SZ'
def _header_acl_property(resource):
@ -86,6 +92,243 @@ def _header_acl_property(resource):
doc='Get and set the %s acl property' % resource)
class SigV4Mixin(object):
"""
A request class mixin to provide S3 signature v4 functionality
:param req_cls: a Request class (Request or S3AclRequest or child classes)
"""
@property
def _is_query_auth(self):
return 'X-Amz-Credential' in self.params
@property
def timestamp(self):
"""
Return timestamp string according to the auth type
The difference from v2 is v4 have to see 'X-Amz-Date' even though
it's query auth type.
"""
if not self._timestamp:
try:
if self._is_query_auth and 'X-Amz-Date' in self.params:
# NOTE(andrey-mp): Date in Signature V4 has different
# format
timestamp = mktime(
self.params['X-Amz-Date'], SIGV4_X_AMZ_DATE_FORMAT)
else:
if self.headers.get('X-Amz-Date'):
timestamp = mktime(
self.headers.get('X-Amz-Date'),
SIGV4_X_AMZ_DATE_FORMAT)
else:
timestamp = mktime(self.headers.get('Date'))
except (ValueError, TypeError):
raise AccessDenied('AWS authentication requires a valid Date '
'or x-amz-date header')
try:
self._timestamp = S3Timestamp(timestamp)
except ValueError:
raise AccessDenied()
return self._timestamp
def _validate_expire_param(self):
"""
:param now: a S3Timestamp instance
"""
expires = self.params['X-Amz-Expires']
if int(self.timestamp) + int(expires) < S3Timestamp.now():
raise AccessDenied('Request has expired')
def _parse_query_authentication(self):
"""
Parse v4 query authentication
- version 4:
'X-Amz-Credential' and 'X-Amz-Signature' should be in param
:raises: AccessDenied
:raises: AuthorizationHeaderMalformed
"""
if self.params.get('X-Amz-Algorithm') != 'AWS4-HMAC-SHA256':
raise InvalidArgument('X-Amz-Algorithm',
self.params.get('X-Amz-Algorithm'))
try:
cred_param = self.params['X-Amz-Credential'].split("/")
access = cred_param[0]
sig = self.params['X-Amz-Signature']
expires = self.params['X-Amz-Expires']
except KeyError:
raise AccessDenied()
try:
signed_headers = self.params['X-Amz-SignedHeaders']
except KeyError:
# TODO: make sure if is it malformed request?
raise AuthorizationHeaderMalformed()
self._signed_headers = set(signed_headers.split(';'))
# credential must be in following format:
# <access-key-id>/<date>/<AWS-region>/<AWS-service>/aws4_request
if not all([access, sig, len(cred_param) == 5, expires]):
raise AccessDenied()
return access, sig
def _parse_header_authentication(self):
"""
Parse v4 header authentication
- version 4:
'X-Amz-Credential' and 'X-Amz-Signature' should be in param
:raises: AccessDenied
:raises: AuthorizationHeaderMalformed
"""
auth_str = self.headers['Authorization']
cred_param = auth_str.partition(
"Credential=")[2].split(',')[0].split("/")
access = cred_param[0]
sig = auth_str.partition("Signature=")[2].split(',')[0]
signed_headers = auth_str.partition(
"SignedHeaders=")[2].split(',', 1)[0]
# credential must be in following format:
# <access-key-id>/<date>/<AWS-region>/<AWS-service>/aws4_request
if not all([access, sig, len(cred_param) == 5]):
raise AccessDenied()
if not signed_headers:
# TODO: make sure if is it Malformed?
raise AuthorizationHeaderMalformed()
self._signed_headers = set(signed_headers.split(';'))
return access, sig
def _canonical_query_string(self):
return '&'.join(
'%s=%s' % (quote(key, safe='-_.~'),
quote(value, safe='-_.~'))
for key, value in sorted(self.params.items())
if key not in ('Signature', 'X-Amz-Signature'))
def _headers_to_sign(self):
"""
Select the headers from the request that need to be included
in the StringToSign.
:return : dict of headers to sign, the keys are all lower case
"""
headers_lower_dict = dict(
(k.lower().strip(), ' '.join((v or '').strip().split()))
for (k, v) in six.iteritems(self.headers))
if 'host' in headers_lower_dict and re.match(
'Boto/2.[0-9].[0-2]',
headers_lower_dict.get('user-agent', '')):
# Boto versions < 2.9.3 strip the port component of the host:port
# header, so detect the user-agent via the header and strip the
# port if we detect an old boto version.
headers_lower_dict['host'] = \
headers_lower_dict['host'].split(':')[0]
headers_to_sign = [
(key, value) for key, value in headers_lower_dict.items()
if key in self._signed_headers]
if len(headers_to_sign) != len(self._signed_headers):
# NOTE: if we are missing the header suggested via
# signed_header in actual header, it results in
# SignatureDoesNotMatch in actual S3 so we can raise
# the error immediately here to save redundant check
# process.
raise SignatureDoesNotMatch()
return dict(headers_to_sign)
def _string_to_sign(self):
"""
Create 'StringToSign' value in Amazon terminology for v4.
"""
scope = (self.timestamp.amz_date_format.split('T')[0] +
'/' + CONF.location + '/s3/aws4_request')
# prepare 'canonical_request'
# Example requests are like following:
#
# GET
# /
# Action=ListUsers&Version=2010-05-08
# content-type:application/x-www-form-urlencoded; charset=utf-8
# host:iam.amazonaws.com
# x-amz-date:20150830T123600Z
#
# content-type;host;x-amz-date
# e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
#
# 1. Add verb like: GET
cr = [self.method.upper()]
# 2. Add path like: /
path = self._canonical_uri()
cr.append(path)
# 3. Add query like: Action=ListUsers&Version=2010-05-08
cr.append(self._canonical_query_string())
# 4. Add headers like:
# content-type:application/x-www-form-urlencoded; charset=utf-8
# host:iam.amazonaws.com
# x-amz-date:20150830T123600Z
headers_to_sign = self._headers_to_sign()
cr.append('\n'.join(
['%s:%s' % (key, value) for key, value in
sorted(headers_to_sign.items())]) + '\n')
# 5. Add signed headers into canonical request like
# content-type;host;x-amz-date
cr.append(';'.join(sorted(n for n in headers_to_sign)))
# 6. Add payload string at the tail
if 'X-Amz-Credential' in self.params:
# V4 with query parameters only
hashed_payload = 'UNSIGNED-PAYLOAD'
elif 'X-Amz-Content-SHA256' not in self.headers:
msg = 'Missing required header for this request: ' \
'x-amz-content-sha256'
raise InvalidRequest(msg)
else:
hashed_payload = self.headers['X-Amz-Content-SHA256']
cr.append(hashed_payload)
canonical_request = '\n'.join(cr)
return ('AWS4-HMAC-SHA256' + '\n'
+ self.timestamp.amz_date_format + '\n'
+ scope + '\n'
+ sha256(canonical_request.encode('utf-8')).hexdigest())
def get_request_class(env):
"""
Helper function to find a request class to use from Map
"""
if CONF.s3_acl:
request_classes = (S3AclRequest, SigV4S3AclRequest)
else:
request_classes = (Request, SigV4Request)
req = swob.Request(env)
if 'X-Amz-Credential' in req.params or \
req.headers.get('Authorization', '').startswith(
'AWS4-HMAC-SHA256 '):
# This is an Amazon SigV4 request
return request_classes[1]
else:
# The others using Amazon SigV2 class
return request_classes[0]
class Request(swob.Request):
"""
S3 request object.
@ -94,22 +337,70 @@ class Request(swob.Request):
bucket_acl = _header_acl_property('container')
object_acl = _header_acl_property('object')
def __init__(self, env, slo_enabled=True):
def __init__(self, env, app=None, slo_enabled=True):
# NOTE: app is not used by this class, need for compatibility of S3acl
swob.Request.__init__(self, env)
self.access_key = self._parse_authorization()
self._timestamp = None
self.access_key, signature = self._parse_auth_info()
self.bucket_in_host = self._parse_host()
self.container_name, self.object_name = self._parse_uri()
self._validate_headers()
self.token = base64.urlsafe_b64encode(self._canonical_string())
self.token = base64.urlsafe_b64encode(self._string_to_sign())
self.account = None
self.user_id = None
self.slo_enabled = slo_enabled
# NOTE(andrey-mp): substitute authorization header for next modules
# in pipeline (s3token). it uses this and X-Auth-Token in specific
# format.
# (kota_): yeah, the reason we need this is s3token only supports
# v2 like header consists of AWS access:signature. Since the commit
# b626a3ca86e467fc7564eac236b9ee2efd49bdcc, the s3token is in swift3
# repo so probably we need to change s3token to support v4 format.
self.headers['Authorization'] = 'AWS %s:%s' % (
self.access_key, signature)
# Avoids that swift.swob.Response replaces Location header value
# by full URL when absolute path given. See swift.swob for more detail.
self.environ['swift.leave_relative_location'] = True
@property
def timestamp(self):
"""
S3Timestamp from Date header. If X-Amz-Date header specified, it
will be prior to Date header.
:return : S3Timestamp instance
"""
if not self._timestamp:
try:
if self._is_query_auth and 'Timestamp' in self.params:
# If Timestamp speciied in query, it should be prior
# to any Date header (is this right?)
timestamp = mktime(
self.params['Timestamp'], SIGV2_TIMESTAMP_FORMAT)
else:
timestamp = mktime(
self.headers.get('X-Amz-Date',
self.headers.get('Date')))
except ValueError:
raise AccessDenied('AWS authentication requires a valid Date '
'or x-amz-date header')
try:
self._timestamp = S3Timestamp(timestamp)
except ValueError:
raise AccessDenied()
return self._timestamp
@property
def _is_header_auth(self):
return 'Authorization' in self.headers
@property
def _is_query_auth(self):
return 'AWSAccessKeyId' in self.params
def _parse_host(self):
storage_domain = CONF.storage_domain
if not storage_domain:
@ -148,35 +439,94 @@ class Request(swob.Request):
raise InvalidBucketName(bucket)
return (bucket, obj)
def _parse_authorization(self):
if 'AWSAccessKeyId' in self.params:
try:
self.headers['Date'] = self.params['Expires']
self.headers['Authorization'] = \
'AWS %(AWSAccessKeyId)s:%(Signature)s' % self.params
except KeyError:
raise AccessDenied()
if 'Authorization' not in self.headers:
raise NotS3Request()
def _parse_query_authentication(self):
"""
Parse v2 authentication query args
TODO: make sure if 0, 1, 3 is supported?
- version 0, 1, 2, 3:
'AWSAccessKeyId' and 'Signature' should be in param
:return: a tuple of access_key and signature
:raises: AccessDenied
"""
try:
keyword, info = self.headers['Authorization'].split(' ', 1)
except Exception:
access = self.params['AWSAccessKeyId']
expires = self.params['Expires']
# TODO: can we remove this logic here?
# self.headers['Date'] = expires
sig = self.params['Signature']
except KeyError:
raise AccessDenied()
if keyword != 'AWS':
if not all([access, sig, expires]):
raise AccessDenied()
return access, sig
def _parse_header_authentication(self):
"""
Parse v2 header authentication info
:returns: a tuple of access_key and signature
:raises: AccessDenied
"""
auth_str = self.headers['Authorization']
if not auth_str.startswith('AWS ') or ':' not in auth_str:
raise AccessDenied()
# This means signature format V2
access, sig = auth_str.split(' ', 1)[1].rsplit(':', 1)
return access, sig
def _parse_auth_info(self):
"""Extract the access key identifier and signature.
:returns: a tuple of access_key and signature
:raises: NotS3Request
"""
if self._is_query_auth:
return self._parse_query_authentication()
elif self._is_header_auth:
return self._parse_header_authentication()
else:
# if this request is neither query auth nor header auth
# swift3 regard this as not s3 request
raise NotS3Request()
def _validate_expire_param(self):
"""
Validate Expire param
"""
# Expires header is a float since epoch
try:
access_key = info.rsplit(':', 1)[0]
except Exception:
err_msg = 'AWS authorization header is invalid. ' \
'Expected AwsAccessKeyId:signature'
raise InvalidArgument('Authorization',
self.headers['Authorization'], err_msg)
ex = S3Timestamp(float(self.params['Expires']))
except ValueError:
raise AccessDenied()
return access_key
if S3Timestamp.now() > ex:
raise AccessDenied('Request has expired')
def _validate_dates(self):
if self._is_query_auth:
self._validate_expire_param()
# TODO: make sure the case if timestamp param in query
return
date_header = self.headers.get('Date')
amz_date_header = self.headers.get('X-Amz-Date')
if not date_header and not amz_date_header:
raise AccessDenied('AWS authentication requires a valid Date '
'or x-amz-date header')
# Anyways, request timestamp should be validated
epoch = S3Timestamp(0)
if self.timestamp < epoch:
raise AccessDenied()
# If the standard date is too far ahead or behind, it is an
# error
delta = 60 * 5
if abs(int(self.timestamp) - int(S3Timestamp.now())) > delta:
raise RequestTimeTooSkewed()
def _validate_headers(self):
if 'CONTENT_LENGTH' in self.environ:
@ -188,40 +538,7 @@ class Request(swob.Request):
raise InvalidArgument('Content-Length',
self.environ['CONTENT_LENGTH'])
date_header = self.headers.get('x-amz-date',
self.headers.get('Date', None))
if date_header:
now = datetime.datetime.utcnow()
date = email.utils.parsedate(date_header)
if 'Expires' in self.params:
try:
d = email.utils.formatdate(float(self.params['Expires']))
except ValueError:
raise AccessDenied()
# check expiration
expdate = email.utils.parsedate(d)
ex = datetime.datetime(*expdate[0:6])
if now > ex:
raise AccessDenied('Request has expired')
elif date is not None:
epoch = datetime.datetime(1970, 1, 1, 0, 0, 0, 0)
d1 = datetime.datetime(*date[0:6])
if d1 < epoch:
raise AccessDenied()
# If the standard date is too far ahead or behind, it is an
# error
delta = datetime.timedelta(seconds=60 * 5)
if abs(d1 - now) > delta:
raise RequestTimeTooSkewed()
else:
raise AccessDenied('AWS authentication requires a valid Date '
'or x-amz-date header')
else:
raise AccessDenied('AWS authentication requires a valid Date '
'or x-amz-date header')
self._validate_dates()
if 'Content-MD5' in self.headers:
value = self.headers['Content-MD5']
@ -368,9 +685,9 @@ class Request(swob.Request):
raw_path_info = '/' + self.bucket_in_host + raw_path_info
return raw_path_info
def _canonical_string(self):
def _string_to_sign(self):
"""
Canonicalize a request to a token that can be signed.
Create 'StringToSign' value in Amazon terminology for v2.
"""
amz_headers = {}
@ -863,3 +1180,11 @@ class S3AclRequest(Request):
return resp
return self.get_acl_response(app, method, container, obj,
headers, body, query)
class SigV4Request(SigV4Mixin, Request):
pass
class SigV4S3AclRequest(SigV4Mixin, S3AclRequest):
pass

View File

@ -223,6 +223,13 @@ class AmbiguousGrantByEmailAddress(ErrorResponse):
'one account.'
class AuthorizationHeaderMalformed(ErrorResponse):
_status = '400 Bad Request'
_msg = 'The authorization header is malformed; the authorization ' \
'header requires three components: Credential, SignedHeaders, ' \
'and Signature.'
class BadDigest(ErrorResponse):
_status = '400 Bad Request'
_msg = 'The Content-MD5 you specified did not match what we received.'

View File

@ -1,4 +1,7 @@
[DEFAULT]
debug=true
verbose=true
policy_file=%CONF_DIR%/policy.json
admin_token = ADMIN
[database]

View File

@ -23,6 +23,7 @@ user_test_tester2 = testing2
[filter:swift3]
use = egg:swift3#swift3
s3_acl = %S3ACL%
location = US
[filter:catch_errors]
use = egg:swift#catch_errors

View File

@ -112,7 +112,8 @@ nosetests -v ./
rvalue=$?
# cleanup
kill -HUP $proxy_pid $account_pid $container_pid $object_pid $keystone_pid
kill -HUP $proxy_pid $account_pid $container_pid $object_pid
kill -TERM $keystone_pid
# show report
sleep 3

View File

@ -51,6 +51,7 @@ class Connection(object):
S3Connection(aws_access_key, aws_secret_key, is_secure=False,
host=self.host, port=self.port,
calling_format=OrdinaryCallingFormat())
self.conn.auth_region_name = 'US'
def reset(self):
"""

View File

@ -14,6 +14,7 @@
# limitations under the License.
import unittest
import os
from swift3.test.functional import Swift3FunctionalTestCase
from swift3.test.functional.s3_test_client import Connection, \
@ -124,5 +125,18 @@ class TestSwift3Acl(Swift3FunctionalTestCase):
self.conn2.make_request('GET', self.bucket, self.obj, query='acl')
self.assertEquals(get_error_code(body), 'AccessDenied')
@unittest.skipIf(os.environ['AUTH'] == 'tempauth',
'v4 is supported only in keystone')
class TestSwift3AclSigV4(TestSwift3Acl):
@classmethod
def setUpClass(cls):
os.environ['S3_USE_SIGV4'] = "True"
@classmethod
def tearDownClass(cls):
del os.environ['S3_USE_SIGV4']
if __name__ == '__main__':
unittest.main()

View File

@ -14,6 +14,7 @@
# limitations under the License.
import unittest
import os
from swift3.test.functional.s3_test_client import Connection
from swift3.test.functional.utils import get_error_code
@ -313,5 +314,18 @@ class TestSwift3Bucket(Swift3FunctionalTestCase):
status, headers, body = self.conn.make_request('DELETE', 'bucket')
self.assertEquals(get_error_code(body), 'NoSuchBucket')
@unittest.skipIf(os.environ['AUTH'] == 'tempauth',
'v4 is supported only in keystone')
class TestSwift3BucketSigV4(TestSwift3Bucket):
@classmethod
def setUpClass(cls):
os.environ['S3_USE_SIGV4'] = "True"
@classmethod
def tearDownClass(cls):
del os.environ['S3_USE_SIGV4']
if __name__ == '__main__':
unittest.main()

View File

@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from swift3.test.functional.utils import calculate_md5, get_error_code
from swift3.etree import fromstring, tostring, Element, SubElement
from swift3.controllers.multi_delete import MAX_MULTI_DELETE_BODY_SIZE
@ -215,3 +218,19 @@ class TestSwift3MultiDelete(Swift3FunctionalTestCase):
elem = fromstring(body, 'DeleteResult')
resp_objects = elem.findall('Deleted')
self.assertEquals(len(resp_objects), 1)
@unittest.skipIf(os.environ['AUTH'] == 'tempauth',
'v4 is supported only in keystone')
class TestSwift3MultiDeleteSigV4(TestSwift3MultiDelete):
@classmethod
def setUpClass(cls):
os.environ['S3_USE_SIGV4'] = "True"
@classmethod
def tearDownClass(cls):
del os.environ['S3_USE_SIGV4']
if __name__ == '__main__':
unittest.main()

View File

@ -14,13 +14,20 @@
# limitations under the License.
import unittest
import os
import boto
# For an issue with venv and distutils, disable pylint message here
# pylint: disable-msg=E0611,F0401
from distutils.version import StrictVersion
from hashlib import md5
from itertools import izip
from swift3.test.functional.utils import get_error_code, get_error_msg
from swift3.etree import fromstring, tostring, Element, SubElement
from swift3.test.functional import Swift3FunctionalTestCase
from swift3.test.functional.utils import mktime
from swift3.utils import mktime
from swift3.test.functional.s3_test_client import Connection
MIN_SEGMENT_SIZE = 5242880
@ -182,39 +189,8 @@ class TestSwift3MultiUpload(Swift3FunctionalTestCase):
self.assertTrue('etag' not in headers)
elem = fromstring(body, 'CopyPartResult')
last_modified_1 = elem.find('LastModified').text
self.assertTrue(last_modified_1 is not None)
self.assertEquals(resp_etag, etag)
# Upload Part Copy Range
key, upload_id = uploads[1]
src_bucket = 'bucket2'
src_obj = 'obj4'
src_content = 'y' * (MIN_SEGMENT_SIZE / 2) + 'z' * MIN_SEGMENT_SIZE
src_range = 'bytes=0-%d' % (MIN_SEGMENT_SIZE - 1)
etag = md5(src_content[:MIN_SEGMENT_SIZE]).hexdigest()
# prepare src obj
self.conn.make_request('PUT', src_bucket)
self.conn.make_request('PUT', src_bucket, src_obj, body=src_content)
_, headers, _ = self.conn.make_request('HEAD', src_bucket, src_obj)
self.assertCommonResponseHeaders(headers)
status, headers, body, resp_etag = \
self._upload_part_copy(src_bucket, src_obj, bucket,
key, upload_id, 2, src_range)
self.assertEquals(status, 200)
self.assertCommonResponseHeaders(headers)
self.assertTrue('content-type' in headers)
self.assertEquals(headers['content-type'], 'application/xml')
self.assertTrue('content-length' in headers)
self.assertEquals(headers['content-length'], str(len(body)))
self.assertTrue('etag' not in headers)
elem = fromstring(body, 'CopyPartResult')
last_modified_2 = elem.find('LastModified').text
self.assertTrue(last_modified_2 is not None)
last_modified = elem.find('LastModified').text
self.assertTrue(last_modified is not None)
self.assertEquals(resp_etag, etag)
@ -231,14 +207,10 @@ class TestSwift3MultiUpload(Swift3FunctionalTestCase):
for p in elem.iterfind('Part')]
self.assertEquals(
last_modified_gets[0].rsplit('.', 1)[0],
last_modified_1.rsplit('.', 1)[0],
'%r != %r' % (last_modified_gets[0], last_modified_1))
self.assertEquals(
last_modified_gets[1].rsplit('.', 1)[0],
last_modified_2.rsplit('.', 1)[0],
'%r != %r' % (last_modified_gets[1], last_modified_2))
last_modified.rsplit('.', 1)[0],
'%r != %r' % (last_modified_gets[0], last_modified))
# There should be *exactly* two parts in the result
self.assertEqual([], last_modified_gets[2:])
self.assertEqual(1, len(last_modified_gets))
# List Parts
key, upload_id = uploads[0]
@ -573,6 +545,114 @@ class TestSwift3MultiUpload(Swift3FunctionalTestCase):
query=query)
self.assertEquals(status, 200)
def test_object_multi_upload_part_copy_range(self):
bucket = 'bucket'
keys = ['obj1']
uploads = []
results_generator = self._initiate_multi_uploads_result_generator(
bucket, keys)
# Initiate Multipart Upload
for expected_key, (status, headers, body) in \
izip(keys, results_generator):
self.assertEquals(status, 200)
self.assertCommonResponseHeaders(headers)
self.assertTrue('content-type' in headers)
self.assertEquals(headers['content-type'], 'application/xml')
self.assertTrue('content-length' in headers)
self.assertEquals(headers['content-length'], str(len(body)))
elem = fromstring(body, 'InitiateMultipartUploadResult')
self.assertEquals(elem.find('Bucket').text, bucket)
key = elem.find('Key').text
self.assertEquals(expected_key, key)
upload_id = elem.find('UploadId').text
self.assertTrue(upload_id is not None)
self.assertTrue((key, upload_id) not in uploads)
uploads.append((key, upload_id))
self.assertEquals(len(uploads), len(keys)) # sanity
# Upload Part Copy Range
key, upload_id = uploads[0]
src_bucket = 'bucket2'
src_obj = 'obj4'
src_content = 'y' * (MIN_SEGMENT_SIZE / 2) + 'z' * MIN_SEGMENT_SIZE
src_range = 'bytes=0-%d' % (MIN_SEGMENT_SIZE - 1)
etag = md5(src_content[:MIN_SEGMENT_SIZE]).hexdigest()
# prepare src obj
self.conn.make_request('PUT', src_bucket)
self.conn.make_request('PUT', src_bucket, src_obj, body=src_content)
_, headers, _ = self.conn.make_request('HEAD', src_bucket, src_obj)
self.assertCommonResponseHeaders(headers)
status, headers, body, resp_etag = \
self._upload_part_copy(src_bucket, src_obj, bucket,
key, upload_id, 1, src_range)
self.assertEquals(status, 200)
self.assertCommonResponseHeaders(headers)
self.assertTrue('content-type' in headers)
self.assertEquals(headers['content-type'], 'application/xml')
self.assertTrue('content-length' in headers)
self.assertEquals(headers['content-length'], str(len(body)))
self.assertTrue('etag' not in headers)
elem = fromstring(body, 'CopyPartResult')
last_modified = elem.find('LastModified').text
self.assertTrue(last_modified is not None)
self.assertEquals(resp_etag, etag)
# Check last-modified timestamp
key, upload_id = uploads[0]
query = 'uploadId=%s' % upload_id
status, headers, body = \
self.conn.make_request('GET', bucket, key, query=query)
elem = fromstring(body, 'ListPartsResult')
# FIXME: COPY result drops mili/microseconds but GET doesn't
last_modified_gets = [p.find('LastModified').text
for p in elem.iterfind('Part')]
self.assertEquals(
last_modified_gets[0].rsplit('.', 1)[0],
last_modified.rsplit('.', 1)[0],
'%r != %r' % (last_modified_gets[0], last_modified))
# There should be *exactly* one parts in the result
self.assertEqual(1, len(last_modified_gets))
# Abort Multipart Upload
key, upload_id = uploads[0]
query = 'uploadId=%s' % upload_id
status, headers, body = \
self.conn.make_request('DELETE', bucket, key, query=query)
# sanities
self.assertEquals(status, 204)
self.assertCommonResponseHeaders(headers)
self.assertTrue('content-type' in headers)
self.assertEquals(headers['content-type'], 'text/html; charset=UTF-8')
self.assertTrue('content-length' in headers)
self.assertEquals(headers['content-length'], '0')
@unittest.skipIf(os.environ['AUTH'] == 'tempauth',
'v4 is supported only in keystone')
class TestSwift3MultiUploadSigV4(TestSwift3MultiUpload):
@classmethod
def setUpClass(cls):
os.environ['S3_USE_SIGV4'] = "True"
@classmethod
def tearDownClass(cls):
del os.environ['S3_USE_SIGV4']
def test_object_multi_upload_part_copy_range(self):
if StrictVersion(boto.__version__) < StrictVersion('3.0'):
self.skipTest('This stuff got the issue of boto<=2.x')
def test_delete_bucket_multi_upload_object_exisiting(self):
bucket = 'bucket'
keys = ['obj1']

View File

@ -14,6 +14,12 @@
# limitations under the License.
import unittest
import os
import boto
# For an issue with venv and distutils, disable pylint message here
# pylint: disable-msg=E0611,F0401
from distutils.version import StrictVersion
from email.utils import formatdate, parsedate
from time import mktime
@ -768,5 +774,47 @@ class TestSwift3Object(Swift3FunctionalTestCase):
self.assertEquals(status, 200)
self.assertCommonResponseHeaders(headers)
@unittest.skipIf(os.environ['AUTH'] == 'tempauth',
'v4 is supported only in keystone')
class TestSwift3ObjectSigV4(TestSwift3Object):
@classmethod
def setUpClass(cls):
os.environ['S3_USE_SIGV4'] = "True"
@classmethod
def tearDownClass(cls):
del os.environ['S3_USE_SIGV4']
@unittest.skipIf(StrictVersion(boto.__version__) < StrictVersion('3.0'),
'This stuff got the signing issue of boto<=2.x')
def test_put_object_metadata(self):
super(TestSwift3ObjectSigV4, self).test_put_object_metadata()
@unittest.skipIf(StrictVersion(boto.__version__) < StrictVersion('3.0'),
'This stuff got the signing issue of boto<=2.x')
def test_put_object_copy_source_if_modified_since(self):
super(TestSwift3ObjectSigV4, self).\
test_put_object_copy_source_if_modified_since()
@unittest.skipIf(StrictVersion(boto.__version__) < StrictVersion('3.0'),
'This stuff got the signing issue of boto<=2.x')
def test_put_object_copy_source_if_unmodified_since(self):
super(TestSwift3ObjectSigV4, self).\
test_put_object_copy_source_if_unmodified_since()
@unittest.skipIf(StrictVersion(boto.__version__) < StrictVersion('3.0'),
'This stuff got the signing issue of boto<=2.x')
def test_put_object_copy_source_if_match(self):
super(TestSwift3ObjectSigV4,
self).test_put_object_copy_source_if_match()
@unittest.skipIf(StrictVersion(boto.__version__) < StrictVersion('3.0'),
'This stuff got the signing issue of boto<=2.x')
def test_put_object_copy_source_if_none_match(self):
super(TestSwift3ObjectSigV4,
self).test_put_object_copy_source_if_none_match()
if __name__ == '__main__':
unittest.main()

View File

@ -14,6 +14,7 @@
# limitations under the License.
import unittest
import os
from swift3.test.functional.s3_test_client import Connection
from swift3.test.functional.utils import get_error_code
@ -71,5 +72,18 @@ class TestSwift3Service(Swift3FunctionalTestCase):
self.assertIn('AWS authentication requires a valid Date '
'or x-amz-date header', body)
@unittest.skipIf(os.environ['AUTH'] == 'tempauth',
'v4 is supported only in keystone')
class TestSwift3ServiceSigV4(TestSwift3Service):
@classmethod
def setUpClass(cls):
os.environ['S3_USE_SIGV4'] = "True"
@classmethod
def tearDownClass(cls):
del os.environ['S3_USE_SIGV4']
if __name__ == '__main__':
unittest.main()

View File

@ -16,9 +16,6 @@
from hashlib import md5
from swift3.etree import fromstring
import time
from email.utils import parsedate
def get_error_code(body):
elem = fromstring(body, 'Error')
@ -32,25 +29,3 @@ def get_error_msg(body):
def calculate_md5(body):
return md5(body).digest().encode('base64').strip()
def mktime(timestamp_str):
"""
mktime creates a float instance in epoch time really like as time.mktime
the difference from time.mktime is allowing to 2 formats string for the
argumtent for the S3 testing usage.
TODO: support
:param timestamp_str: a string of timestamp formatted as
(a) RFC2822 (e.g. date header)
(b) %Y-%m-%dT%H:%M:%S (e.g. copy result)
:return : a float instance in epoch time
"""
try:
epoch_time = time.mktime(parsedate(timestamp_str))
except TypeError:
epoch_time = time.mktime(
time.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S'))
return epoch_time

View File

@ -35,14 +35,13 @@ class FakeApp(object):
For S3 requests, Swift auth middleware replaces a user name in
env['PATH_INFO'] with a valid tenant id.
E.g. '/v1/test:tester/bucket/object' will become
'/v1/AUTH_test/bucket/object'. This method emulates the behavior.
'/v1/AUTH_test/bucket/object'. This method emulates the behavior.
"""
_, authorization = env['HTTP_AUTHORIZATION'].split(' ')
tenant_user, sign = authorization.rsplit(':', 1)
tenant, user = tenant_user.rsplit(':', 1)
path = env['PATH_INFO']
env['PATH_INFO'] = path.replace(tenant_user, 'AUTH_' + tenant)
def __call__(self, env, start_response):
@ -84,6 +83,10 @@ class Swift3TestCase(unittest.TestCase):
elem = fromstring(body, 'Error')
return elem.find('./Code').text
def _get_error_message(self, body):
elem = fromstring(body, 'Error')
return elem.find('./Message').text
def _test_method_error(self, method, path, response_class, headers={}):
if not path.startswith('/'):
path = '/' + path # add a missing slash before the path
@ -101,7 +104,11 @@ class Swift3TestCase(unittest.TestCase):
return self._get_error_code(body)
def get_date_header(self):
return email.utils.formatdate(time.mktime(datetime.now().timetuple()))
# email.utils.formatdate returns utc timestamp in default
return email.utils.formatdate(time.time())
def get_v4_amz_date_header(self):
return datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
def call_app(self, req, app=None, expect_exception=False):
if app is None:

View File

@ -27,7 +27,7 @@ from swift.common.swob import Request
import swift3
from swift3.test.unit import Swift3TestCase
from swift3.request import Request as S3Request
from swift3.request import SigV4Request, Request as S3Request
from swift3.etree import fromstring
from swift3.middleware import filter_factory
from swift3.cfg import CONF
@ -74,7 +74,7 @@ class TestSwift3Middleware(Swift3TestCase):
self.assertEquals(path_info, unquote(raw_path_info))
self.assertEquals(req.path, quote(path_info))
def test_canonical_string(self):
def test_canonical_string_v2(self):
"""
The hashes here were generated by running the same requests against
boto.utils.canonical_string
@ -93,7 +93,7 @@ class TestSwift3Middleware(Swift3TestCase):
'HTTP_AUTHORIZATION': 'AWS X:Y:Z',
})
req.headers.update(headers)
return req._canonical_string()
return req._string_to_sign()
def verify(hash, path, headers):
s = canonical_string(path, headers)
@ -170,17 +170,30 @@ class TestSwift3Middleware(Swift3TestCase):
# Set expire to last 32b timestamp value
# This number can't be higher, because it breaks tests on 32b systems
expire = '2147483647' # 19 Jan 2038 03:14:07
utc_date = datetime.utcnow()
req = Request.blank('/bucket/object?Signature=X&Expires=%s&'
'AWSAccessKeyId=test:tester' % expire,
'AWSAccessKeyId=test:tester&Timestamp=%s' %
(expire, utc_date.isoformat().rsplit('.')[0]),
environ={'REQUEST_METHOD': 'GET'},
headers={'Date': self.get_date_header()})
req.headers['Date'] = datetime.utcnow()
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '200')
for _, _, headers in self.swift.calls_with_headers:
self.assertEquals(headers['Authorization'], 'AWS test:tester:X')
self.assertEquals(headers['Date'], expire)
def test_signed_urls_no_timestamp(self):
expire = '2147483647' # 19 Jan 2038 03:14:07
req = Request.blank('/bucket/object?Signature=X&Expires=%s&'
'AWSAccessKeyId=test:tester' % expire,
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
# Curious! But actually S3 doesn't verify any x-amz-date/date headers
# for signed_url access and it also doesn't check timestamp
self.assertEquals(status.split()[0], '200')
for _, _, headers in self.swift.calls_with_headers:
self.assertEquals(headers['Authorization'], 'AWS test:tester:X')
def test_signed_urls_invalid_expire(self):
expire = 'invalid'
@ -194,7 +207,7 @@ class TestSwift3Middleware(Swift3TestCase):
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_signed_urls_no_sign(self):
expire = 'invalid'
expire = '2147483647' # 19 Jan 2038 03:14:07
req = Request.blank('/bucket/object?Expires=%s&'
'AWSAccessKeyId=test:tester' % expire,
environ={'REQUEST_METHOD': 'GET'},
@ -204,6 +217,102 @@ class TestSwift3Middleware(Swift3TestCase):
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_signed_urls_no_access(self):
expire = '2147483647' # 19 Jan 2038 03:14:07
req = Request.blank('/bucket/object?Expires=%s&'
'AWSAccessKeyId=' % expire,
environ={'REQUEST_METHOD': 'GET'})
req.headers['Date'] = datetime.utcnow()
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_signed_urls_v4(self):
req = Request.blank(
'/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test:tester/20T20Z/US/s3/aws4_request'
'&X-Amz-Date=%s'
'&X-Amz-Expires=1000'
'&X-Amz-SignedHeaders=host'
'&X-Amz-Signature=X' %
self.get_v4_amz_date_header(),
headers={'Date': self.get_date_header()},
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '200', body)
for _, _, headers in self.swift.calls_with_headers:
self.assertEquals('AWS test:tester:X', headers['Authorization'])
self.assertIn('X-Auth-Token', headers)
def test_signed_urls_v4_missing_x_amz_date(self):
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test/20T20Z/US/s3/aws4_request'
'&X-Amz-Expires=1000'
'&X-Amz-SignedHeaders=host'
'&X-Amz-Signature=X',
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_signed_urls_v4_invalid_algorithm(self):
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=FAKE'
'&X-Amz-Credential=test/20T20Z/US/s3/aws4_request'
'&X-Amz-Date=%s'
'&X-Amz-Expires=1000'
'&X-Amz-SignedHeaders=host'
'&X-Amz-Signature=X' %
self.get_v4_amz_date_header(),
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'InvalidArgument')
def test_signed_urls_v4_missing_signed_headers(self):
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test/20T20Z/US/s3/aws4_request'
'&X-Amz-Date=%s'
'&X-Amz-Expires=1000'
'&X-Amz-Signature=X' %
self.get_v4_amz_date_header(),
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body),
'AuthorizationHeaderMalformed')
def test_signed_urls_v4_invalid_credentials(self):
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test'
'&X-Amz-Date=%s'
'&X-Amz-Expires=1000'
'&X-Amz-SignedHeaders=host'
'&X-Amz-Signature=X' %
self.get_v4_amz_date_header(),
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_signed_urls_v4_missing_signature(self):
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test/20T20Z/US/s3/aws4_request'
'&X-Amz-Date=%s'
'&X-Amz-Expires=1000'
'&X-Amz-SignedHeaders=host' %
self.get_v4_amz_date_header(),
environ={'REQUEST_METHOD': 'GET'})
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_bucket_virtual_hosted_style(self):
req = Request.blank('/',
environ={'HTTP_HOST': 'bucket.localhost:80',
@ -447,6 +556,266 @@ class TestSwift3Middleware(Swift3TestCase):
with self.assertRaises(ValueError):
self.swift3.check_pipeline(conf)
def test_signature_v4(self):
environ = {
'REQUEST_METHOD': 'GET'}
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test:tester/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-date,'
'Signature=X',
'X-Amz-Date': self.get_v4_amz_date_header(),
'X-Amz-Content-SHA256': '0123456789'}
req = Request.blank('/bucket/object', environ=environ, headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '200', body)
for _, _, headers in self.swift.calls_with_headers:
self.assertEquals('AWS test:tester:X', headers['Authorization'])
self.assertIn('X-Auth-Token', headers)
def test_signature_v4_no_date(self):
environ = {
'REQUEST_METHOD': 'GET'}
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test:tester/20130524/US/s3/aws4_request, '
'SignedHeaders=host;range;x-amz-date,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789'}
req = Request.blank('/bucket/object', environ=environ, headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '403')
self.assertEquals(self._get_error_code(body), 'AccessDenied')
def test_signature_v4_no_payload(self):
environ = {
'REQUEST_METHOD': 'GET'}
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test:tester/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-date,'
'Signature=X',
'X-Amz-Date': self.get_v4_amz_date_header()}
req = Request.blank('/bucket/object', environ=environ, headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '400')
self.assertEquals(self._get_error_code(body), 'InvalidRequest')
self.assertEquals(
self._get_error_message(body),
'Missing required header for this request: x-amz-content-sha256')
def test_signature_v4_bad_authorization_string(self):
def test(auth_str, error, msg):
environ = {
'REQUEST_METHOD': 'GET'}
headers = {
'Authorization': auth_str,
'X-Amz-Date': self.get_v4_amz_date_header(),
'X-Amz-Content-SHA256': '0123456789'}
req = Request.blank('/bucket/object', environ=environ,
headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(self._get_error_code(body), error)
self.assertEquals(self._get_error_message(body), msg)
auth_str = ('AWS4-HMAC-SHA256 '
'SignedHeaders=host;x-amz-date,'
'Signature=X')
test(auth_str, 'AccessDenied', 'Access Denied.')
auth_str = ('AWS4-HMAC-SHA256 '
'Credential=test:tester/20130524/US/s3/aws4_request, '
'Signature=X')
test(auth_str, 'AuthorizationHeaderMalformed',
'The authorization header is malformed; the authorization '
'header requires three components: Credential, SignedHeaders, '
'and Signature.')
auth_str = ('AWS4-HMAC-SHA256 '
'Credential=test:tester/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-date')
test(auth_str, 'AccessDenied', 'Access Denied.')
def test_canonical_string_v4(self):
def canonical_string(path, environ):
if '?' in path:
path, query_string = path.split('?', 1)
else:
query_string = ''
env = {
'REQUEST_METHOD': 'GET',
'PATH_INFO': path,
'QUERY_STRING': query_string,
'HTTP_DATE': 'Mon, 09 Sep 2011 23:36:00 GMT',
'HTTP_X_AMZ_CONTENT_SHA256': (
'e3b0c44298fc1c149afbf4c8996fb924'
'27ae41e4649b934ca495991b7852b855')
}
env.update(environ)
with patch('swift3.request.Request._validate_headers'):
req = SigV4Request(env)
return req._string_to_sign()
def verify(hash_val, path, environ):
s = canonical_string(path, environ)
s = s.split('\n')[3]
self.assertEquals(hash_val, s)
# all next data got from aws4_testsuite from Amazon
# http://docs.aws.amazon.com/general/latest/gr/samples
# /aws4_testsuite.zip
# Each *expected* hash value is the 4th line in <test-name>.sts in the
# test suite.
# get-vanilla
env = {
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host, Signature=X'),
'HTTP_HOST': 'host.foo.com'}
verify('366b91fb121d72a00f46bbe8d395f53a'
'102b06dfb7e79636515208ed3fa606b1',
'/', env)
# get-header-value-trim
env = {
'REQUEST_METHOD': 'POST',
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;p, Signature=X'),
'HTTP_HOST': 'host.foo.com',
'HTTP_P': 'phfft'}
verify('dddd1902add08da1ac94782b05f9278c'
'08dc7468db178a84f8950d93b30b1f35',
'/', env)
# get-utf8 (not exact)
env = {
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host, Signature=X'),
'HTTP_HOST': 'host.foo.com',
'RAW_PATH_INFO': '/%E1%88%B4'}
# This might look weird because actually S3 doesn't care about utf-8
# encoded multi-byte bucket name from bucket-in-host name constraint.
# However, aws4_testsuite has only a sample hash with utf-8 *bucket*
# name to make sure the correctness (probably it can be used in other
# aws resource except s3) so, to test also utf-8, skip the bucket name
# validation in the following test.
# NOTE: eventlet's PATH_INFO is unquoted
with patch('swift3.request.validate_bucket_name'):
verify('27ba31df5dbc6e063d8f87d62eb07143'
'f7f271c5330a917840586ac1c85b6f6b',
unquote('/%E1%88%B4'), env)
# get-vanilla-query-order-key
env = {
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host, Signature=X'),
'HTTP_HOST': 'host.foo.com'}
verify('2f23d14fe13caebf6dfda346285c6d9c'
'14f49eaca8f5ec55c627dd7404f7a727',
'/?a=foo&b=foo', env)
# post-header-value-case
env = {
'REQUEST_METHOD': 'POST',
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;zoo, Signature=X'),
'HTTP_HOST': 'host.foo.com',
'HTTP_ZOO': 'ZOOBAR'}
verify('3aae6d8274b8c03e2cc96fc7d6bda4b9'
'bd7a0a184309344470b2c96953e124aa',
'/', env)
# post-x-www-form-urlencoded-parameters
env = {
'REQUEST_METHOD': 'POST',
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;content-type, Signature=X'),
'HTTP_HOST': 'host.foo.com',
'HTTP_X_AMZ_CONTENT_SHA256':
'3ba8907e7a252327488df390ed517c45'
'b96dead033600219bdca7107d1d3f88a',
'CONTENT_TYPE':
'application/x-www-form-urlencoded; charset=utf8'}
verify('c4115f9e54b5cecf192b1eaa23b8e88e'
'd8dc5391bd4fde7b3fff3d9c9fe0af1f',
'/', env)
# post-x-www-form-urlencoded
env = {
'REQUEST_METHOD': 'POST',
'HTTP_AUTHORIZATION': (
'AWS4-HMAC-SHA256 '
'Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, '
'SignedHeaders=date;host;content-type, Signature=X'),
'HTTP_HOST': 'host.foo.com',
'HTTP_X_AMZ_CONTENT_SHA256':
'3ba8907e7a252327488df390ed517c45'
'b96dead033600219bdca7107d1d3f88a',
'CONTENT_TYPE':
'application/x-www-form-urlencoded'}
verify('4c5c6e4b52fb5fb947a8733982a8a5a6'
'1b14f04345cbfe6e739236c76dd48f74',
'/', env)
def test_mixture_param_v4(self):
# now we have an Authorization header
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request_A, '
'SignedHeaders=hostA;rangeA;x-amz-dateA,'
'Signature=X',
'X-Amz-Date': self.get_v4_amz_date_header(),
'X-Amz-Content-SHA256': '0123456789'}
# and then, different auth info (Credential, SignedHeaders, Signature)
# in query
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test/20T20Z/US/s3/aws4_requestB'
'&X-Amz-SignedHeaders=hostB'
'&X-Amz-Signature=Y',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
# FIXME: should this failed as 400 or pass via query auth?
# for now, 403 forbbiden for safety
self.assertEquals(status.split()[0], '403', body)
# But if we are missing Signature in query param
req = Request.blank('/bucket/object'
'?X-Amz-Algorithm=AWS4-HMAC-SHA256'
'&X-Amz-Credential=test/20T20Z/US/s3/aws4_requestB'
'&X-Amz-SignedHeaders=hostB',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
req.content_type = 'text/plain'
status, headers, body = self.call_swift3(req)
self.assertEquals(status.split()[0], '403', body)
if __name__ == '__main__':
unittest.main()

View File

@ -29,7 +29,7 @@ from swift3.subresource import Owner, Grant, User, ACL, encode_acl, \
decode_acl, ACLPublicRead
from swift3.test.unit.test_s3_acl import s3acl
from swift3.cfg import CONF
from swift3.utils import sysmeta_header
from swift3.utils import sysmeta_header, mktime, S3Timestamp
from swift3.request import MAX_32BIT_INT
xml = '<CompleteMultipartUpload>' \
@ -634,6 +634,21 @@ class TestSwift3MultiUpload(Swift3TestCase):
self.assertEquals(headers.get('X-Object-Meta-Foo'), 'bar')
self.assertEquals(headers.get('Content-Type'), 'baz/quux')
def test_object_multipart_upload_complete_weird_host_name(self):
# This happens via boto signature v4
req = Request.blank('/bucket/object?uploadId=X',
environ={'REQUEST_METHOD': 'POST',
'HTTP_HOST': 'localhost:8080:8080'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(), },
body=xml)
status, headers, body = self.call_swift3(req)
fromstring(body, 'CompleteMultipartUploadResult')
self.assertEquals(status.split()[0], '200')
_, _, headers = self.swift.calls_with_headers[-2]
self.assertEquals(headers.get('X-Object-Meta-Foo'), 'bar')
def test_object_multipart_upload_complete_segment_too_small(self):
msgs = [
# pre-2.6.0 swift
@ -1230,7 +1245,8 @@ class TestSwift3MultiUpload(Swift3TestCase):
def _test_copy_for_s3acl(self, account, src_permission=None,
src_path='/src_bucket/src_obj', src_headers=None,
head_resp=swob.HTTPOk, put_header=None):
head_resp=swob.HTTPOk, put_header=None,
timestamp=None):
owner = 'test:tester'
grants = [Grant(User(account), src_permission)] \
if src_permission else [Grant(User(owner), 'FULL_CONTROL')]
@ -1248,14 +1264,18 @@ class TestSwift3MultiUpload(Swift3TestCase):
'/bucket/object?partNumber=1&uploadId=X',
environ={'REQUEST_METHOD': 'PUT'},
headers=put_headers)
with patch('swift3.utils.time.time', return_value=1396353600.592270):
timestamp = timestamp or time.time()
with patch('swift3.utils.time.time', return_value=timestamp):
return self.call_swift3(req)
@s3acl
def test_upload_part_copy(self):
last_modified = '2014-04-01T12:00:00.000Z'
status, headers, body = \
self._test_copy_for_s3acl('test:tester')
date_header = self.get_date_header()
timestamp = mktime(date_header)
last_modified = S3Timestamp(timestamp).s3xmlformat
status, headers, body = self._test_copy_for_s3acl(
'test:tester', put_header={'Date': date_header},
timestamp=timestamp)
self.assertEquals(status.split()[0], '200')
self.assertEquals(headers['Content-Type'], 'application/xml')
self.assertTrue(headers.get('etag') is None)

View File

@ -28,6 +28,7 @@ from swift3.test.unit import Swift3TestCase
from swift3.test.unit.test_s3_acl import s3acl
from swift3.subresource import ACL, User, encode_acl, Owner, Grant
from swift3.etree import fromstring
from swift3.utils import mktime, S3Timestamp
from swift3.test.unit.helpers import FakeSwift
@ -288,9 +289,9 @@ class TestSwift3Obj(Swift3TestCase):
self._test_object_GETorHEAD('GET')
_, _, headers = self.swift.calls_with_headers[-1]
self.assertTrue('Authorization' not in headers)
self.assertNotIn('Authorization', headers)
_, _, headers = self.swift.calls_with_headers[0]
self.assertTrue('Authorization' not in headers)
self.assertNotIn('Authorization', headers)
@s3acl
def test_object_GET_Range(self):
@ -466,7 +467,7 @@ class TestSwift3Obj(Swift3TestCase):
self.assertEquals(headers['Content-Length'], '0')
def _test_object_PUT_copy(self, head_resp, put_header=None,
src_path='/some/source'):
src_path='/some/source', timestamp=None):
account = 'test:tester'
grants = [Grant(User(account), 'FULL_CONTROL')]
head_headers = \
@ -476,9 +477,10 @@ class TestSwift3Obj(Swift3TestCase):
self.swift.register('HEAD', '/v1/AUTH_test/some/source',
head_resp, head_headers, None)
put_header = put_header or {}
return self._call_object_copy(src_path, put_header)
return self._call_object_copy(src_path, put_header, timestamp)
def _test_object_PUT_copy_self(self, head_resp, put_header=None):
def _test_object_PUT_copy_self(self, head_resp,
put_header=None, timestamp=None):
account = 'test:tester'
grants = [Grant(User(account), 'FULL_CONTROL')]
head_headers = \
@ -488,9 +490,9 @@ class TestSwift3Obj(Swift3TestCase):
self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
head_resp, head_headers, None)
put_header = put_header or {}
return self._call_object_copy('/bucket/object', put_header)
return self._call_object_copy('/bucket/object', put_header, timestamp)
def _call_object_copy(self, src_path, put_header):
def _call_object_copy(self, src_path, put_header, timestamp=None):
put_headers = {'Authorization': 'AWS test:tester:hmac',
'X-Amz-Copy-Source': src_path,
'Date': self.get_date_header()}
@ -502,13 +504,18 @@ class TestSwift3Obj(Swift3TestCase):
req.date = datetime.now()
req.content_type = 'text/plain'
with patch('swift3.utils.time.time', return_value=1396353600.000000):
timestamp = timestamp or time.time()
with patch('swift3.utils.time.time', return_value=timestamp):
return self.call_swift3(req)
@s3acl
def test_object_PUT_copy(self):
last_modified = '2014-04-01T12:00:00.000Z'
status, headers, body = self._test_object_PUT_copy(swob.HTTPOk)
date_header = self.get_date_header()
timestamp = mktime(date_header)
last_modified = S3Timestamp(timestamp).s3xmlformat
status, headers, body = self._test_object_PUT_copy(
swob.HTTPOk, put_header={'Date': date_header},
timestamp=timestamp)
self.assertEquals(status.split()[0], '200')
self.assertEquals(headers['Content-Type'], 'application/xml')
self.assertTrue(headers.get('etag') is None)
@ -523,11 +530,14 @@ class TestSwift3Obj(Swift3TestCase):
@s3acl
def test_object_PUT_copy_no_slash(self):
last_modified = '2014-04-01T12:00:00.000Z'
date_header = self.get_date_header()
timestamp = mktime(date_header)
last_modified = S3Timestamp(timestamp).s3xmlformat
# Some clients (like Boto) don't include the leading slash;
# AWS seems to tolerate this so we should, too
status, headers, body = self._test_object_PUT_copy(
swob.HTTPOk, src_path='some/source')
swob.HTTPOk, src_path='some/source',
put_header={'Date': date_header}, timestamp=timestamp)
self.assertEquals(status.split()[0], '200')
self.assertEquals(headers['Content-Type'], 'application/xml')
self.assertTrue(headers.get('etag') is None)
@ -569,10 +579,13 @@ class TestSwift3Obj(Swift3TestCase):
@s3acl
def test_object_PUT_copy_self_metadata_replace(self):
last_modified = '2014-04-01T12:00:00.000Z'
header = {'x-amz-metadata-directive': 'REPLACE'}
status, headers, body = \
self._test_object_PUT_copy_self(swob.HTTPOk, header)
date_header = self.get_date_header()
timestamp = mktime(date_header)
last_modified = S3Timestamp(timestamp).s3xmlformat
header = {'x-amz-metadata-directive': 'REPLACE',
'Date': date_header}
status, headers, body = self._test_object_PUT_copy_self(
swob.HTTPOk, header, timestamp=timestamp)
self.assertEquals(status.split()[0], '200')
self.assertEquals(headers['Content-Type'], 'application/xml')
self.assertTrue(headers.get('etag') is None)

View File

@ -20,12 +20,14 @@ import unittest
from swift.common import swob
from swift.common.swob import Request, HTTPNoContent
from swift3.utils import mktime
from swift3.subresource import ACL, User, Owner, Grant, encode_acl
from swift3.test.unit.test_middleware import Swift3TestCase
from swift3.cfg import CONF
from swift3.request import Request as S3_Request
from swift3.request import S3AclRequest
from swift3.response import InvalidArgument, NoSuchBucket, InternalError
from swift3.request import S3AclRequest, SigV4Request, SIGV4_X_AMZ_DATE_FORMAT
from swift3.response import InvalidArgument, NoSuchBucket, InternalError, \
AccessDenied, SignatureDoesNotMatch
Fake_ACL_MAP = {
@ -249,8 +251,8 @@ class TestRequest(Swift3TestCase):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(req.environ, MagicMock())
sw_req = s3_req.to_swift_req(method, container, obj)
self.assertTrue('HTTP_AUTHORIZATION' not in sw_req.environ)
self.assertTrue('Authorization' not in sw_req.headers)
self.assertNotIn('HTTP_AUTHORIZATION', sw_req.environ)
self.assertNotIn('Authorization', sw_req.headers)
self.assertEquals(sw_req.headers['X-Auth-Token'], 'token')
def test_to_swift_req_subrequest_proxy_access_log(self):
@ -371,5 +373,180 @@ class TestRequest(Swift3TestCase):
self.assertEquals(status.split()[0], '403')
self.assertEquals(body, '')
def _test_request_timestamp_sigv4(self, date_header):
# signature v4 here
environ = {
'REQUEST_METHOD': 'GET'}
if 'X-Amz-Date' in date_header:
included_header = 'x-amz-date'
elif 'Date' in date_header:
included_header = 'date'
else:
self.fail('Invalid date header specified as test')
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;%s,'
'Signature=X' % included_header,
'X-Amz-Content-SHA256': '0123456789'}
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
if 'X-Amz-Date' in date_header:
timestamp = mktime(
date_header['X-Amz-Date'], SIGV4_X_AMZ_DATE_FORMAT)
elif 'Date' in date_header:
timestamp = mktime(date_header['Date'])
self.assertEqual(timestamp, int(sigv4_req.timestamp))
def test_request_timestamp_sigv4(self):
access_denied_message = \
'AWS authentication requires a valid Date or x-amz-date header'
# normal X-Amz-Date header
date_header = {'X-Amz-Date': self.get_v4_amz_date_header()}
self._test_request_timestamp_sigv4(date_header)
# normal Date header
date_header = {'Date': self.get_date_header()}
self._test_request_timestamp_sigv4(date_header)
# mangled X-Amz-Date header
date_header = {'X-Amz-Date': self.get_v4_amz_date_header()[:-1]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# mangled Date header
date_header = {'Date': self.get_date_header()[20:]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# Negative timestamp
date_header = {'X-Amz-Date': '00160523T054055Z'}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv4(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
def _test_request_timestamp_sigv2(self, date_header):
# signature v4 here
environ = {
'REQUEST_METHOD': 'GET'}
headers = {'Authorization': 'AWS test:tester:hmac'}
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3_Request(req.environ)
if 'X-Amz-Date' in date_header:
timestamp = mktime(req.headers.get('X-Amz-Date'))
elif 'Date' in date_header:
timestamp = mktime(req.headers.get('Date'))
else:
self.fail('Invalid date header specified as test')
self.assertEqual(timestamp, int(sigv2_req.timestamp))
def test_request_timestamp_sigv2(self):
access_denied_message = \
'AWS authentication requires a valid Date or x-amz-date header'
# In v2 format, normal X-Amz-Date header is same
date_header = {'X-Amz-Date': self.get_date_header()}
self._test_request_timestamp_sigv2(date_header)
# normal Date header
date_header = {'Date': self.get_date_header()}
self._test_request_timestamp_sigv2(date_header)
# mangled X-Amz-Date header
date_header = {'X-Amz-Date': self.get_date_header()[:-20]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
# mangled Date header
date_header = {'Date': self.get_date_header()[:-20]}
with self.assertRaises(AccessDenied) as cm:
self._test_request_timestamp_sigv2(date_header)
self.assertEqual('403 Forbidden', cm.exception.message)
self.assertIn(access_denied_message, cm.exception.body)
def test_headers_to_sign_sigv4(self):
environ = {
'REQUEST_METHOD': 'GET'}
# host and x-amz-date
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(['host', 'x-amz-content-sha256', 'x-amz-date'],
sorted(headers_to_sign.keys()))
self.assertEqual(headers_to_sign['host'], 'localhost:80')
self.assertEqual(headers_to_sign['x-amz-date'], x_amz_date)
self.assertEqual(headers_to_sign['x-amz-content-sha256'], '0123456789')
# no x-amz-date
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(['host', 'x-amz-content-sha256'],
sorted(headers_to_sign.keys()))
self.assertEqual(headers_to_sign['host'], 'localhost:80')
self.assertEqual(headers_to_sign['x-amz-content-sha256'], '0123456789')
# SignedHeaders says, host and x-amz-date included but there is not
# X-Amz-Date header
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=test/20130524/US/s3/aws4_request, '
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X',
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
with self.assertRaises(SignatureDoesNotMatch):
sigv4_req = SigV4Request(req.environ)
sigv4_req._headers_to_sign()
if __name__ == '__main__':
unittest.main()

View File

@ -18,8 +18,8 @@ import uuid
import base64
import time
from swift.common.utils import get_logger
import email.utils
# Need for check_path_header
from swift.common import utils
@ -143,6 +143,41 @@ class S3Timestamp(utils.Timestamp):
def s3xmlformat(self):
return self.isoformat[:-7] + '.000Z'
@property
def amz_date_format(self):
"""
this format should be like 'YYYYMMDDThhmmssZ'
"""
return self.isoformat.replace(
'-', '').replace(':', '')[:-7] + 'Z'
@classmethod
def now(cls):
return cls(time.time())
def mktime(timestamp_str, time_format='%Y-%m-%dT%H:%M:%S'):
"""
mktime creates a float instance in epoch time really like as time.mktime
the difference from time.mktime is allowing to 2 formats string for the
argumtent for the S3 testing usage.
TODO: support
:param timestamp_str: a string of timestamp formatted as
(a) RFC2822 (e.g. date header)
(b) %Y-%m-%dT%H:%M:%S (e.g. copy result)
:param time_format: a string of format to parase in (b) process
:return : a float instance in epoch time
"""
try:
epoch_time = email.utils.mktime_tz(
email.utils.parsedate_tz(timestamp_str))
except TypeError:
time_tuple = time.strptime(timestamp_str, time_format)
# add timezone info as utc (no time difference)
time_tuple += (0, )
epoch_time = email.utils.mktime_tz(time_tuple)
return epoch_time