Merge "Add support for generating form-post signatures"

This commit is contained in:
Zuul 2019-04-03 16:23:40 +00:00 committed by Gerrit Code Review
commit 1e3e5a3892
5 changed files with 308 additions and 2 deletions

View File

@ -9,10 +9,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import os
from hashlib import sha1
import hmac
import json
import os
import time
import six
from six.moves.urllib import parse
from openstack.object_store.v1 import account as _account
from openstack.object_store.v1 import container as _container
@ -639,3 +644,117 @@ class Proxy(proxy.Proxy):
include metadata about maximum values and thresholds.
"""
return self._get(_info.Info)
def set_account_temp_url_key(self, key, secondary=False):
"""Set the temporary URL key for the account.
:param key:
Text of the key to use.
:param bool secondary:
Whether this should set the secondary key. (defaults to False)
"""
header = 'Temp-URL-Key'
if secondary:
header += '-2'
return self.set_account_metadata(**{header: key})
def set_container_temp_url_key(self, container, key, secondary=False):
"""Set the temporary URL key for a container.
:param container:
The value can be the name of a container or a
:class:`~openstack.object_store.v1.container.Container` instance.
:param key:
Text of the key to use.
:param bool secondary:
Whether this should set the secondary key. (defaults to False)
"""
header = 'Temp-URL-Key'
if secondary:
header += '-2'
return self.set_container_metadata(container, **{header: key})
def get_temp_url_key(self, container=None):
"""Get the best temporary url key for a given container.
Will first try to return Temp-URL-Key-2 then Temp-URL-Key for
the container, and if neither exist, will attempt to return
Temp-URL-Key-2 then Temp-URL-Key for the account. If neither
exist, will return None.
:param container:
The value can be the name of a container or a
:class:`~openstack.object_store.v1.container.Container` instance.
"""
temp_url_key = None
if container:
container_meta = self.get_container_metadata(container)
temp_url_key = (container_meta.meta_temp_url_key_2
or container_meta.meta_temp_url_key)
if not temp_url_key:
account_meta = self.get_account_metadata()
temp_url_key = (account_meta.meta_temp_url_key_2
or account_meta.meta_temp_url_key)
if temp_url_key and not isinstance(temp_url_key, six.binary_type):
temp_url_key = temp_url_key.encode('utf8')
return temp_url_key
def generate_form_signature(
self, container, object_prefix, redirect_url, max_file_size,
max_upload_count, timeout, temp_url_key=None):
"""Generate a signature for a FormPost upload.
:param container:
The value can be the name of a container or a
:class:`~openstack.object_store.v1.container.Container` instance.
:param object_prefix:
Prefix to apply to limit all object names created using this
signature.
:param redirect_url:
The URL to redirect the browser to after the uploads have
completed.
:param max_file_size:
The maximum file size per file uploaded.
:param max_upload_count:
The maximum number of uploaded files allowed.
:param timeout:
The number of seconds from now to allow the form post to begin.
:param temp_url_key:
The X-Account-Meta-Temp-URL-Key for the account. Optional, if
omitted, the key will be fetched from the container or the account.
"""
max_file_size = int(max_file_size)
if max_file_size < 1:
raise exceptions.SDKException(
'Please use a positive max_file_size value.')
max_upload_count = int(max_upload_count)
if max_upload_count < 1:
raise exceptions.SDKException(
'Please use a positive max_upload_count value.')
if timeout < 1:
raise exceptions.SDKException(
'Please use a positive <timeout> value.')
expires = int(time.time() + int(timeout))
if temp_url_key:
if not isinstance(temp_url_key, six.binary_type):
temp_url_key = temp_url_key.encode('utf8')
else:
temp_url_key = self.get_temp_url_key(container)
if not temp_url_key:
raise exceptions.SDKException(
'temp_url_key was not given, nor was a temporary url key'
' found for the account or the container.')
res = self._get_resource(_container.Container, container)
endpoint = parse.urlparse(self.get_endpoint())
path = '/'.join([endpoint.path, res.name, object_prefix])
data = '%s\n%s\n%s\n%s\n%s' % (path, redirect_url, max_file_size,
max_upload_count, expires)
if six.PY3:
data = data.encode('utf8')
sig = hmac.new(temp_url_key, data, sha1).hexdigest()
return (expires, sig)

View File

@ -98,6 +98,12 @@ class Container(_base.BaseResource):
#: "If-None-Match: \*" header to query whether the server already
#: has a copy of the object before any data is sent.
if_none_match = resource.Header("if-none-match")
#: The secret key value for temporary URLs. If not set,
#: this header is not returned by this operation.
meta_temp_url_key = resource.Header("x-container-meta-temp-url-key")
#: A second secret key value for temporary URLs. If not set,
#: this header is not returned by this operation.
meta_temp_url_key_2 = resource.Header("x-container-meta-temp-url-key-2")
@classmethod
def new(cls, **kwargs):

View File

@ -14,11 +14,13 @@
import tempfile
import mock
import testtools
import openstack.cloud
import openstack.cloud.openstackcloud as oc_oc
from openstack.cloud import exc
from openstack import exceptions
from openstack.tests.unit import base
from openstack.object_store.v1 import _proxy
@ -264,6 +266,178 @@ class TestObject(BaseTestObject):
exc.OpenStackCloudException, self.cloud.list_containers)
self.assert_calls()
@mock.patch('time.time', autospec=True)
def test_generate_form_signature_container_key(self, mock_time):
mock_time.return_value = 12345
self.register_uris([
dict(method='HEAD', uri=self.container_endpoint,
headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'X-Container-Meta-Temp-Url-Key': 'amazingly-secure-key',
'Content-Type': 'text/plain; charset=utf-8'})
])
self.assertEqual(
(13345, '60731fb66d46c97cdcb79b6154363179c500b9d9'),
self.cloud.object_store.generate_form_signature(
self.container,
object_prefix='prefix/location',
redirect_url='https://example.com/location',
max_file_size=1024 * 1024 * 1024,
max_upload_count=10, timeout=1000, temp_url_key=None))
self.assert_calls()
@mock.patch('time.time', autospec=True)
def test_generate_form_signature_account_key(self, mock_time):
mock_time.return_value = 12345
self.register_uris([
dict(method='HEAD', uri=self.container_endpoint,
headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
dict(method='HEAD', uri=self.endpoint + '/',
headers={
'X-Account-Meta-Temp-Url-Key': 'amazingly-secure-key'}),
])
self.assertEqual(
(13345, '3cb9bc83d5a4136421bb2c1f58b963740566646f'),
self.cloud.object_store.generate_form_signature(
self.container,
object_prefix='prefix/location',
redirect_url='https://example.com/location',
max_file_size=1024 * 1024 * 1024,
max_upload_count=10, timeout=1000, temp_url_key=None))
self.assert_calls()
@mock.patch('time.time')
def test_generate_form_signature_key_argument(self, mock_time):
mock_time.return_value = 12345
self.assertEqual(
(13345, '1c283a05c6628274b732212d9a885265e6f67b63'),
self.cloud.object_store.generate_form_signature(
self.container,
object_prefix='prefix/location',
redirect_url='https://example.com/location',
max_file_size=1024 * 1024 * 1024,
max_upload_count=10, timeout=1000,
temp_url_key='amazingly-secure-key'))
self.assert_calls()
def test_generate_form_signature_no_key(self):
self.register_uris([
dict(method='HEAD', uri=self.container_endpoint,
headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
dict(method='HEAD', uri=self.endpoint + '/',
headers={}),
])
self.assertRaises(
exceptions.SDKException,
self.cloud.object_store.generate_form_signature,
self.container,
object_prefix='prefix/location',
redirect_url='https://example.com/location',
max_file_size=1024 * 1024 * 1024,
max_upload_count=10, timeout=1000, temp_url_key=None)
self.assert_calls()
def test_set_account_temp_url_key(self):
key = 'super-secure-key'
self.register_uris([
dict(method='POST', uri=self.endpoint + '/',
status_code=204,
validate=dict(
headers={
'x-account-meta-temp-url-key': key})),
dict(method='HEAD', uri=self.endpoint + '/',
headers={
'x-account-meta-temp-url-key': key}),
])
self.cloud.object_store.set_account_temp_url_key(key)
self.assert_calls()
def test_set_account_temp_url_key_secondary(self):
key = 'super-secure-key'
self.register_uris([
dict(method='POST', uri=self.endpoint + '/',
status_code=204,
validate=dict(
headers={
'x-account-meta-temp-url-key-2': key})),
dict(method='HEAD', uri=self.endpoint + '/',
headers={
'x-account-meta-temp-url-key-2': key}),
])
self.cloud.object_store.set_account_temp_url_key(key, secondary=True)
self.assert_calls()
def test_set_container_temp_url_key(self):
key = 'super-secure-key'
self.register_uris([
dict(method='POST', uri=self.container_endpoint,
status_code=204,
validate=dict(
headers={
'x-container-meta-temp-url-key': key})),
dict(method='HEAD', uri=self.container_endpoint,
headers={
'x-container-meta-temp-url-key': key}),
])
self.cloud.object_store.set_container_temp_url_key(self.container, key)
self.assert_calls()
def test_set_container_temp_url_key_secondary(self):
key = 'super-secure-key'
self.register_uris([
dict(method='POST', uri=self.container_endpoint,
status_code=204,
validate=dict(
headers={
'x-container-meta-temp-url-key-2': key})),
dict(method='HEAD', uri=self.container_endpoint,
headers={
'x-container-meta-temp-url-key-2': key}),
])
self.cloud.object_store.set_container_temp_url_key(
self.container, key, secondary=True)
self.assert_calls()
def test_list_objects(self):
endpoint = '{endpoint}?format=json'.format(
endpoint=self.container_endpoint)

View File

@ -184,6 +184,8 @@ class TestContainer(base.TestCase):
'read_ACL': None,
'sync_key': None,
'sync_to': None,
'meta_temp_url_key': None,
'meta_temp_url_key_2': None,
'timestamp': None,
'versions_location': None,
'write_ACL': None,

View File

@ -0,0 +1,5 @@
---
features:
- |
Added methods to manage object store temp-url keys and
generate signatures needed for FormPost middleware.