Fix KeyError on Storage Policy Container Sync

In the proxy, container_info can return a 'storage_policy' of None.  When
you set a header value on a swob.Request to None that effectively just
delete's the key.  One path through the proxy during container sync was
counting on the the 'X-Backend-Storage-Policy-Index' being set which isn't
the case if the cached container_info if for a pre-policies container.

Also clean up some test cruft, tighten up the interface on FakeConn, and add
some object controller tests to exercise more interesting failure and handoff
code paths.

Change-Id: I643e08c208efdc5a39e840d9e96710ddad54236b
This commit is contained in:
Clay Gerrard 2014-06-25 11:38:54 -07:00 committed by John Dickinson
parent 1feaf6e289
commit fb5a0f21dd
4 changed files with 302 additions and 96 deletions

View File

@ -500,7 +500,7 @@ class ObjectController(Controller):
(object_versions and not
req.environ.get('swift_versioned_copy')):
# make sure proxy-server uses the right policy index
_headers = {POLICY_INDEX: req.headers[POLICY_INDEX],
_headers = {'X-Backend-Storage-Policy-Index': policy_index,
'X-Newest': 'True'}
hreq = Request.blank(req.path_info, headers=_headers,
environ={'REQUEST_METHOD': 'HEAD'})

View File

@ -604,12 +604,27 @@ def fake_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status, etag=None, body='', timestamp='1',
expect_status=None, headers=None):
self.status = status
if expect_status is None:
self.expect_status = self.status
headers=None):
# connect exception
if isinstance(status, Exception):
raise status
if isinstance(status, tuple):
self.expect_status, self.status = status
else:
self.expect_status = expect_status
self.expect_status, self.status = (None, status)
if not self.expect_status:
# when a swift backend service returns a status before reading
# from the body (mostly an error response) eventlet.wsgi will
# respond with that status line immediately instead of 100
# Continue, even if the client sent the Expect 100 header.
# BufferedHttp and the proxy both see these error statuses
# when they call getexpect, so our FakeConn tries to act like
# our backend services and return certain types of responses
# as expect statuses just like a real backend server would do.
if self.status in (507, 412, 409):
self.expect_status = status
else:
self.expect_status = 100
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
@ -626,6 +641,8 @@ def fake_http_connect(*code_iter, **kwargs):
self._next_sleep = None
def getresponse(self):
if isinstance(self.status, Exception):
raise self.status
exc = kwargs.get('raise_exc')
if exc:
if isinstance(exc, Exception):
@ -636,15 +653,9 @@ def fake_http_connect(*code_iter, **kwargs):
return self
def getexpect(self):
if self.expect_status == -2:
raise HTTPException()
if self.expect_status == -3:
return FakeConn(507)
if self.expect_status == -4:
return FakeConn(201)
if self.expect_status == 412:
return FakeConn(412)
return FakeConn(100)
if isinstance(self.expect_status, Exception):
raise self.expect_status
return FakeConn(self.expect_status)
def getheaders(self):
etag = self.etag
@ -737,10 +748,6 @@ def fake_http_connect(*code_iter, **kwargs):
if 'give_connect' in kwargs:
kwargs['give_connect'](*args, **ckwargs)
status = code_iter.next()
if isinstance(status, tuple):
status, expect_status = status
else:
expect_status = status
etag = etag_iter.next()
headers = headers_iter.next()
timestamp = timestamps_iter.next()
@ -752,7 +759,7 @@ def fake_http_connect(*code_iter, **kwargs):
else:
body = body_iter.next()
return FakeConn(status, etag, body=body, timestamp=timestamp,
expect_status=expect_status, headers=headers)
headers=headers)
connect.code_iter = code_iter

View File

@ -14,33 +14,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import time
import unittest
from contextlib import contextmanager
import mock
import swift
from swift.common import utils, swob
from swift.proxy import server as proxy_server
from swift.common.swob import HTTPException
from test.unit import FakeRing, FakeMemcache, fake_http_connect, debug_logger
from swift.common.storage_policy import StoragePolicy
from swift.common.storage_policy import StoragePolicy, POLICIES
from test.unit import patch_policies
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
debug_logger, patch_policies
@contextmanager
def set_http_connect(*args, **kwargs):
old_connect = swift.proxy.controllers.base.http_connect
new_connect = fake_http_connect(*args, **kwargs)
swift.proxy.controllers.base.http_connect = new_connect
swift.proxy.controllers.obj.http_connect = new_connect
swift.proxy.controllers.account.http_connect = new_connect
swift.proxy.controllers.container.http_connect = new_connect
yield new_connect
swift.proxy.controllers.base.http_connect = old_connect
swift.proxy.controllers.obj.http_connect = old_connect
swift.proxy.controllers.account.http_connect = old_connect
swift.proxy.controllers.container.http_connect = old_connect
try:
swift.proxy.controllers.base.http_connect = new_connect
swift.proxy.controllers.obj.http_connect = new_connect
swift.proxy.controllers.account.http_connect = new_connect
swift.proxy.controllers.container.http_connect = new_connect
yield new_connect
left_over_status = list(new_connect.code_iter)
if left_over_status:
raise AssertionError('left over status %r' % left_over_status)
finally:
swift.proxy.controllers.base.http_connect = old_connect
swift.proxy.controllers.obj.http_connect = old_connect
swift.proxy.controllers.account.http_connect = old_connect
swift.proxy.controllers.container.http_connect = old_connect
class PatchedObjControllerApp(proxy_server.Application):
object_controller = proxy_server.ObjectController
def handle_request(self, req):
with mock.patch('swift.proxy.server.ObjectController',
new=self.object_controller):
return super(PatchedObjControllerApp, self).handle_request(req)
@patch_policies([StoragePolicy(0, 'zero', True,
@ -90,22 +107,33 @@ class TestObjControllerWriteAffinity(unittest.TestCase):
def test_connect_put_node_timeout(self):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
self.app.conn_timeout = 0.05
with set_http_connect(200, slow_connect=True):
with set_http_connect(slow_connect=True):
nodes = [dict(ip='', port='', device='')]
res = controller._connect_put_node(nodes, '', '', {}, ('', ''))
self.assertTrue(res is None)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
@patch_policies([
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one'),
StoragePolicy(2, 'two'),
])
class TestObjController(unittest.TestCase):
def setUp(self):
# setup fake rings with handoffs
self.obj_ring = FakeRing(max_more_nodes=3)
for policy in POLICIES:
policy.object_ring = self.obj_ring
logger = debug_logger('proxy-server')
logger.thread_locals = ('txn1', '127.0.0.2')
self.app = proxy_server.Application(
self.app = PatchedObjControllerApp(
None, FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), logger=logger)
self.controller = proxy_server.ObjectController(self.app,
'a', 'c', 'o')
class FakeContainerInfoObjController(proxy_server.ObjectController):
pass
self.controller = FakeContainerInfoObjController
self.controller.container_info = mock.MagicMock(return_value={
'partition': 1,
'nodes': [
@ -118,101 +146,250 @@ class TestObjController(unittest.TestCase):
'storage_policy': None,
'sync_key': None,
'versions': None})
self.app.object_controller = self.controller
def test_PUT_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
def test_PUT_if_none_match(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['if-none-match'] = '*'
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
def test_PUT_if_none_match_denied(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['if-none-match'] = '*'
req.headers['content-length'] = '0'
with set_http_connect(201, (412, 412), 201):
resp = self.controller.PUT(req)
with set_http_connect(201, 412, 201):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 412)
def test_PUT_if_none_match_not_star(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['if-none-match'] = 'somethingelse'
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
with set_http_connect():
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 400)
def test_GET_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200):
resp = self.controller.GET(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_DELETE_simple(self):
def test_GET_error(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(503, 200):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_GET_handoff(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
codes = [503] * self.obj_ring.replicas + [200]
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_GET_not_found(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
codes = [404] * (self.obj_ring.replicas +
self.obj_ring.max_more_nodes)
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 404)
def test_DELETE_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
with set_http_connect(204, 204, 204):
resp = self.controller.DELETE(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 204)
def test_DELETE_missing_one(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
with set_http_connect(404, 204, 204):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 204)
def test_DELETE_not_found(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
with set_http_connect(404, 404, 204):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 404)
def test_DELETE_handoff(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
codes = [204] * self.obj_ring.replicas
with set_http_connect(507, *codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 204)
def test_POST_as_COPY_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn:
resp = self.controller.POST(req)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
req = swift.common.swob.Request.blank('/v1/a/c/o', method='POST')
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 202)
def test_container_sync_put_x_timestamp_not_found(self):
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
self.controller.container_info.return_value['storage_policy'] = \
policy_index
put_timestamp = utils.Timestamp(time.time()).normal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
ts_iter = itertools.repeat(put_timestamp)
head_resp = [404] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_container_sync_put_x_timestamp_match(self):
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
self.controller.container_info.return_value['storage_policy'] = \
policy_index
put_timestamp = utils.Timestamp(time.time()).normal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
ts_iter = itertools.repeat(put_timestamp)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
codes = head_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_container_sync_put_x_timestamp_older(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
self.controller.container_info.return_value['storage_policy'] = \
policy_index
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
ts_iter = itertools.repeat(ts.next().internal)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
codes = head_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_container_sync_put_x_timestamp_newer(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
orig_timestamp = ts.next().internal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
ts_iter = itertools.repeat(orig_timestamp)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_container_sync_delete(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
req = swob.Request.blank(
'/v1/a/c/o', method='DELETE', headers={
'X-Timestamp': ts.next().internal})
codes = [409] * self.obj_ring.replicas
ts_iter = itertools.repeat(ts.next().internal)
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 409)
def test_put_x_timestamp_conflict(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
head_resp = [404] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [409] + [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_COPY_simple(self):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', headers={'Content-Length': 0,
'Destination': 'c/o-copy'})
with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn:
resp = self.controller.COPY(req)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
'/v1/a/c/o', method='COPY',
headers={'Content-Length': 0,
'Destination': 'c/o-copy'})
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
def test_HEAD_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD')
with set_http_connect(200):
resp = self.controller.HEAD(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_HEAD_x_newest(self):
req = swift.common.swob.Request.blank('/v1/a/c/o',
req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD',
headers={'X-Newest': 'true'})
with set_http_connect(200, 200, 200) as fake_conn:
resp = self.controller.HEAD(req)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
with set_http_connect(200, 200, 200):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_PUT_log_info(self):
# mock out enough to get to the area of the code we want to test
with mock.patch('swift.proxy.controllers.obj.check_object_creation',
mock.MagicMock(return_value=None)):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['x-copy-from'] = 'somewhere'
try:
self.controller.PUT(req)
except HTTPException:
pass
self.assertEquals(
req.environ.get('swift.log_info'), ['x-copy-from:somewhere'])
# and then check that we don't do that for originating POSTs
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.method = 'POST'
req.headers['x-copy-from'] = 'elsewhere'
try:
self.controller.PUT(req)
except HTTPException:
pass
self.assertEquals(req.environ.get('swift.log_info'), None)
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['x-copy-from'] = 'some/where'
req.headers['Content-Length'] = 0
# override FakeConn default resp headers to keep log_info clean
resp_headers = {'x-delete-at': None}
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
self.assertEquals(
req.environ.get('swift.log_info'), ['x-copy-from:some/where'])
# and then check that we don't do that for originating POSTs
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.method = 'POST'
req.headers['x-copy-from'] = 'else/where'
with set_http_connect(*codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
self.assertEquals(req.environ.get('swift.log_info'), None)
if __name__ == '__main__':

View File

@ -34,7 +34,7 @@ import re
import random
import mock
from eventlet import sleep, spawn, wsgi, listen
from eventlet import sleep, spawn, wsgi, listen, Timeout
from swift.common.utils import json
from test.unit import (
@ -1138,11 +1138,20 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
# The (201, -4) tuples in there have the effect of letting the
# initial connect succeed, after which getexpect() gets called and
# then the -4 makes the response of that actually be 201 instead of
# 100. Perfectly straightforward.
set_http_connect(200, 200, (201, -4), (201, -4), (201, -4),
# The (201, Exception('test')) tuples in there have the effect of
# changing the status of the initial expect response. The default
# expect response from FakeConn for 201 is 100.
# But the object server won't send a 100 continue line if the
# client doesn't send a expect 100 header (as is the case with
# zero byte PUTs as validated by this test), nevertheless the
# object controller calls getexpect without prejudice. In this
# case the status from the response shows up early in getexpect
# instead of having to wait until getresponse. The Exception is
# in there to ensure that the object controller also *uses* the
# result of getexpect instead of calling getresponse in which case
# our FakeConn will blow up.
success_codes = [(201, Exception('test'))] * 3
set_http_connect(200, 200, *success_codes,
give_connect=test_connect)
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0
@ -1165,7 +1174,12 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = \
proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
set_http_connect(200, 200, 201, 201, 201,
# the (100, 201) tuples in there are just being extra explicit
# about the FakeConn returning the 100 Continue status when the
# object controller calls getexpect. Which is FakeConn's default
# for 201 if no expect_status is specified.
success_codes = [(100, 201)] * 3
set_http_connect(200, 200, *success_codes,
give_connect=test_connect)
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 1
@ -1585,9 +1599,17 @@ class TestObjectController(unittest.TestCase):
res = controller.PUT(req)
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201)
test_status_map((200, 200, 201, 201, -2), 201) # expect timeout
test_status_map((200, 200, 201, 201, -3), 201) # error limited
test_status_map((200, 200, 201, 201, -1), 201) # connect exc
# connect errors
test_status_map((200, 200, 201, 201, Timeout()), 201)
test_status_map((200, 200, 201, 201, Exception()), 201)
# expect errors
test_status_map((200, 200, 201, 201, (Timeout(), None)), 201)
test_status_map((200, 200, 201, 201, (Exception(), None)), 201)
# response errors
test_status_map((200, 200, 201, 201, (100, Timeout())), 201)
test_status_map((200, 200, 201, 201, (100, Exception())), 201)
test_status_map((200, 200, 201, 201, 507), 201) # error limited
test_status_map((200, 200, 201, -1, -1), 503)
test_status_map((200, 200, 503, 503, -1), 503)