Fix upgrade bug in versioned_writes

Previously, versioned_writes assumed that all container servers would
always have the latest Swift code, allowing them to return reversed
listings. This could cause the wrong version of a file to be restored
during rolling upgrades.

Now, versioned_writes will check that the listing returned is actually
reversed. If it isn't, we will revert to getting the full (in-order)
listing of versions and reversing it on the proxy.

Change-Id: Ib53574ff71961592426cb386ef00a75eb5824def
Closes-Bug: 1562083
This commit is contained in:
Tim Burke 2016-03-30 14:19:00 -07:00
parent e786d01192
commit ebf0b22012
2 changed files with 439 additions and 15 deletions

View File

@ -155,15 +155,74 @@ class VersionedWritesContext(WSGIContext):
except ListingIterError:
raise HTTPServerError(request=req)
def _listing_pages_iter(self, account_name, lcontainer, lprefix, env):
marker = ''
def _in_proxy_reverse_listing(self, account_name, lcontainer, lprefix,
env, failed_marker, failed_listing):
'''Get the complete prefix listing and reverse it on the proxy.
This is only necessary if we encounter a response from a
container-server that does not respect the ``reverse`` param
included by default in ``_listing_pages_iter``. This may happen
during rolling upgrades from pre-2.6.0 swift.
:param failed_marker: the marker that was used when we encountered
the non-reversed listing
:param failed_listing: the non-reversed listing that was encountered.
If ``failed_marker`` is blank, we can use this
to save ourselves a request
:returns: an iterator over all objects starting with ``lprefix`` (up
to but not including the failed marker) in reverse order
'''
complete_listing = []
if not failed_marker:
# We've never gotten a reversed listing. So save a request and
# use the failed listing.
complete_listing.extend(failed_listing)
marker = complete_listing[-1]['name'].encode('utf8')
else:
# We've gotten at least one reversed listing. Have to start at
# the beginning.
marker = ''
# First, take the *entire* prefix listing into memory
try:
for page in self._listing_pages_iter(
account_name, lcontainer, lprefix,
env, marker, end_marker=failed_marker, reverse=False):
complete_listing.extend(page)
except ListingIterNotFound:
pass
# Now that we've got everything, return the whole listing as one giant
# reversed page
return reversed(complete_listing)
def _listing_pages_iter(self, account_name, lcontainer, lprefix,
env, marker='', end_marker='', reverse=True):
'''Get "pages" worth of objects that start with a prefix.
The optional keyword arguments ``marker``, ``end_marker``, and
``reverse`` are used similar to how they are for containers. We're
either coming:
- directly from ``_listing_iter``, in which case none of the
optional args are specified, or
- from ``_in_proxy_reverse_listing``, in which case ``reverse``
is ``False`` and both ``marker`` and ``end_marker`` are specified
(although they may still be blank).
'''
while True:
lreq = make_pre_authed_request(
env, method='GET', swift_source='VW',
path='/v1/%s/%s' % (account_name, lcontainer))
lreq.environ['QUERY_STRING'] = \
'format=json&prefix=%s&reverse=on&marker=%s' % (
'format=json&prefix=%s&marker=%s' % (
quote(lprefix), quote(marker))
if end_marker:
lreq.environ['QUERY_STRING'] += '&end_marker=%s' % (
quote(end_marker))
if reverse:
lreq.environ['QUERY_STRING'] += '&reverse=on'
lresp = lreq.get_response(self.app)
if not is_success(lresp.status_int):
if lresp.status_int == HTTP_NOT_FOUND:
@ -179,7 +238,20 @@ class VersionedWritesContext(WSGIContext):
sublisting = json.loads(lresp.body)
if not sublisting:
break
marker = sublisting[-1]['name'].encode('utf-8')
# When using the ``reverse`` param, check that the listing is
# actually reversed
first_item = sublisting[0]['name'].encode('utf-8')
last_item = sublisting[-1]['name'].encode('utf-8')
page_is_after_marker = marker and first_item > marker
if reverse and (first_item < last_item or page_is_after_marker):
# Apparently there's at least one pre-2.6.0 container server
yield self._in_proxy_reverse_listing(
account_name, lcontainer, lprefix,
env, marker, sublisting)
return
marker = last_item
yield sublisting
def handle_obj_versions_put(self, req, object_versions,

View File

@ -14,6 +14,7 @@
# limitations under the License.
import functools
import json
import os
import time
import unittest
@ -54,7 +55,7 @@ def local_tz(func):
return wrapper
class VersionedWritesTestCase(unittest.TestCase):
class VersionedWritesBaseTestCase(unittest.TestCase):
def setUp(self):
self.app = FakeSwift()
conf = {'allow_versioned_writes': 'true'}
@ -105,6 +106,8 @@ class VersionedWritesTestCase(unittest.TestCase):
self.assertEqual(req.method, other.method)
self.assertEqual(req.path, other.path)
class VersionedWritesTestCase(VersionedWritesBaseTestCase):
def test_put_container(self):
self.app.register('PUT', '/v1/a/c', swob.HTTPOk, {}, 'passed')
req = Request.blank('/v1/a/c',
@ -452,7 +455,7 @@ class VersionedWritesTestCase(unittest.TestCase):
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET',
'/v1/a/ver_cont?format=json&prefix=001o/&reverse=on&marker=',
'/v1/a/ver_cont?format=json&prefix=001o/&marker=&reverse=on',
swob.HTTPNotFound, {}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
@ -465,12 +468,18 @@ class VersionedWritesTestCase(unittest.TestCase):
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('DELETE', '/v1/a/c/o'),
])
def test_delete_latest_version_success(self):
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET',
'/v1/a/ver_cont?format=json&prefix=001o/&reverse=on&marker=',
'/v1/a/ver_cont?format=json&prefix=001o/&marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
@ -501,17 +510,58 @@ class VersionedWritesTestCase(unittest.TestCase):
self.assertRequestEqual(req, self.authorized[0])
# check that X-If-Delete-At was removed from DELETE request
calls = self.app.calls_with_headers
method, path, req_headers = calls.pop()
self.assertEqual('DELETE', method)
self.assertTrue(path.startswith('/v1/a/ver_cont/001o/2'))
self.assertFalse('x-if-delete-at' in req_headers or
'X-If-Delete-At' in req_headers)
req_headers = self.app.headers[-1]
self.assertNotIn('x-if-delete-at', [h.lower() for h in req_headers])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('COPY', '/v1/a/ver_cont/001o/2'),
('DELETE', '/v1/a/ver_cont/001o/2'),
])
def test_delete_single_version_success(self):
# check that if the first listing page has just a single item then
# it is not erroneously inferred to be a non-reversed listing
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET',
'/v1/a/ver_cont?format=json&prefix=001o/&marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}]')
self.app.register(
'COPY', '/v1/a/ver_cont/001o/1', swob.HTTPCreated,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/1', swob.HTTPOk,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('COPY', '/v1/a/ver_cont/001o/1'),
('DELETE', '/v1/a/ver_cont/001o/1'),
])
def test_DELETE_on_expired_versioned_object(self):
self.app.register(
'GET',
'/v1/a/ver_cont?format=json&prefix=001o/&reverse=on&marker=',
'/v1/a/ver_cont?format=json&prefix=001o/&marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
@ -545,13 +595,21 @@ class VersionedWritesTestCase(unittest.TestCase):
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('COPY', '/v1/a/ver_cont/001o/2'),
('COPY', '/v1/a/ver_cont/001o/1'),
('DELETE', '/v1/a/ver_cont/001o/1'),
])
def test_denied_DELETE_of_versioned_object(self):
authorize_call = []
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET',
'/v1/a/ver_cont?format=json&prefix=001o/&reverse=on&marker=',
'/v1/a/ver_cont?format=json&prefix=001o/&marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
@ -581,3 +639,297 @@ class VersionedWritesTestCase(unittest.TestCase):
self.assertEqual(status, '403 Forbidden')
self.assertEqual(len(authorize_call), 1)
self.assertRequestEqual(req, authorize_call[0])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
])
class VersionedWritesOldContainersTestCase(VersionedWritesBaseTestCase):
def test_delete_latest_version_success(self):
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "x", '
'"last_modified": "2014-11-21T14:14:27.409100", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}, '
'{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/2", '
'"content_type": "text/plain"}]')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
'&marker=001o/2',
swob.HTTPNotFound, {}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPCreated,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/2', swob.HTTPOk,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
headers={'X-If-Delete-At': 1},
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
# check that X-If-Delete-At was removed from DELETE request
req_headers = self.app.headers[-1]
self.assertNotIn('x-if-delete-at', [h.lower() for h in req_headers])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('GET', prefix_listing_prefix + 'marker=001o/2'),
('COPY', '/v1/a/ver_cont/001o/2'),
('DELETE', '/v1/a/ver_cont/001o/2'),
])
def test_DELETE_on_expired_versioned_object(self):
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "x", '
'"last_modified": "2014-11-21T14:14:27.409100", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}, '
'{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/2", '
'"content_type": "text/plain"}]')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
'&marker=001o/2',
swob.HTTPNotFound, {}, None)
# expired object
self.app.register(
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPNotFound,
{}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/1', swob.HTTPCreated,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/1', swob.HTTPOk,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '200 OK')
self.assertEqual(len(self.authorized), 1)
self.assertRequestEqual(req, self.authorized[0])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('GET', prefix_listing_prefix + 'marker=001o/2'),
('COPY', '/v1/a/ver_cont/001o/2'),
('COPY', '/v1/a/ver_cont/001o/1'),
('DELETE', '/v1/a/ver_cont/001o/1'),
])
def test_denied_DELETE_of_versioned_object(self):
authorize_call = []
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPOk, {}, 'passed')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&reverse=on',
swob.HTTPOk, {},
'[{"hash": "x", '
'"last_modified": "2014-11-21T14:14:27.409100", '
'"bytes": 3, '
'"name": "001o/1", '
'"content_type": "text/plain"}, '
'{"hash": "y", '
'"last_modified": "2014-11-21T14:23:02.206740", '
'"bytes": 3, '
'"name": "001o/2", '
'"content_type": "text/plain"}]')
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/'
'&marker=001o/2',
swob.HTTPNotFound, {}, None)
self.app.register(
'DELETE', '/v1/a/c/o', swob.HTTPForbidden,
{}, None)
def fake_authorize(req):
authorize_call.append(req)
return swob.HTTPForbidden()
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'swift.authorize': fake_authorize,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '403 Forbidden')
self.assertEqual(len(authorize_call), 1)
self.assertRequestEqual(req, authorize_call[0])
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('GET', prefix_listing_prefix + 'marker=001o/2'),
])
def test_partially_upgraded_cluster(self):
old_versions = [
{'hash': 'etag%d' % x,
'last_modified': "2014-11-21T14:14:%02d.409100" % x,
'bytes': 3,
'name': '001o/%d' % x,
'content_type': 'text/plain'}
for x in range(5)]
# first container server can reverse
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&reverse=on',
swob.HTTPOk, {}, json.dumps(list(reversed(old_versions[2:]))))
# but all objects are already gone
self.app.register(
'COPY', '/v1/a/ver_cont/001o/4', swob.HTTPNotFound,
{}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/3', swob.HTTPNotFound,
{}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPNotFound,
{}, None)
# second container server can't reverse
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=001o/2&reverse=on',
swob.HTTPOk, {}, json.dumps(old_versions[3:]))
# subsequent requests shouldn't reverse
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&end_marker=001o/2',
swob.HTTPOk, {}, json.dumps(old_versions[:1]))
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=001o/0&end_marker=001o/2',
swob.HTTPOk, {}, json.dumps(old_versions[1:2]))
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=001o/1&end_marker=001o/2',
swob.HTTPOk, {}, '[]')
self.app.register(
'COPY', '/v1/a/ver_cont/001o/1', swob.HTTPOk,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/1', swob.HTTPNoContent,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '204 No Content')
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('COPY', '/v1/a/ver_cont/001o/4'),
('COPY', '/v1/a/ver_cont/001o/3'),
('COPY', '/v1/a/ver_cont/001o/2'),
('GET', prefix_listing_prefix + 'marker=001o/2&reverse=on'),
('GET', prefix_listing_prefix + 'marker=&end_marker=001o/2'),
('GET', prefix_listing_prefix + 'marker=001o/0&end_marker=001o/2'),
('GET', prefix_listing_prefix + 'marker=001o/1&end_marker=001o/2'),
('COPY', '/v1/a/ver_cont/001o/1'),
('DELETE', '/v1/a/ver_cont/001o/1'),
])
def test_partially_upgraded_cluster_single_result_on_second_page(self):
old_versions = [
{'hash': 'etag%d' % x,
'last_modified': "2014-11-21T14:14:%02d.409100" % x,
'bytes': 3,
'name': '001o/%d' % x,
'content_type': 'text/plain'}
for x in range(5)]
# first container server can reverse
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&reverse=on',
swob.HTTPOk, {}, json.dumps(list(reversed(old_versions[-2:]))))
# but both objects are already gone
self.app.register(
'COPY', '/v1/a/ver_cont/001o/4', swob.HTTPNotFound,
{}, None)
self.app.register(
'COPY', '/v1/a/ver_cont/001o/3', swob.HTTPNotFound,
{}, None)
# second container server can't reverse
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=001o/3&reverse=on',
swob.HTTPOk, {}, json.dumps(old_versions[4:]))
# subsequent requests shouldn't reverse
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=&end_marker=001o/3',
swob.HTTPOk, {}, json.dumps(old_versions[:2]))
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=001o/1&end_marker=001o/3',
swob.HTTPOk, {}, json.dumps(old_versions[2:3]))
self.app.register(
'GET', '/v1/a/ver_cont?format=json&prefix=001o/&'
'marker=001o/2&end_marker=001o/3',
swob.HTTPOk, {}, '[]')
self.app.register(
'COPY', '/v1/a/ver_cont/001o/2', swob.HTTPOk,
{}, None)
self.app.register(
'DELETE', '/v1/a/ver_cont/001o/2', swob.HTTPNoContent,
{}, None)
cache = FakeCache({'sysmeta': {'versions-location': 'ver_cont'}})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'DELETE', 'swift.cache': cache,
'CONTENT_LENGTH': '0'})
status, headers, body = self.call_vw(req)
self.assertEqual(status, '204 No Content')
prefix_listing_prefix = '/v1/a/ver_cont?format=json&prefix=001o/&'
self.assertEqual(self.app.calls, [
('GET', prefix_listing_prefix + 'marker=&reverse=on'),
('COPY', '/v1/a/ver_cont/001o/4'),
('COPY', '/v1/a/ver_cont/001o/3'),
('GET', prefix_listing_prefix + 'marker=001o/3&reverse=on'),
('GET', prefix_listing_prefix + 'marker=&end_marker=001o/3'),
('GET', prefix_listing_prefix + 'marker=001o/1&end_marker=001o/3'),
('GET', prefix_listing_prefix + 'marker=001o/2&end_marker=001o/3'),
('COPY', '/v1/a/ver_cont/001o/2'),
('DELETE', '/v1/a/ver_cont/001o/2'),
])