diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index d2659cfe94..289b54817e 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -245,6 +245,13 @@ class MemcacheRing(object):
     def memcache_servers(self):
         return list(self._client_cache.keys())
 
+    def _log_error(self, server, cmd, action, msg):
+        self.logger.error(
+            "Error %(action)s to memcached: %(server)s"
+            ": with key_prefix %(key_prefix)s, method %(method)s: %(msg)s",
+            {'action': action, 'server': server, 'key_prefix': cmd.key_prefix,
+             'method': cmd.method, 'msg': msg})
+
     """
     Handles exceptions.
@@ -367,7 +374,8 @@ class MemcacheRing(object):
                 self._exception_occurred(server, e, cmd, pool_start_time,
                                          action='connecting', sock=sock)
         if not any_yielded:
-            self.logger.error('All memcached servers error-limited')
+            self._log_error('ALL', cmd, 'connecting',
+                            'All memcached servers error-limited')
 
     def _return_conn(self, server, fp, sock):
         """Returns a server connection to the pool."""
@@ -404,6 +412,14 @@ class MemcacheRing(object):
         elif not isinstance(value, bytes):
             value = str(value).encode('utf-8')
 
+        if 0 <= self.item_size_warning_threshold <= len(value):
+            self.logger.warning(
+                "Item size larger than warning threshold: "
+                "%d (%s) >= %d (%s)", len(value),
+                human_readable(len(value)),
+                self.item_size_warning_threshold,
+                human_readable(self.item_size_warning_threshold))
+
         for (server, fp, sock) in self._get_conns(cmd):
             conn_start_time = tm.time()
             try:
@@ -414,17 +430,7 @@ class MemcacheRing(object):
                 if msg != b'STORED':
                     if not six.PY2:
                         msg = msg.decode('ascii')
-                    self.logger.error(
-                        "Error setting value in memcached: "
-                        "%(server)s: %(msg)s",
-                        {'server': server, 'msg': msg})
-                    if 0 <= self.item_size_warning_threshold <= len(value):
-                        self.logger.warning(
-                            "Item size larger than warning threshold: "
-                            "%d (%s) >= %d (%s)", len(value),
-                            human_readable(len(value)),
-                            self.item_size_warning_threshold,
-                            human_readable(self.item_size_warning_threshold))
+                    raise MemcacheConnectionError('failed set: %s' % msg)
                 self._return_conn(server, fp, sock)
                 return
             except (Exception, Timeout) as e:
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index cd27682504..4ba398a51d 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -345,6 +345,25 @@ class BaseObjectController(Controller):
         # there will be only one shard range in the list if any
         return shard_ranges[0] if shard_ranges else None
 
+    def _cache_update_namespaces(self, memcache, cache_key, namespaces):
+        if not memcache:
+            return
+
+        self.logger.info(
+            'Caching updating shards for %s (%d shards)',
+            cache_key, len(namespaces.bounds))
+        try:
+            memcache.set(
+                cache_key, namespaces.bounds,
+                time=self.app.recheck_updating_shard_ranges,
+                raise_on_error=True)
+            cache_state = 'set'
+        except MemcacheConnectionError:
+            cache_state = 'set_error'
+        finally:
+            record_cache_op_metrics(self.logger, self.server_type.lower(),
+                                    'shard_updating', cache_state, None)
+
     def _get_update_shard(self, req, account, container, obj):
         """
         Find the appropriate shard range for an object update.
@@ -386,16 +405,9 @@ class BaseObjectController(Controller):
         if shard_ranges:
             # only store the list of namespace lower bounds and names into
             # infocache and memcache.
-            cached_namespaces = NamespaceBoundList.parse(
-                shard_ranges)
-            infocache[cache_key] = cached_namespaces
-            if memcache:
-                self.logger.info(
-                    'Caching updating shards for %s (%d shards)',
-                    cache_key, len(cached_namespaces.bounds))
-                memcache.set(
-                    cache_key, cached_namespaces.bounds,
-                    time=self.app.recheck_updating_shard_ranges)
+            namespaces = NamespaceBoundList.parse(shard_ranges)
+            infocache[cache_key] = namespaces
+            self._cache_update_namespaces(memcache, cache_key, namespaces)
         update_shard = find_namespace(obj, shard_ranges or [])
         record_cache_op_metrics(
             self.logger, self.server_type.lower(), 'shard_updating',
diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py
index ab3ecd6a79..d293275b1e 100644
--- a/test/unit/common/test_memcached.py
+++ b/test/unit/common/test_memcached.py
@@ -410,16 +410,50 @@ class TestMemcached(unittest.TestCase):
         self.assertAlmostEqual(float(cache_timeout), esttimeout, delta=1)
 
     def test_set_error(self):
-        memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
-                                                 logger=self.logger)
+        memcache_client = memcached.MemcacheRing(
+            ['1.2.3.4:11211'], logger=self.logger,
+            item_size_warning_threshold=1)
         mock = MockMemcached()
         memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
             [(mock, mock)] * 2)
-        memcache_client.set('too-big', [1, 2, 3])
+        now = time.time()
+        with patch('time.time', return_value=now):
+            memcache_client.set('too-big', [1, 2, 3])
         self.assertEqual(
             self.logger.get_lines_for_level('error'),
-            ['Error setting value in memcached: 1.2.3.4:11211: '
-             'SERVER_ERROR object too large for cache'])
+            ['Error talking to memcached: 1.2.3.4:11211: '
+             'with key_prefix too-big, method set, time_spent 0.0, '
+             'failed set: SERVER_ERROR object too large for cache'])
+        warning_lines = self.logger.get_lines_for_level('warning')
+        self.assertEqual(1, len(warning_lines))
+        self.assertIn('Item size larger than warning threshold',
+                      warning_lines[0])
+        self.assertTrue(mock.close_called)
+
+    def test_set_error_raise_on_error(self):
+        memcache_client = memcached.MemcacheRing(
+            ['1.2.3.4:11211'], logger=self.logger,
+            item_size_warning_threshold=1)
+        mock = MockMemcached()
+        memcache_client._client_cache[
+            '1.2.3.4:11211'] = MockedMemcachePool(
+            [(mock, mock)] * 2)
+        now = time.time()
+
+        with self.assertRaises(MemcacheConnectionError) as cm:
+            with patch('time.time', return_value=now):
+                memcache_client.set('too-big', [1, 2, 3], raise_on_error=True)
+        self.assertIn("No memcached connections succeeded", str(cm.exception))
+        self.assertEqual(
+            self.logger.get_lines_for_level('error'),
+            ['Error talking to memcached: 1.2.3.4:11211: '
+             'with key_prefix too-big, method set, time_spent 0.0, '
+             'failed set: SERVER_ERROR object too large for cache'])
+        warning_lines = self.logger.get_lines_for_level('warning')
+        self.assertEqual(1, len(warning_lines))
+        self.assertIn('Item size larger than warning threshold',
+                      warning_lines[0])
+        self.assertTrue(mock.close_called)
 
     def test_get_failed_connection_mid_request(self):
         memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
@@ -620,21 +654,24 @@ class TestMemcached(unittest.TestCase):
            'with key_prefix some_key, method set, time_spent 0.0, '
            '[Errno 32] Broken pipe',
            'Error limiting server 1.2.3.4:11211',
-           'All memcached servers error-limited',
+           'Error connecting to memcached: ALL: with key_prefix some_key, '
+           'method set: All memcached servers error-limited',
        ])
        self.logger.clear()
 
        for _ in range(12):
            memcache_client.set('some_key', [1, 2, 3])
        self.assertEqual(self.logger.get_lines_for_level('error'), [
-           'All memcached servers error-limited',
+           'Error connecting to memcached: ALL: with key_prefix some_key, '
+           'method set: All memcached servers error-limited',
        ] * 12)
        self.logger.clear()
 
        # and get()s are all a "cache miss"
        self.assertIsNone(memcache_client.get('some_key'))
        self.assertEqual(self.logger.get_lines_for_level('error'), [
-           'All memcached servers error-limited',
+           'Error connecting to memcached: ALL: with key_prefix some_key, '
+           'method get: All memcached servers error-limited',
        ])
 
    def test_error_disabled(self):
@@ -752,7 +789,8 @@ class TestMemcached(unittest.TestCase):
            'with key_prefix some_key, method set, time_spent 0.0, '
            '[Errno 32] Broken pipe',
            'Error limiting server 1.2.3.5:11211',
-           'All memcached servers error-limited',
+           'Error connecting to memcached: ALL: with key_prefix some_key, '
+           'method set: All memcached servers error-limited',
        ])
 
        # with default error_limit_time of 60, one call per 6 secs, error limit
@@ -776,8 +814,8 @@
            'with key_prefix some_key, method set, time_spent 0.0, '
            '[Errno 32] Broken pipe',
            'Error limiting server 1.2.3.5:11211',
-           'All memcached servers error-limited',
-       ])
+           'Error connecting to memcached: ALL: with key_prefix some_key, '
+           'method set: All memcached servers error-limited'])
 
        # with error_limit_time of 70, one call per 6 secs, error_limit_count
        # of 11, 13th call triggers error limit
@@ -791,8 +829,8 @@
            'with key_prefix some_key, method set, time_spent 0.0, '
            '[Errno 32] Broken pipe',
            'Error limiting server 1.2.3.5:11211',
-           'All memcached servers error-limited',
-       ])
+           'Error connecting to memcached: ALL: with key_prefix some_key, '
+           'method set: All memcached servers error-limited'])
 
    def test_delete(self):
        memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index d0df1719df..abe5072ee8 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -4431,7 +4431,8 @@ class TestReplicatedObjectController(
                          'account.info.infocache.hit': 2,
                          'container.info.cache.miss.200': 1,
                          'container.info.infocache.hit': 1,
-                         'object.shard_updating.cache.miss.200': 1},
+                         'object.shard_updating.cache.miss.200': 1,
+                         'object.shard_updating.cache.set': 1},
                         stats)
        self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
        info_lines = self.logger.get_lines_for_level('info')
@@ -4795,7 +4796,8 @@
                          'object.shard_updating.cache.hit': 1,
                          'container.info.cache.hit': 1,
                          'account.info.cache.hit': 1,
-                         'object.shard_updating.cache.skip.200': 1},
+                         'object.shard_updating.cache.skip.200': 1,
+                         'object.shard_updating.cache.set': 1},
                         stats)
        # verify statsd prefix is not mutated
        self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
@@ -4865,7 +4867,99 @@
                'container.info.infocache.hit': 3,
                'object.shard_updating.cache.skip.200': 1,
                'object.shard_updating.cache.hit': 1,
-               'object.shard_updating.cache.error.200': 1})
+               'object.shard_updating.cache.error.200': 1,
+               'object.shard_updating.cache.set': 2
+               })
+
+        do_test('POST', 'sharding')
+        do_test('POST', 'sharded')
+        do_test('DELETE', 'sharding')
+        do_test('DELETE', 'sharded')
+        do_test('PUT', 'sharding')
+        do_test('PUT', 'sharded')
+
+    @patch_policies([
+        StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
+        StoragePolicy(1, 'one', object_ring=FakeRing()),
+    ])
+    def test_backend_headers_update_shard_container_cache_set_error(self):
+        # verify that backend container update is directed to the shard
+        # container despite memcache set error
+        # reset the router post patch_policies
+        self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
+        self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
+        self.app.recheck_updating_shard_ranges = 3600
+        self.app.container_updating_shard_ranges_skip_cache = 0.001
+
+        def do_test(method, sharding_state):
+            self.app.logger.clear()  # clean capture state
+            # simulate memcache error when setting updating namespaces;
+            # expect 4 memcache sets: account info, container info, container
+            # info again from namespaces GET subrequest, namespaces
+            cache = FakeMemcache(error_on_set=[False, False, False, True])
+            req = Request.blank(
+                '/v1/a/c/o', {'swift.cache': cache},
+                method=method, body='', headers={'Content-Type': 'text/plain'})
+            # acct HEAD, cont HEAD, cont shard GET, obj POSTs
+            status_codes = (200, 200, 200, 202, 202, 202)
+            resp_headers = {'X-Backend-Storage-Policy-Index': 1,
+                            'x-backend-sharding-state': sharding_state,
+                            'X-Backend-Record-Type': 'shard'}
+            shard_ranges = [
+                utils.ShardRange(
+                    '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
+                utils.ShardRange(
+                    '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
+                utils.ShardRange(
+                    '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
+            ]
+            body = json.dumps([
+                dict(shard_range)
+                for shard_range in shard_ranges]).encode('ascii')
+            with mock.patch('random.random', return_value=0), \
+                    mocked_http_conn(*status_codes, headers=resp_headers,
+                                     body=body) as fake_conn:
+                resp = req.get_response(self.app)
+
+            self.assertEqual(resp.status_int, 202)
+
+            stats = self.app.logger.statsd_client.get_increment_counts()
+            self.assertEqual({'account.info.cache.miss.200': 1,
+                              'account.info.infocache.hit': 2,
+                              'container.info.cache.miss.200': 1,
+                              'container.info.infocache.hit': 1,
+                              'object.shard_updating.cache.skip.200': 1,
+                              'object.shard_updating.cache.set_error': 1},
+                             stats)
+            # verify statsd prefix is not mutated
+            self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
+            # sanity check: namespaces not in cache
+            cache_key = 'shard-updating-v2/a/c'
+            self.assertNotIn(cache_key, req.environ['swift.cache'].store)
+
+            # make sure backend requests included expected container headers
+            container_headers = {}
+            for backend_request in fake_conn.requests[3:]:
+                req_headers = backend_request['headers']
+                device = req_headers['x-container-device']
+                container_headers[device] = req_headers['x-container-host']
+                expectations = {
+                    'method': method,
+                    'path': '/0/a/c/o',
+                    'headers': {
+                        'X-Container-Partition': '0',
+                        'Host': 'localhost:80',
+                        'Referer': '%s http://localhost/v1/a/c/o' % method,
+                        'X-Backend-Storage-Policy-Index': '1',
+                        'X-Backend-Quoted-Container-Path': shard_ranges[1].name
+                    },
+                }
+                self._check_request(backend_request, **expectations)
+
+            expected = {}
+            for i, device in enumerate(['sda', 'sdb', 'sdc']):
+                expected[device] = '10.0.0.%d:100%d' % (i, i)
+            self.assertEqual(container_headers, expected)
 
        do_test('POST', 'sharding')
        do_test('POST', 'sharded')
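
Reviewer note: the memcached.py change above makes a failed store visible to its caller. `set()` used to log "Error setting value in memcached" and return as if it had succeeded; it now raises `MemcacheConnectionError` internally, and a caller passing the new `raise_on_error=True` flag sees the failure instead of having it swallowed. A minimal sketch of the opt-in (the server address, key, and logger wiring are illustrative assumptions, not part of the patch):

```python
# Sketch: exercising the new raise_on_error flag on MemcacheRing.set().
# 203.0.113.1 is a TEST-NET address, so the connection attempt should fail.
import logging

from swift.common.memcached import MemcacheConnectionError, MemcacheRing

client = MemcacheRing(['203.0.113.1:11211'],
                      logger=logging.getLogger(__name__))

# Default behaviour (unchanged): the error is logged and swallowed.
client.set('some_key', [1, 2, 3])

# Opt-in behaviour: the caller is told the set failed and can react, which
# is what the new _cache_update_namespaces() helper in obj.py does above.
try:
    client.set('some_key', [1, 2, 3], raise_on_error=True)
except MemcacheConnectionError:
    pass  # e.g. record a 'set_error' cache-op metric and carry on uncached
```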
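For the proxy test, `FakeMemcache(error_on_set=[False, False, False, True])` scripts the fourth `set()` (the namespaces write) to fail while the three preceding info writes succeed. A hypothetical stand-in showing the semantics the test relies on — a sketch, not Swift's actual `FakeMemcache` helper:

```python
# Hypothetical cache double illustrating error_on_set: one boolean per
# anticipated set() call, where True means "fail that call".
from swift.common.memcached import MemcacheConnectionError


class FakeMemcacheSketch(object):
    def __init__(self, error_on_set=None):
        self.store = {}
        self.error_on_set = list(error_on_set or [])

    def get(self, key):
        return self.store.get(key)

    def set(self, key, value, serialize=True, time=0, raise_on_error=False):
        # consume the next scripted outcome; succeed once the script runs out
        fail = self.error_on_set.pop(0) if self.error_on_set else False
        if fail:
            if raise_on_error:
                raise MemcacheConnectionError('scripted set failure')
            return  # silently dropped, like the default set() behaviour
        self.store[key] = value


# Three good sets (account info, container info, container info again),
# then the namespaces set fails, so nothing lands under the shard cache key.
cache = FakeMemcacheSketch(error_on_set=[False, False, False, True])
for i in range(3):
    cache.set('info%d' % i, 'cached')
try:
    cache.set('shard-updating-v2/a/c', [['', '.shards_a/c_shard']],
              raise_on_error=True)
except MemcacheConnectionError:
    pass
assert 'shard-updating-v2/a/c' not in cache.store
```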