Check for user container existence

Change-Id: Ib6fff87b1bc3572918e47566a330fb4f64c32d7d
This commit is contained in:
Alistair Coles 2024-04-23 14:36:43 +01:00
parent ab5b544892
commit 51eb0e75d2
2 changed files with 159 additions and 92 deletions

View File

@ -15,7 +15,7 @@
import binascii
import json
from swift.common.http import HTTP_CONFLICT, is_success
from swift.common.http import HTTP_CONFLICT, is_success, HTTP_NOT_FOUND
from swift.common.middleware.symlink import TGT_OBJ_SYMLINK_HDR, \
ALLOW_RESERVED_NAMES
from swift.common.storage_policy import POLICIES
@ -23,7 +23,8 @@ from swift.common.utils import generate_unique_id, drain_and_close, \
config_positive_int_value, reiterate
from swift.common.swob import Request, normalize_etag, \
wsgi_to_str, wsgi_quote, HTTPInternalServerError, HTTPOk, \
HTTPConflict, HTTPBadRequest, HTTPException, HTTPNotFound, HTTPNoContent
HTTPConflict, HTTPBadRequest, HTTPException, HTTPNotFound, HTTPNoContent, \
HTTPServiceUnavailable
from swift.common.utils import get_logger, Timestamp, md5, public
from swift.common.registry import register_swift_info
from swift.common.request_helpers import get_reserved_name, \
@ -164,6 +165,16 @@ class BaseMPUHandler(object):
self.container)
self.parts_container = get_reserved_name('mpu_parts', self.container)
def _check_user_container_exists(self):
info = get_container_info(self.req.environ, self.app,
swift_source=MPU_SWIFT_SOURCE)
if is_success(info['status']):
return info
elif info['status'] == HTTP_NOT_FOUND:
raise HTTPNotFound()
else:
raise HTTPServiceUnavailable()
def make_path(self, *parts):
return '/'.join(['', 'v1', self.account] + [p for p in parts])
@ -210,19 +221,9 @@ class MPUHandler(BaseMPUHandler):
* List Multipart Uploads
* Initiate Multipart Upload
"""
def handle_request(self):
if self.req.method == 'GET':
resp = self.list_uploads()
elif self.req.method == 'POST' and self.obj:
resp = self.create_upload()
else:
# TODO: should we return 405 for any unsupported container?uploads
# method? Swift typically ignores unrecognised headers and
# params, but there is a risk that the user thinks that, for
# example, DELETE container?uploads will just abort all the MPU
# sessions (whereas it might delete the container).
resp = None
return resp
def __init__(self, mw, req):
super(MPUHandler, self).__init__(mw, req)
self.user_container_info = self._check_user_container_exists()
@public
def list_uploads(self):
@ -241,9 +242,7 @@ class MPUHandler(BaseMPUHandler):
def _ensure_container_exists(self, container):
# TODO: make storage policy specific parts bucket
info = get_container_info(self.req.environ, self.app,
swift_source=MPU_SWIFT_SOURCE)
policy_name = POLICIES[info['storage_policy']].name
policy_name = POLICIES[self.user_container_info['storage_policy']].name
# container_name = wsgi_unquote(wsgi_quote(container_name))
path = self.make_path(container)
@ -305,13 +304,15 @@ class MPUSessionHandler(BaseMPUHandler):
* Complete Multipart Upload
* Upload Part and Upload Part Copy.
"""
def __init__(self, mw, req, upload_id):
def __init__(self, mw, req):
super(MPUSessionHandler, self).__init__(mw, req)
self.upload_id = upload_id
self.session_name = '/'.join([self.reserved_obj, upload_id])
self.user_container_info = self._check_user_container_exists()
self.upload_id = get_upload_id(req)
self.session_name = '/'.join([self.reserved_obj, self.upload_id])
self.session_path = self.make_path(self.sessions_container,
self.session_name)
self.session = None
self.session = self._load_session()
self.req.headers.setdefault('X-Timestamp', Timestamp.now().internal)
self.manifest_relative_path = '/'.join(
[self.manifests_container, self.reserved_obj, self.upload_id])
self.manifest_path = self.make_path(self.manifest_relative_path)
@ -342,24 +343,6 @@ class MPUSessionHandler(BaseMPUHandler):
else:
return {}
def handle_request(self):
self.session = self._load_session()
self.req.headers.setdefault('X-Timestamp', Timestamp.now().internal)
part_number = get_valid_part_num(self.req)
if self.req.method == 'PUT' and part_number > 0:
resp = self.upload_part(part_number)
elif self.req.method == 'GET':
resp = self.list_parts()
# TODO: support HEAD? return basic metadata about the upload (not
# required for s3api)
elif self.req.method == 'POST':
resp = self.complete_upload()
elif self.req.method == 'DELETE':
resp = self.abort_upload()
else:
resp = None
return resp
def upload_part(self, part_number):
part_path = self.make_path(self.parts_container, self.reserved_obj,
self.upload_id, str(part_number))
@ -630,6 +613,39 @@ class MPUMiddleware(object):
self.min_part_size = config_positive_int_value(
conf.get('min_part_size', 5242880))
def handle_request(self, req, container, obj):
# this defines the MPU API
upload_id = get_upload_id(req)
part_number = get_valid_part_num(req)
if obj and upload_id:
if req.method == 'PUT' and part_number is not None:
resp = MPUSessionHandler(self, req).upload_part(part_number)
elif req.method == 'GET':
resp = MPUSessionHandler(self, req).list_parts()
elif req.method == 'POST':
resp = MPUSessionHandler(self, req).complete_upload()
elif req.method == 'DELETE':
resp = MPUSessionHandler(self, req).abort_upload()
else:
resp = None
elif container and 'uploads' in req.params:
if req.method == 'GET':
resp = MPUHandler(self, req).list_uploads()
elif obj and req.method == 'POST':
resp = MPUHandler(self, req).create_upload()
else:
resp = None
elif obj:
resp = MPUObjHandler(self, req).handle_request()
else:
resp = None
# TODO: should we return 405 for any unsupported container?uploads
# method? Swift typically ignores unrecognised headers and
# params, but there is a risk that the user thinks that, for
# example, DELETE container?uploads will just abort all the MPU
# sessions (whereas it might delete the container).
return resp
def __call__(self, env, start_response):
req = Request(env)
try:
@ -640,23 +656,10 @@ class MPUMiddleware(object):
if is_reserved_name(account, container, obj):
return self.app(env, start_response)
upload_id = get_upload_id(req)
if obj and upload_id:
handler = MPUSessionHandler(self, req, upload_id)
elif container and 'uploads' in req.params:
handler = MPUHandler(self, req)
elif obj:
handler = MPUObjHandler(self, req)
else:
handler = None
if handler:
try:
resp = handler.handle_request()
except HTTPException as err:
resp = err
else:
resp = None
try:
resp = self.handle_request(req, container, obj)
except HTTPException as err:
resp = err
resp = resp or self.app
return resp(env, start_response)

View File

@ -20,7 +20,7 @@ import mock
from swift.common import swob
from swift.common.middleware.mpu import MPUMiddleware
from swift.common.swob import Request, HTTPOk, HTTPNotFound, HTTPCreated, \
HTTPAccepted, HTTPNoContent
HTTPAccepted, HTTPNoContent, HTTPServiceUnavailable, HTTPPreconditionFailed
from swift.common.utils import md5, quote, Timestamp
from test.debug_logger import debug_logger
from swift.proxy.controllers.base import ResponseCollection, ResponseData
@ -35,16 +35,26 @@ def mock_generate_unique_id(fake_id):
yield
class TestMPUMiddleware(unittest.TestCase):
class BaseTestMPUMiddleware(unittest.TestCase):
# TODO: assert 'X-Backend-Allow-Reserved-Names' in backend requests
def setUp(self):
self.app = FakeSwift()
self.ts_iter = make_timestamp_iter()
self.debug_logger = debug_logger()
self._setup_user_ac_info_requests()
def _setup_user_ac_info_requests(self):
self.ac_info_calls = [('HEAD', '/v1/a', HTTPOk, {}),
('HEAD', '/v1/a/c', HTTPOk, {})]
for call in self.ac_info_calls:
self.app.register(*call)
class TestMPUMiddleware(BaseTestMPUMiddleware):
def setUp(self):
super(TestMPUMiddleware, self).setUp()
def _setup_mpu_create_requests(self):
self.app.register('HEAD', '/v1/a', HTTPOk, {})
self.app.register('HEAD', '/v1/a/c', HTTPOk, {})
self.app.register(
'HEAD', '/v1/a/\x00mpu_sessions\x00c', HTTPNotFound, {})
self.app.register(
@ -143,16 +153,16 @@ class TestMPUMiddleware(unittest.TestCase):
'content_type': 'application/mpu',
'last_modified': '1970-01-01T00:00:00.000000'}
for i in range(3)]
self.app.register('GET', '/v1/a/\x00mpu_sessions\x00c', HTTPOk, {},
body=json.dumps(listing).encode('ascii'))
registered_calls = [('GET', '/v1/a/\x00mpu_sessions\x00c', HTTPOk, {},
json.dumps(listing).encode('ascii'))]
for call in registered_calls:
self.app.register(*call)
req = Request.blank('/v1/a/c?uploads')
req.method = 'GET'
mw = MPUMiddleware(self.app, {}, logger=self.debug_logger)
resp = req.get_response(mw)
self.assertEqual(200, resp.status_int)
expected = [
('GET', '/v1/a/\x00mpu_sessions\x00c'),
]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
exp_listing = [{'name': 'o/test-id-%d' % i, 'hash': 'etag',
'bytes': 0,
@ -164,14 +174,14 @@ class TestMPUMiddleware(unittest.TestCase):
def test_upload_part(self):
ts_session = next(self.ts_iter)
ts_part = next(self.ts_iter)
self.app.register(
'HEAD', '/v1/a/\x00mpu_sessions\x00c/\x00o/test-id', HTTPOk,
{'X-Timestamp': ts_session.internal,
'Content-Type': 'application/x-mpu',
})
self.app.register(
'PUT', '/v1/a/\x00mpu_parts\x00c/\x00o/test-id/1', HTTPCreated,
{'X-Timestamp': ts_part.internal})
registered_calls = [
('HEAD', '/v1/a/\x00mpu_sessions\x00c/\x00o/test-id', HTTPOk,
{'X-Timestamp': ts_session.internal,
'Content-Type': 'application/x-mpu', }),
('PUT', '/v1/a/\x00mpu_parts\x00c/\x00o/test-id/1', HTTPCreated,
{'X-Timestamp': ts_part.internal})]
for call in registered_calls:
self.app.register(*call)
req = Request.blank('/v1/a/c/o?upload-id=test-id&part-number=1')
req.method = 'PUT'
req.body = b'testing'
@ -181,29 +191,31 @@ class TestMPUMiddleware(unittest.TestCase):
self.assertEqual(md5(b'testing', usedforsecurity=False).hexdigest(),
resp.headers.get('Etag'))
self.assertEqual('7', resp.headers['Content-Length'])
expected = [('HEAD', '/v1/a/\x00mpu_sessions\x00c/\x00o/test-id'),
('PUT', '/v1/a/\x00mpu_parts\x00c/\x00o/test-id/1')]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
def test_list_parts(self):
ts_session = next(self.ts_iter)
self.app.register(
'HEAD', '/v1/a/\x00mpu_sessions\x00c/\x00o/test-id', HTTPOk,
{'X-Timestamp': ts_session.internal,
'Content-Type': 'application/x-mpu'})
listing = [{'name': '\x00o/test-id/%d' % i, 'hash': 'etag%d' % i,
'bytes': i,
'content_type': 'text/plain',
'last_modified': '1970-01-01T00:00:00.000000'}
for i in range(3)]
self.app.register('GET', '/v1/a/\x00mpu_parts\x00c', HTTPOk, {},
body=json.dumps(listing).encode('ascii'))
registered_calls = [
('HEAD', '/v1/a/\x00mpu_sessions\x00c/\x00o/test-id', HTTPOk,
{'X-Timestamp': ts_session.internal,
'Content-Type': 'application/x-mpu'}),
('GET', '/v1/a/\x00mpu_parts\x00c', HTTPOk, {},
json.dumps(listing).encode('ascii'))
]
for call in registered_calls:
self.app.register(*call)
req = Request.blank('/v1/a/c/o?upload-id=test-id')
req.method = 'GET'
mw = MPUMiddleware(self.app, {}, logger=self.debug_logger)
resp = req.get_response(mw)
self.assertEqual(200, resp.status_int)
expected = [
expected = [call[:2] for call in self.ac_info_calls] + [
('HEAD', '/v1/a/\x00mpu_sessions\x00c/\x00o/test-id'),
('GET', '/v1/a/\x00mpu_parts\x00c?prefix=%s'
% quote('\x00o/test-id', safe='')),
@ -254,10 +266,10 @@ class TestMPUMiddleware(unittest.TestCase):
resp_body = b''.join(resp.app_iter)
self.assertEqual(200, resp.status_int)
self.assertEqual(b'', resp_body)
expected = [call[:2] for call in registered_calls]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
session_hdrs = self.app.headers[1]
session_hdrs = self.app.headers[3]
self.assertEqual(
{'Content-Type': 'application/x-mpu',
'Host': 'localhost:80',
@ -274,7 +286,7 @@ class TestMPUMiddleware(unittest.TestCase):
{"path": "\x00mpu_parts\x00c/\x00o/test-id/2",
"etag": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}],
json.loads(actual_manifest_body))
manifest_hdrs = self.app.headers[2]
manifest_hdrs = self.app.headers[4]
self.assertEqual(
{'Accept': 'application/json',
'Content-Length': '196',
@ -293,7 +305,7 @@ class TestMPUMiddleware(unittest.TestCase):
actual_mpu_body = self.app.uploaded.get('/v1/a/c/o')[1]
self.assertEqual(b'', actual_mpu_body)
mpu_hdrs = self.app.headers[3]
mpu_hdrs = self.app.headers[5]
self.assertEqual(
{'Content-Length': '0',
'Host': 'localhost:80',
@ -331,7 +343,7 @@ class TestMPUMiddleware(unittest.TestCase):
resp = req.get_response(mw)
b''.join(resp.app_iter)
self.assertEqual(409, resp.status_int)
expected = [call[:2] for call in registered_calls]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
def test_complete_mpu_session_deleted(self):
@ -354,7 +366,7 @@ class TestMPUMiddleware(unittest.TestCase):
resp = req.get_response(mw)
b''.join(resp.app_iter)
self.assertEqual(404, resp.status_int)
expected = [call[:2] for call in registered_calls]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
def _do_test_abort_mpu(self, extra_session_resp_headers):
@ -394,10 +406,10 @@ class TestMPUMiddleware(unittest.TestCase):
resp_body = b''.join(resp.app_iter)
self.assertEqual(204, resp.status_int)
self.assertEqual(b'', resp_body)
expected = [call[:2] for call in registered_calls]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
session_hdrs = self.app.headers[2]
session_hdrs = self.app.headers[4]
return session_hdrs, ts_abort
def test_abort_mpu(self):
@ -458,7 +470,7 @@ class TestMPUMiddleware(unittest.TestCase):
resp = req.get_response(mw)
b''.join(resp.app_iter)
self.assertEqual(404, resp.status_int)
expected = [call[:2] for call in registered_calls]
expected = [call[:2] for call in self.ac_info_calls + registered_calls]
self.assertEqual(expected, self.app.calls)
def test_mpu_async_cleanup_DELETE(self):
@ -508,3 +520,55 @@ class TestMPUMiddleware(unittest.TestCase):
'/v1/a/\x00mpu_manifests\x00c/\x00o/test-id/marker-deleted'),
]
self.assertEqual(exp_calls, self.app.calls)
class TestMpuMiddlewareErrors(BaseTestMPUMiddleware):
def setUp(self):
super(TestMpuMiddlewareErrors, self).setUp()
self.requests = [
# list uploads
Request.blank('/v1/a/c?uploads=true',
environ={'REQUEST_METHOD': 'GET'}),
# create upload
Request.blank('/v1/a/c/o?uploads=true',
environ={'REQUEST_METHOD': 'POST'}),
# upload part
Request.blank('/v1/a/c/o?upload-id=test-id&part-number=1',
environ={'REQUEST_METHOD': 'PUT'}),
# list parts
Request.blank('/v1/a/c/o?upload-id=test-id',
environ={'REQUEST_METHOD': 'GET'}),
# complete upload
Request.blank('/v1/a/c/o?upload-id=test-id',
environ={'REQUEST_METHOD': 'POST'}),
# abort upload
Request.blank('/v1/a/c/o?upload-id=test-id',
environ={'REQUEST_METHOD': 'DELETE'}),
]
def test_api_requests_user_container_not_found(self):
self.app.register('HEAD', '/v1/a/c', HTTPNotFound, {})
for req in self.requests:
self.app.clear_calls()
resp = req.get_response(MPUMiddleware(self.app, {}))
self.assertEqual(404, resp.status_int)
self.assertEqual([call[:2] for call in self.ac_info_calls],
self.app.calls)
def test_api_requests_user_container_unavailable(self):
self.app.register('HEAD', '/v1/a/c', HTTPServiceUnavailable, {})
for req in self.requests:
self.app.clear_calls()
resp = req.get_response(MPUMiddleware(self.app, {}))
self.assertEqual(503, resp.status_int)
self.assertEqual([call[:2] for call in self.ac_info_calls],
self.app.calls)
def test_api_requests_user_unexpected_error(self):
self.app.register('HEAD', '/v1/a/c', HTTPPreconditionFailed, {})
for req in self.requests:
self.app.clear_calls()
resp = req.get_response(MPUMiddleware(self.app, {}))
self.assertEqual(503, resp.status_int)
self.assertEqual([call[:2] for call in self.ac_info_calls],
self.app.calls)