python-keystoneclient/keystoneclient/contrib/ec2/utils.py

289 lines
12 KiB
Python

# Copyright 2012 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 - 2012 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import hashlib
import hmac
import re
import six
from six.moves import urllib
from keystoneclient.i18n import _
class Ec2Signer(object):
    """Utility class for EC2 signing of request.

    This allows a request to be signed with an AWS style signature,
    which can then be used for authentication via the keystone ec2
    authentication extension.

    A signer instance may be used for any number of requests: every
    signature is computed on a private copy of the keyed HMAC objects
    created in ``__init__``, so no state leaks from one call to the next.
    """

    def __init__(self, secret_key):
        """Create the keyed HMAC templates for ``secret_key``.

        :param secret_key: the EC2 secret key, either text or bytes.
        """
        # Accept a key that is already bytes instead of failing on
        # ``bytes.encode`` under Python 3.
        if not isinstance(secret_key, bytes):
            secret_key = secret_key.encode()
        self.secret_key = secret_key
        # These two objects are templates only; the _calc_signature_*
        # methods must never call update() on them directly.
        self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1)
        # Always define the attribute so the SHA1 fallback in
        # _calc_signature_2 is reachable rather than an AttributeError.
        self.hmac_256 = None
        if getattr(hashlib, 'sha256', None):
            self.hmac_256 = hmac.new(self.secret_key,
                                     digestmod=hashlib.sha256)

    def _v4_creds(self, credentials):
        """Detect if the credentials are for a v4 signed request.

        Check is needed since AWS removed the SignatureVersion field from
        the v4 request spec...

        This expects a dict of the request headers to be passed in the
        credentials dict, since the recommended way to pass v4 creds is
        via the 'Authorization' header
        see http://docs.aws.amazon.com/general/latest/gr/
            sigv4-signed-request-examples.html

        Alternatively X-Amz-Algorithm can be specified as a query parameter,
        and the authentication data can also be passed as query parameters.

        Note a hash of the request body is also required in the credentials
        for v4 auth to work in the body_hash key, calculated via:
        hashlib.sha256(req.body).hexdigest()

        :param credentials: dict which may hold 'headers' and 'params'.
        :returns: True when the request is marked as AWS4-HMAC-SHA256,
                  False otherwise.
        """
        try:
            auth_str = credentials['headers']['Authorization']
        except KeyError:
            # Alternatively the Authorization data can be passed via
            # the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256
            try:
                algorithm = credentials['params']['X-Amz-Algorithm']
            except KeyError:  # nosec(cjschaef): in cases of not finding
                # entries, simply return False
                return False
            return algorithm == 'AWS4-HMAC-SHA256'
        # The query parameters are only consulted when there is no
        # Authorization header at all.
        return auth_str.startswith('AWS4-HMAC-SHA256')

    def generate(self, credentials):
        """Generate auth string according to what SignatureVersion is given.

        :param credentials: dict with a 'params' dict; versions 2 and 4
                            also read 'verb', 'host' and 'path', and
                            version 4 additionally reads 'headers' and
                            'body_hash'.
        :returns: the signature string for the detected version.
        :raises Exception: if the signature version is unknown or the
                           request matches no supported signature format.
        """
        signature_version = credentials['params'].get('SignatureVersion')
        if signature_version == '0':
            return self._calc_signature_0(credentials['params'])
        if signature_version == '1':
            return self._calc_signature_1(credentials['params'])
        if signature_version == '2':
            return self._calc_signature_2(credentials['params'],
                                          credentials['verb'],
                                          credentials['host'],
                                          credentials['path'])
        if self._v4_creds(credentials):
            return self._calc_signature_4(credentials['params'],
                                          credentials['verb'],
                                          credentials['host'],
                                          credentials['path'],
                                          credentials['headers'],
                                          credentials['body_hash'])

        if signature_version is not None:
            raise Exception(_('Unknown signature version: %s') %
                            signature_version)
        else:
            raise Exception(_('Unexpected signature format'))

    @staticmethod
    def _get_utf8_value(value):
        """Get the UTF8-encoded version of a value.

        Values that are neither text nor bytes are converted with str()
        first; text is encoded as UTF-8 and bytes are returned unchanged.
        """
        if not isinstance(value, (six.binary_type, six.text_type)):
            value = str(value)
        if isinstance(value, six.text_type):
            return value.encode('utf-8')
        else:
            return value

    def _calc_signature_0(self, params):
        """Generate AWS signature version 0 string.

        :param params: dict providing 'Action' and 'Timestamp'.
        :returns: base64 encoded HMAC-SHA1 of Action + Timestamp.
        """
        s = (params['Action'] + params['Timestamp']).encode('utf-8')
        # Sign on a copy so the shared template keeps its pristine state.
        signer = self.hmac.copy()
        signer.update(s)
        return base64.b64encode(signer.digest()).decode('utf-8')

    def _calc_signature_1(self, params):
        """Generate AWS signature version 1 string.

        Every key and value is fed to HMAC-SHA1, with the keys ordered
        case-insensitively.

        :param params: dict of request parameters.
        :returns: base64 encoded signature.
        """
        # Sign on a copy so the shared template keeps its pristine state.
        signer = self.hmac.copy()
        for key in sorted(params, key=six.text_type.lower):
            signer.update(key.encode('utf-8'))
            signer.update(self._get_utf8_value(params[key]))
        return base64.b64encode(signer.digest()).decode('utf-8')

    @staticmethod
    def _canonical_qs(params):
        """Construct a sorted, correctly encoded query string.

        This is required for _calc_signature_2 and _calc_signature_4.

        :param params: dict of request parameters.
        :returns: 'key=value' pairs, sorted by key and joined with '&'.
        """
        pairs = []
        for key in sorted(params):
            val = Ec2Signer._get_utf8_value(params[key])
            val = urllib.parse.quote(val, safe='-_~')
            pairs.append(urllib.parse.quote(key, safe='') + '=' + val)
        return '&'.join(pairs)

    def _calc_signature_2(self, params, verb, server_string, path):
        """Generate AWS signature version 2 string.

        Note this sets params['SignatureMethod'] on the dict passed in,
        as the method name is part of the signed query string.

        :param params: dict of request parameters (modified in place).
        :param verb: HTTP verb of the request.
        :param server_string: host of the request.
        :param path: path of the request.
        :returns: base64 encoded signature.
        """
        string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
        # Work on a copy of the chosen template so that repeated calls on
        # the same signer do not accumulate HMAC state.
        if self.hmac_256:
            current_hmac = self.hmac_256.copy()
            params['SignatureMethod'] = 'HmacSHA256'
        else:
            current_hmac = self.hmac.copy()
            params['SignatureMethod'] = 'HmacSHA1'
        string_to_sign += self._canonical_qs(params)
        current_hmac.update(string_to_sign.encode('utf-8'))
        b64 = base64.b64encode(current_hmac.digest()).decode('utf-8')
        return b64

    def _calc_signature_4(self, params, verb, server_string, path, headers,
                          body_hash):
        """Generate AWS signature version 4 string.

        :param params: dict of request query parameters.
        :param verb: HTTP verb of the request.
        :param server_string: host of the request (not used by this
                              method; the host is taken from headers).
        :param path: path of the request.
        :param headers: dict of request headers.
        :param body_hash: hex digest of the request body.
        :returns: hex encoded HMAC-SHA256 signature.
        :raises Exception: if X-Amz-Date does not start with the date in
                           the credential scope.
        """

        def sign(key, msg):
            return hmac.new(key, self._get_utf8_value(msg),
                            hashlib.sha256).digest()

        def signature_key(datestamp, region_name, service_name):
            """Signature key derivation.

            See http://docs.aws.amazon.com/general/latest/gr/
            signature-v4-examples.html#signature-v4-examples-python
            """
            k_date = sign(self._get_utf8_value(b"AWS4" + self.secret_key),
                          datestamp)
            k_region = sign(k_date, region_name)
            k_service = sign(k_region, service_name)
            k_signing = sign(k_service, "aws4_request")
            return k_signing

        def auth_param(param_name):
            """Get specified auth parameter.

            Provided via one of:
            - the Authorization header
            - the X-Amz-* query parameters
            """
            try:
                auth_str = headers['Authorization']
                param_str = auth_str.partition(
                    '%s=' % param_name)[2].split(',')[0]
            except KeyError:
                param_str = params.get('X-Amz-%s' % param_name)
            return param_str

        def date_param():
            """Get the X-Amz-Date value.

            The value can be either a header or parameter.

            Note AWS supports parsing the Date header also, but this is not
            currently supported here as it will require some format mangling
            So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it
            can be used to match against the YYYYMMDD format provided in the
            credential scope.
            see:
            http://docs.aws.amazon.com/general/latest/gr/
            sigv4-date-handling.html
            """
            try:
                return headers['X-Amz-Date']
            except KeyError:
                return params.get('X-Amz-Date')

        def canonical_header_str():
            # Get the list of headers to include, from either
            # - the Authorization header (SignedHeaders key)
            # - the X-Amz-SignedHeaders query parameter
            headers_lower = dict((k.lower().strip(), v.strip())
                                 for (k, v) in six.iteritems(headers))

            # Boto versions < 2.9.3 strip the port component of the host:port
            # header, so detect the user-agent via the header and strip the
            # port if we detect an old boto version. FIXME: remove when all
            # distros package boto >= 2.9.3, this is a transitional workaround
            # NOTE(review): the dots in this pattern are unescaped and so
            # match any character; left unchanged to preserve behaviour.
            user_agent = headers_lower.get('user-agent', '')
            strip_port = re.match('Boto/2.[0-9].[0-2]', user_agent)

            header_list = []
            sh_str = auth_param('SignedHeaders')
            for h in sh_str.split(';'):
                if h not in headers_lower:
                    continue

                if h == 'host' and strip_port:
                    header_list.append('%s:%s' %
                                       (h, headers_lower[h].split(':')[0]))
                    continue

                header_list.append('%s:%s' % (h, headers_lower[h]))
            return '\n'.join(header_list) + '\n'

        def canonical_query_str(verb, params):
            # POST requests pass parameters in through the request body
            canonical_qs = ''
            if verb.upper() != 'POST':
                canonical_qs = self._canonical_qs(params)
            return canonical_qs

        # Create canonical request:
        # http://docs.aws.amazon.com/general/latest/gr/
        # sigv4-create-canonical-request.html
        # Get parameters and headers in expected string format
        cr = "\n".join((verb.upper(), path,
                        canonical_query_str(verb, params),
                        canonical_header_str(),
                        auth_param('SignedHeaders'),
                        body_hash))

        # Check the date, reject any request where the X-Amz-Date doesn't
        # match the credential scope
        credential = auth_param('Credential')
        credential_split = credential.split('/')
        credential_scope = '/'.join(credential_split[1:])
        credential_date = credential_split[1]
        param_date = date_param()
        if not param_date.startswith(credential_date):
            raise Exception(_('Request date mismatch error'))

        # Create the string to sign
        # http://docs.aws.amazon.com/general/latest/gr/
        # sigv4-create-string-to-sign.html
        cr = cr.encode('utf-8')
        string_to_sign = '\n'.join(('AWS4-HMAC-SHA256',
                                    param_date,
                                    credential_scope,
                                    hashlib.sha256(cr).hexdigest()))

        # Calculate the derived key, this requires a datestamp, region
        # and service, which can be extracted from the credential scope
        (req_region, req_service) = credential_split[2:4]
        s_key = signature_key(credential_date, req_region, req_service)
        # Finally calculate the signature!
        signature = hmac.new(s_key, self._get_utf8_value(string_to_sign),
                             hashlib.sha256).hexdigest()
        return signature