Only zero out stats at the start of a shard cycle
Otherwise, we lose track of sharding candidates and currently-sharding containers in recon when the sharding cycle time goes past an hour. Change-Id: I8c6721d9f03c0f738254db37478e2813976fdf1a
This commit is contained in:
parent
2bc60b649b
commit
74356fc3cc
|
@ -207,6 +207,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
conf.get('shard_batch_size', 2))
|
||||
self.reported = 0
|
||||
self.auto_shard = config_true_value(conf.get('auto_shard', False))
|
||||
self.sharding_candidates = []
|
||||
self.recon_candidates_limit = int(
|
||||
conf.get('recon_candidates_limit', 5))
|
||||
self.broker_timeout = config_positive_int_value(
|
||||
|
@ -237,6 +238,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
# all sharding stats that are additional to the inherited replicator
|
||||
# stats are maintained under the 'sharding' key in self.stats
|
||||
self.stats['sharding'] = defaultdict(lambda: defaultdict(int))
|
||||
self.sharding_candidates = []
|
||||
|
||||
def _append_stat(self, category, key, value):
|
||||
if not self.stats['sharding'][category][key]:
|
||||
|
@ -281,13 +283,12 @@ class ContainerSharder(ContainerReplicator):
|
|||
def _identify_sharding_candidate(self, broker, node):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
if self._is_sharding_candidate(own_shard_range):
|
||||
self._append_stat(
|
||||
'sharding_candidates', 'all',
|
||||
self.sharding_candidates.append(
|
||||
self._make_stats_info(broker, node, own_shard_range))
|
||||
|
||||
def _transform_sharding_candidate_stats(self):
|
||||
category = self.stats['sharding']['sharding_candidates']
|
||||
candidates = category.pop('all', [])
|
||||
candidates = self.sharding_candidates
|
||||
category['found'] = len(candidates)
|
||||
candidates.sort(key=lambda c: c['object_count'], reverse=True)
|
||||
if self.recon_candidates_limit >= 0:
|
||||
|
@ -316,6 +317,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
self._append_stat('sharding_in_progress', 'all', info)
|
||||
|
||||
def _report_stats(self):
|
||||
# report accumulated stats since start of one sharder cycle
|
||||
default_stats = ('attempted', 'success', 'failure')
|
||||
category_keys = (
|
||||
('visited', default_stats + ('skipped', 'completed')),
|
||||
|
@ -328,7 +330,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
)
|
||||
|
||||
now = time.time()
|
||||
last_report = time.ctime(self.reported)
|
||||
last_report = time.ctime(self.stats['start'])
|
||||
elapsed = now - self.stats['start']
|
||||
sharding_stats = self.stats['sharding']
|
||||
for category, keys in category_keys:
|
||||
|
@ -348,7 +350,6 @@ class ContainerSharder(ContainerReplicator):
|
|||
def _periodic_report_stats(self):
|
||||
if (time.time() - self.reported) >= 3600: # once an hour
|
||||
self._report_stats()
|
||||
self._zero_stats()
|
||||
|
||||
def _check_node(self, node, devices_to_shard):
|
||||
if not node:
|
||||
|
|
|
@ -29,6 +29,8 @@ from collections import defaultdict
|
|||
|
||||
import time
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
from swift.common import internal_client
|
||||
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
|
||||
SHARDED
|
||||
|
@ -265,12 +267,14 @@ class TestSharder(BaseTestSharder):
|
|||
with self._mock_sharder(conf) as sharder:
|
||||
sharder._check_node = lambda *args: True
|
||||
sharder.logger.clear()
|
||||
brokers = []
|
||||
for container in ('c1', 'c2'):
|
||||
broker = self._make_broker(
|
||||
container=container, hash_=container + 'hash',
|
||||
device=sharder.ring.devs[0]['device'], part=0)
|
||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||
('true', next(self.ts_iter).internal)})
|
||||
brokers.append(broker)
|
||||
|
||||
fake_stats = {
|
||||
'scanned': {'attempted': 1, 'success': 1, 'failure': 0,
|
||||
|
@ -282,19 +286,20 @@ class TestSharder(BaseTestSharder):
|
|||
'found': 1, 'placed': 1, 'unplaced': 0},
|
||||
'audit_root': {'attempted': 5, 'success': 4, 'failure': 1},
|
||||
'audit_shard': {'attempted': 2, 'success': 2, 'failure': 0},
|
||||
'sharding_candidates': {'found': 0, 'top': []}
|
||||
}
|
||||
# NB these are time increments not absolute times...
|
||||
fake_periods = [1, 2, 3, 3600, 4, 15, 15, 0]
|
||||
fake_periods_iter = iter(fake_periods)
|
||||
recon_data = []
|
||||
fake_process_broker_calls = []
|
||||
|
||||
def mock_dump_recon_cache(data, *args):
|
||||
recon_data.append(data)
|
||||
recon_data.append(deepcopy(data))
|
||||
|
||||
with mock.patch('swift.container.sharder.time.time') as fake_time:
|
||||
def fake_process_broker(*args, **kwargs):
|
||||
def fake_process_broker(broker, *args, **kwargs):
|
||||
# increment time and inject some fake stats
|
||||
fake_process_broker_calls.append((broker, args, kwargs))
|
||||
try:
|
||||
fake_time.return_value += next(fake_periods_iter)
|
||||
except StopIteration:
|
||||
|
@ -308,11 +313,15 @@ class TestSharder(BaseTestSharder):
|
|||
'swift.container.sharder.dump_recon_cache',
|
||||
mock_dump_recon_cache):
|
||||
fake_time.return_value = next(fake_periods_iter)
|
||||
sharder._is_sharding_candidate = lambda x: True
|
||||
sharder._process_broker = fake_process_broker
|
||||
with self.assertRaises(Exception) as cm:
|
||||
sharder.run_forever()
|
||||
|
||||
self.assertEqual('Test over', cm.exception.message)
|
||||
# four cycles are started, two brokers visited per cycle, but
|
||||
# fourth never completes
|
||||
self.assertEqual(8, len(fake_process_broker_calls))
|
||||
# expect initial random sleep then one sleep between first and
|
||||
# second pass
|
||||
self.assertEqual(2, mock_sleep.call_count)
|
||||
|
@ -324,30 +333,34 @@ class TestSharder(BaseTestSharder):
|
|||
categories = ('visited', 'scanned', 'created', 'cleaved',
|
||||
'misplaced', 'audit_root', 'audit_shard')
|
||||
|
||||
def check_categories():
|
||||
def check_categories(start_time):
|
||||
for category in categories:
|
||||
line = lines.pop(0)
|
||||
self.assertIn('Since %s' % time.ctime(start_time), line)
|
||||
self.assertIn(category, line)
|
||||
for k, v in fake_stats.get(category, {}).items():
|
||||
self.assertIn('%s:%s' % (k, v), line)
|
||||
|
||||
def check_logs(cycle_time, expect_periodic_stats=False):
|
||||
def check_logs(cycle_time, start_time,
|
||||
expect_periodic_stats=False):
|
||||
self.assertIn('Container sharder cycle starting', lines.pop(0))
|
||||
check_categories()
|
||||
check_categories(start_time)
|
||||
if expect_periodic_stats:
|
||||
check_categories()
|
||||
check_categories(start_time)
|
||||
self.assertIn('Container sharder cycle completed: %.02fs' %
|
||||
cycle_time, lines.pop(0))
|
||||
|
||||
check_logs(sum(fake_periods[1:3]))
|
||||
check_logs(sum(fake_periods[3:5]), expect_periodic_stats=True)
|
||||
check_logs(sum(fake_periods[5:7]))
|
||||
check_logs(sum(fake_periods[1:3]), fake_periods[0])
|
||||
check_logs(sum(fake_periods[3:5]), sum(fake_periods[:3]),
|
||||
expect_periodic_stats=True)
|
||||
check_logs(sum(fake_periods[5:7]), sum(fake_periods[:5]))
|
||||
# final cycle start but then exception pops to terminate test
|
||||
self.assertIn('Container sharder cycle starting', lines.pop(0))
|
||||
self.assertFalse(lines)
|
||||
lines = sharder.logger.get_lines_for_level('error')
|
||||
self.assertIn(
|
||||
'Unhandled exception while dumping progress', lines[0])
|
||||
self.assertIn('Test over', lines[0])
|
||||
|
||||
def check_recon(data, time, last, expected_stats):
|
||||
self.assertEqual(time, data['sharding_time'])
|
||||
|
@ -355,26 +368,68 @@ class TestSharder(BaseTestSharder):
|
|||
self.assertEqual(
|
||||
expected_stats, dict(data['sharding_stats']['sharding']))
|
||||
|
||||
def stats_for_candidate(broker):
|
||||
return {'object_count': 0,
|
||||
'account': broker.account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': broker.container,
|
||||
'file_size': os.stat(broker.db_file).st_size,
|
||||
'path': broker.db_file,
|
||||
'root': broker.path,
|
||||
'node_index': 0}
|
||||
|
||||
self.assertEqual(4, len(recon_data))
|
||||
# stats report at end of first cycle
|
||||
fake_stats.update({'visited': {'attempted': 2, 'skipped': 0,
|
||||
'success': 2, 'failure': 0,
|
||||
'completed': 0}})
|
||||
fake_stats.update({
|
||||
'sharding_candidates': {
|
||||
'found': 2,
|
||||
'top': [stats_for_candidate(call[0])
|
||||
for call in fake_process_broker_calls[:2]]
|
||||
}
|
||||
})
|
||||
check_recon(recon_data[0], sum(fake_periods[1:3]),
|
||||
sum(fake_periods[:3]), fake_stats)
|
||||
# periodic stats report during second cycle
|
||||
# periodic stats report after first broker has been visited during
|
||||
# second cycle - one candidate identified so far this cycle
|
||||
fake_stats.update({'visited': {'attempted': 1, 'skipped': 0,
|
||||
'success': 1, 'failure': 0,
|
||||
'completed': 0}})
|
||||
fake_stats.update({
|
||||
'sharding_candidates': {
|
||||
'found': 1,
|
||||
'top': [stats_for_candidate(call[0])
|
||||
for call in fake_process_broker_calls[2:3]]
|
||||
}
|
||||
})
|
||||
check_recon(recon_data[1], fake_periods[3],
|
||||
sum(fake_periods[:4]), fake_stats)
|
||||
# stats report at end of second cycle
|
||||
check_recon(recon_data[2], fake_periods[4], sum(fake_periods[:5]),
|
||||
fake_stats)
|
||||
# stats report at end of second cycle - both candidates reported
|
||||
fake_stats.update({'visited': {'attempted': 2, 'skipped': 0,
|
||||
'success': 2, 'failure': 0,
|
||||
'completed': 0}})
|
||||
fake_stats.update({
|
||||
'sharding_candidates': {
|
||||
'found': 2,
|
||||
'top': [stats_for_candidate(call[0])
|
||||
for call in fake_process_broker_calls[2:4]]
|
||||
}
|
||||
})
|
||||
check_recon(recon_data[2], sum(fake_periods[3:5]),
|
||||
sum(fake_periods[:5]), fake_stats)
|
||||
# stats report at end of third cycle
|
||||
fake_stats.update({'visited': {'attempted': 2, 'skipped': 0,
|
||||
'success': 2, 'failure': 0,
|
||||
'completed': 0}})
|
||||
fake_stats.update({
|
||||
'sharding_candidates': {
|
||||
'found': 2,
|
||||
'top': [stats_for_candidate(call[0])
|
||||
for call in fake_process_broker_calls[4:6]]
|
||||
}
|
||||
})
|
||||
check_recon(recon_data[3], sum(fake_periods[5:7]),
|
||||
sum(fake_periods[:7]), fake_stats)
|
||||
|
||||
|
@ -1527,7 +1582,7 @@ class TestSharder(BaseTestSharder):
|
|||
with self._mock_sharder(conf=conf) as sharder:
|
||||
for broker in brokers:
|
||||
sharder._identify_sharding_candidate(broker, node)
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertFalse(sharder.sharding_candidates)
|
||||
expected_recon = {
|
||||
'found': 0,
|
||||
'top': []}
|
||||
|
@ -1550,8 +1605,7 @@ class TestSharder(BaseTestSharder):
|
|||
'object_count': 100,
|
||||
'meta_timestamp': now.internal,
|
||||
'file_size': os.stat(brokers[0].db_file).st_size}
|
||||
expected_stats = {'all': [stats_0]}
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertEqual([stats_0], sharder.sharding_candidates)
|
||||
expected_recon = {
|
||||
'found': 1,
|
||||
'top': [stats_0]}
|
||||
|
@ -1573,7 +1627,7 @@ class TestSharder(BaseTestSharder):
|
|||
'object_count': 100,
|
||||
'meta_timestamp': now.internal,
|
||||
'file_size': None}
|
||||
expected_stats = {'all': [stats_0_b]}
|
||||
self.assertEqual([stats_0_b], sharder.sharding_candidates)
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
expected_recon = {
|
||||
'found': 1,
|
||||
|
@ -1587,7 +1641,6 @@ class TestSharder(BaseTestSharder):
|
|||
for obj in objects[:50]:
|
||||
brokers[2].put_object(*obj)
|
||||
own_sr = brokers[2].get_own_shard_range()
|
||||
expected_stats = {'all': [stats_0]}
|
||||
for state in ShardRange.STATES:
|
||||
own_sr.update_state(state, state_timestamp=Timestamp.now())
|
||||
brokers[2].merge_shard_ranges([own_sr])
|
||||
|
@ -1596,8 +1649,7 @@ class TestSharder(BaseTestSharder):
|
|||
for broker in brokers:
|
||||
sharder._identify_sharding_candidate(broker, node)
|
||||
with annotate_failure(state):
|
||||
self._assert_stats(
|
||||
expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertEqual([stats_0], sharder.sharding_candidates)
|
||||
|
||||
# reduce the threshold and the second container is included
|
||||
conf = {'shard_container_size': 50,
|
||||
|
@ -1616,8 +1668,7 @@ class TestSharder(BaseTestSharder):
|
|||
'object_count': 50,
|
||||
'meta_timestamp': now.internal,
|
||||
'file_size': os.stat(brokers[2].db_file).st_size}
|
||||
expected_stats = {'all': [stats_0, stats_2]}
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertEqual([stats_0, stats_2], sharder.sharding_candidates)
|
||||
expected_recon = {
|
||||
'found': 2,
|
||||
'top': [stats_0, stats_2]}
|
||||
|
@ -1627,7 +1678,6 @@ class TestSharder(BaseTestSharder):
|
|||
|
||||
# a broker not in active state is not included
|
||||
own_sr = brokers[0].get_own_shard_range()
|
||||
expected_stats = {'all': [stats_2]}
|
||||
for state in ShardRange.STATES:
|
||||
if state == ShardRange.ACTIVE:
|
||||
continue
|
||||
|
@ -1638,8 +1688,7 @@ class TestSharder(BaseTestSharder):
|
|||
for broker in brokers:
|
||||
sharder._identify_sharding_candidate(broker, node)
|
||||
with annotate_failure(state):
|
||||
self._assert_stats(
|
||||
expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertEqual([stats_2], sharder.sharding_candidates)
|
||||
|
||||
own_sr.update_state(ShardRange.ACTIVE, state_timestamp=Timestamp.now())
|
||||
brokers[0].merge_shard_ranges([own_sr])
|
||||
|
@ -1659,8 +1708,8 @@ class TestSharder(BaseTestSharder):
|
|||
'object_count': 150,
|
||||
'meta_timestamp': now.internal,
|
||||
'file_size': os.stat(brokers[5].db_file).st_size}
|
||||
expected_stats = {'all': [stats_0, stats_2, stats_5]}
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertEqual([stats_0, stats_2, stats_5],
|
||||
sharder.sharding_candidates)
|
||||
# note recon top list is sorted by size
|
||||
expected_recon = {
|
||||
'found': 3,
|
||||
|
@ -1677,7 +1726,8 @@ class TestSharder(BaseTestSharder):
|
|||
with mock_timestamp_now(now):
|
||||
for broker in brokers:
|
||||
sharder._identify_sharding_candidate(broker, node)
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
self.assertEqual([stats_0, stats_2, stats_5],
|
||||
sharder.sharding_candidates)
|
||||
expected_recon = {
|
||||
'found': 3,
|
||||
'top': [stats_5, stats_0]}
|
||||
|
@ -1722,8 +1772,9 @@ class TestSharder(BaseTestSharder):
|
|||
'meta_timestamp': now.internal,
|
||||
'file_size': os.stat(brokers[1].db_file).st_size}
|
||||
|
||||
expected_stats = {
|
||||
'all': [stats_0, stats_1, stats_2, stats_3, stats_4, stats_5]}
|
||||
self.assertEqual(
|
||||
[stats_0, stats_1, stats_2, stats_3, stats_4, stats_5],
|
||||
sharder.sharding_candidates)
|
||||
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
|
||||
expected_recon = {
|
||||
'found': 6,
|
||||
|
|
Loading…
Reference in New Issue