Latch shard-stat reporting

The idea is, if none of

  - timestamp,
  - object_count,
  - bytes_used,
  - state, or
  - epoch

has changed, we shouldn't need to send an update back to the root
container.

This is more-or-less comparable to what the container-updater does to
avoid unnecessary writes to the account.

Closes-Bug: #1834097
Change-Id: I1ee7ba5eae3c508064714c4deb4f7c6bbbfa32af
This commit is contained in:
Tim Burke 2019-08-06 18:17:14 -07:00
parent 9b1ca0eb67
commit cedec8c5ef
7 changed files with 284 additions and 39 deletions

View File

@ -4814,6 +4814,8 @@ class ShardRange(object):
value.
:param epoch: optional epoch timestamp which represents the time at which
sharding was enabled for a container.
:param reported: optional indicator that this shard and its stats have
been reported to the root container.
"""
FOUND = 10
CREATED = 20
@ -4864,7 +4866,8 @@ class ShardRange(object):
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
deleted=False, state=None, state_timestamp=None, epoch=None):
deleted=False, state=None, state_timestamp=None, epoch=None,
reported=False):
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
self._lower = ShardRange.MIN
@ -4883,6 +4886,7 @@ class ShardRange(object):
self.state = self.FOUND if state is None else state
self.state_timestamp = state_timestamp
self.epoch = epoch
self.reported = reported
@classmethod
def _encode(cls, value):
@ -5063,8 +5067,14 @@ class ShardRange(object):
cast to an int, or if meta_timestamp is neither None nor can be
cast to a :class:`~swift.common.utils.Timestamp`.
"""
self.object_count = int(object_count)
self.bytes_used = int(bytes_used)
if self.object_count != int(object_count):
self.object_count = int(object_count)
self.reported = False
if self.bytes_used != int(bytes_used):
self.bytes_used = int(bytes_used)
self.reported = False
if meta_timestamp is None:
self.meta_timestamp = Timestamp.now()
else:
@ -5145,6 +5155,14 @@ class ShardRange(object):
def epoch(self, epoch):
self._epoch = self._to_timestamp(epoch)
@property
def reported(self):
    # True once this shard's current stats/state have been sent to the
    # root container; used to "latch" and skip redundant updates.
    return self._reported

@reported.setter
def reported(self, value):
    # Coerce to bool so DB-sourced ints (0/1) and None normalise cleanly.
    self._reported = bool(value)
def update_state(self, state, state_timestamp=None):
"""
Set state to the given value and optionally update the state_timestamp
@ -5161,6 +5179,7 @@ class ShardRange(object):
self.state = state
if state_timestamp is not None:
self.state_timestamp = state_timestamp
self.reported = False
return True
@property
@ -5283,6 +5302,7 @@ class ShardRange(object):
yield 'state', self.state
yield 'state_timestamp', self.state_timestamp.internal
yield 'epoch', self.epoch.internal if self.epoch is not None else None
yield 'reported', 1 if self.reported else 0
def copy(self, timestamp=None, **kwargs):
"""
@ -5314,7 +5334,8 @@ class ShardRange(object):
params['name'], params['timestamp'], params['lower'],
params['upper'], params['object_count'], params['bytes_used'],
params['meta_timestamp'], params['deleted'], params['state'],
params['state_timestamp'], params['epoch'])
params['state_timestamp'], params['epoch'],
params.get('reported', 0))
def find_shard_range(item, ranges):

View File

@ -60,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
'bytes_used', 'meta_timestamp', 'deleted', 'state',
'state_timestamp', 'epoch')
'state_timestamp', 'epoch', 'reported')
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
@ -267,6 +267,7 @@ def merge_shards(shard_data, existing):
if existing['timestamp'] < shard_data['timestamp']:
# note that currently we do not roll forward any meta or state from
# an item that was created at older time, newer created time trumps
shard_data['reported'] = 0 # reset the latch
return True
elif existing['timestamp'] > shard_data['timestamp']:
return False
@ -283,6 +284,18 @@ def merge_shards(shard_data, existing):
else:
new_content = True
# We can latch the reported flag
if existing['reported'] and \
existing['object_count'] == shard_data['object_count'] and \
existing['bytes_used'] == shard_data['bytes_used'] and \
existing['state'] == shard_data['state'] and \
existing['epoch'] == shard_data['epoch']:
shard_data['reported'] = 1
else:
shard_data.setdefault('reported', 0)
if shard_data['reported'] and not existing['reported']:
new_content = True
if (existing['state_timestamp'] == shard_data['state_timestamp']
and shard_data['state'] > existing['state']):
new_content = True
@ -595,7 +608,8 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0,
state INTEGER,
state_timestamp TEXT,
epoch TEXT
epoch TEXT,
reported INTEGER DEFAULT 0
);
""" % SHARD_RANGE_TABLE)
@ -1428,10 +1442,13 @@ class ContainerBroker(DatabaseBroker):
# sqlite3.OperationalError: cannot start a transaction
# within a transaction
conn.rollback()
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
raise
self.create_shard_range_table(conn)
return _really_merge_items(conn)
if 'no such column: reported' in str(err):
self._migrate_add_shard_range_reported(conn)
return _really_merge_items(conn)
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
self.create_shard_range_table(conn)
return _really_merge_items(conn)
raise
def get_reconciler_sync(self):
with self.get() as conn:
@ -1579,6 +1596,17 @@ class ContainerBroker(DatabaseBroker):
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')
def _migrate_add_shard_range_reported(self, conn):
    """
    Add the 'reported' column to the 'shard_range' table.

    Invoked lazily when a merge hits ``no such column: reported`` on a
    database created before this column existed; existing rows get the
    default of 0 (i.e. not yet reported to the root container).

    :param conn: DB connection object
    """
    conn.executescript('''
        BEGIN;
        ALTER TABLE %s
        ADD COLUMN reported INTEGER DEFAULT 0;
        COMMIT;
    ''' % SHARD_RANGE_TABLE)
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
super(ContainerBroker, self)._reclaim_other_stuff(
conn, age_timestamp, sync_timestamp)
@ -1628,7 +1656,7 @@ class ContainerBroker(DatabaseBroker):
elif states is not None:
included_states.add(states)
def do_query(conn):
def do_query(conn, use_reported_column=True):
condition = ''
conditions = []
params = []
@ -1646,21 +1674,27 @@ class ContainerBroker(DatabaseBroker):
params.append(self.path)
if conditions:
condition = ' WHERE ' + ' AND '.join(conditions)
if use_reported_column:
columns = SHARD_RANGE_KEYS
else:
columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', )
sql = '''
SELECT %s
FROM %s%s;
''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition)
''' % (', '.join(columns), SHARD_RANGE_TABLE, condition)
data = conn.execute(sql, params)
data.row_factory = None
return [row for row in data]
try:
with self.maybe_get(connection) as conn:
with self.maybe_get(connection) as conn:
try:
return do_query(conn)
except sqlite3.OperationalError as err:
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
except sqlite3.OperationalError as err:
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
return []
if 'no such column: reported' in str(err):
return do_query(conn, use_reported_column=False)
raise
return []
@classmethod
def resolve_shard_range_states(cls, states):

View File

@ -618,7 +618,8 @@ class ContainerSharder(ContainerReplicator):
def _send_shard_ranges(self, account, container, shard_ranges,
headers=None):
body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
body = json.dumps([dict(sr, reported=0)
for sr in shard_ranges]).encode('ascii')
part, nodes = self.ring.get_nodes(account, container)
headers = headers or {}
headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
@ -1468,7 +1469,7 @@ class ContainerSharder(ContainerReplicator):
def _update_root_container(self, broker):
own_shard_range = broker.get_own_shard_range(no_default=True)
if not own_shard_range:
if not own_shard_range or own_shard_range.reported:
return
# persist the reported shard metadata
@ -1478,9 +1479,12 @@ class ContainerSharder(ContainerReplicator):
include_own=True,
include_deleted=True)
# send everything
self._send_shard_ranges(
broker.root_account, broker.root_container,
shard_ranges)
if self._send_shard_ranges(
broker.root_account, broker.root_container, shard_ranges):
# on success, mark ourselves as reported so we don't keep
# hammering the root
own_shard_range.reported = True
broker.merge_shard_ranges(own_shard_range)
def _process_broker(self, broker, node, part):
broker.get_info() # make sure account/container are populated

View File

@ -189,6 +189,7 @@ class TestManageShardRanges(unittest.TestCase):
' "meta_timestamp": "%s",' % now.internal,
' "name": "a/c",',
' "object_count": 0,',
' "reported": 0,',
' "state": "sharding",',
' "state_timestamp": "%s",' % now.internal,
' "timestamp": "%s",' % now.internal,
@ -230,6 +231,7 @@ class TestManageShardRanges(unittest.TestCase):
' "meta_timestamp": "%s",' % now.internal,
' "name": "a/c",',
' "object_count": 0,',
' "reported": 0,',
' "state": "sharding",',
' "state_timestamp": "%s",' % now.internal,
' "timestamp": "%s",' % now.internal,

View File

@ -7224,7 +7224,8 @@ class TestShardRange(unittest.TestCase):
upper='', object_count=0, bytes_used=0,
meta_timestamp=ts_1.internal, deleted=0,
state=utils.ShardRange.FOUND,
state_timestamp=ts_1.internal, epoch=None)
state_timestamp=ts_1.internal, epoch=None,
reported=0)
assert_initialisation_ok(dict(empty_run, name='a/c', timestamp=ts_1),
expect)
assert_initialisation_ok(dict(name='a/c', timestamp=ts_1), expect)
@ -7233,11 +7234,13 @@ class TestShardRange(unittest.TestCase):
upper='u', object_count=2, bytes_used=10,
meta_timestamp=ts_2, deleted=0,
state=utils.ShardRange.CREATED,
state_timestamp=ts_3.internal, epoch=ts_4)
state_timestamp=ts_3.internal, epoch=ts_4,
reported=0)
expect.update({'lower': 'l', 'upper': 'u', 'object_count': 2,
'bytes_used': 10, 'meta_timestamp': ts_2.internal,
'state': utils.ShardRange.CREATED,
'state_timestamp': ts_3.internal, 'epoch': ts_4})
'state_timestamp': ts_3.internal, 'epoch': ts_4,
'reported': 0})
assert_initialisation_ok(good_run.copy(), expect)
# obj count and bytes used as int strings
@ -7255,6 +7258,11 @@ class TestShardRange(unittest.TestCase):
assert_initialisation_ok(good_deleted,
dict(expect, deleted=1))
good_reported = good_run.copy()
good_reported['reported'] = 1
assert_initialisation_ok(good_reported,
dict(expect, reported=1))
assert_initialisation_fails(dict(good_run, timestamp='water balloon'))
assert_initialisation_fails(
@ -7293,7 +7301,7 @@ class TestShardRange(unittest.TestCase):
'upper': upper, 'object_count': 10, 'bytes_used': 100,
'meta_timestamp': ts_2.internal, 'deleted': 0,
'state': utils.ShardRange.FOUND, 'state_timestamp': ts_3.internal,
'epoch': ts_4}
'epoch': ts_4, 'reported': 0}
self.assertEqual(expected, sr_dict)
self.assertIsInstance(sr_dict['lower'], six.string_types)
self.assertIsInstance(sr_dict['upper'], six.string_types)
@ -7308,6 +7316,14 @@ class TestShardRange(unittest.TestCase):
for key in sr_dict:
bad_dict = dict(sr_dict)
bad_dict.pop(key)
if key == 'reported':
# This was added after the fact, and we need to be able to eat
# data from old servers
utils.ShardRange.from_dict(bad_dict)
utils.ShardRange(**bad_dict)
continue
# The rest were present from the beginning
with self.assertRaises(KeyError):
utils.ShardRange.from_dict(bad_dict)
# But __init__ still (generally) works!

View File

@ -735,10 +735,12 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(info['put_timestamp'], start.internal)
self.assertTrue(Timestamp(info['created_at']) >= start)
self.assertEqual(info['delete_timestamp'], '0')
if self.__class__ in (TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges):
if self.__class__ in (
TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges,
TestContainerBrokerBeforeShardRangeReportedColumn):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
@ -1025,6 +1027,8 @@ class TestContainerBroker(unittest.TestCase):
"SELECT object_count FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT reported FROM shard_range").fetchone()[0], 0)
# Reput same event
broker.merge_shard_ranges(
@ -1050,6 +1054,64 @@ class TestContainerBroker(unittest.TestCase):
"SELECT object_count FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT reported FROM shard_range").fetchone()[0], 0)
# Mark it as reported
broker.merge_shard_ranges(
ShardRange('"a/{<shardrange \'&\' name>}"', timestamp,
'low', 'up', meta_timestamp=meta_timestamp,
reported=True))
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM shard_range").fetchone()[0],
'"a/{<shardrange \'&\' name>}"')
self.assertEqual(conn.execute(
"SELECT timestamp FROM shard_range").fetchone()[0],
timestamp)
self.assertEqual(conn.execute(
"SELECT meta_timestamp FROM shard_range").fetchone()[0],
meta_timestamp)
self.assertEqual(conn.execute(
"SELECT lower FROM shard_range").fetchone()[0], 'low')
self.assertEqual(conn.execute(
"SELECT upper FROM shard_range").fetchone()[0], 'up')
self.assertEqual(conn.execute(
"SELECT deleted FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT object_count FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT reported FROM shard_range").fetchone()[0], 1)
# Reporting latches it
broker.merge_shard_ranges(
ShardRange('"a/{<shardrange \'&\' name>}"', timestamp,
'low', 'up', meta_timestamp=meta_timestamp,
reported=False))
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM shard_range").fetchone()[0],
'"a/{<shardrange \'&\' name>}"')
self.assertEqual(conn.execute(
"SELECT timestamp FROM shard_range").fetchone()[0],
timestamp)
self.assertEqual(conn.execute(
"SELECT meta_timestamp FROM shard_range").fetchone()[0],
meta_timestamp)
self.assertEqual(conn.execute(
"SELECT lower FROM shard_range").fetchone()[0], 'low')
self.assertEqual(conn.execute(
"SELECT upper FROM shard_range").fetchone()[0], 'up')
self.assertEqual(conn.execute(
"SELECT deleted FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT object_count FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT bytes_used FROM shard_range").fetchone()[0], 0)
self.assertEqual(conn.execute(
"SELECT reported FROM shard_range").fetchone()[0], 1)
# Put new event
timestamp = next(self.ts).internal
@ -1077,11 +1139,14 @@ class TestContainerBroker(unittest.TestCase):
"SELECT object_count FROM shard_range").fetchone()[0], 1)
self.assertEqual(conn.execute(
"SELECT bytes_used FROM shard_range").fetchone()[0], 2)
self.assertEqual(conn.execute(
"SELECT reported FROM shard_range").fetchone()[0], 0)
# Put old event
broker.merge_shard_ranges(
ShardRange('"a/{<shardrange \'&\' name>}"', old_put_timestamp,
'lower', 'upper', 1, 2, meta_timestamp=meta_timestamp))
'lower', 'upper', 1, 2, meta_timestamp=meta_timestamp,
reported=True))
with broker.get() as conn:
self.assertEqual(conn.execute(
"SELECT name FROM shard_range").fetchone()[0],
@ -1102,6 +1167,8 @@ class TestContainerBroker(unittest.TestCase):
"SELECT object_count FROM shard_range").fetchone()[0], 1)
self.assertEqual(conn.execute(
"SELECT bytes_used FROM shard_range").fetchone()[0], 2)
self.assertEqual(conn.execute(
"SELECT reported FROM shard_range").fetchone()[0], 0)
# Put old delete event
broker.merge_shard_ranges(
@ -1978,10 +2045,12 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(info['hash'], '00000000000000000000000000000000')
self.assertEqual(info['put_timestamp'], Timestamp(1).internal)
self.assertEqual(info['delete_timestamp'], '0')
if self.__class__ in (TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges):
if self.__class__ in (
TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges,
TestContainerBrokerBeforeShardRangeReportedColumn):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(info['status_changed_at'],
@ -3275,10 +3344,12 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(0, info['storage_policy_index']) # sanity check
self.assertEqual(0, info['object_count'])
self.assertEqual(0, info['bytes_used'])
if self.__class__ in (TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges):
if self.__class__ in (
TestContainerBrokerBeforeMetadata,
TestContainerBrokerBeforeXSync,
TestContainerBrokerBeforeSPI,
TestContainerBrokerBeforeShardRanges,
TestContainerBrokerBeforeShardRangeReportedColumn):
self.assertEqual(info['status_changed_at'], '0')
else:
self.assertEqual(timestamp.internal, info['status_changed_at'])
@ -5315,6 +5386,75 @@ class TestContainerBrokerBeforeShardRanges(ContainerBrokerMigrationMixin,
FROM shard_range''')
def pre_reported_create_shard_range_table(self, conn):
    """
    Create a shard_range table without the 'reported' column.

    Copied from ContainerBroker as it was before the 'reported' column
    was added; patched in as ``create_shard_range_table`` when testing
    brokers against old-style databases (see
    TestContainerBrokerBeforeShardRangeReportedColumn).

    :param conn: DB connection object
    """
    # Schema exactly as it existed prior to the 'reported' column.
    schema_sql = """
        CREATE TABLE shard_range (
            ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            timestamp TEXT,
            lower TEXT,
            upper TEXT,
            object_count INTEGER DEFAULT 0,
            bytes_used INTEGER DEFAULT 0,
            meta_timestamp TEXT,
            deleted INTEGER DEFAULT 0,
            state INTEGER,
            state_timestamp TEXT,
            epoch TEXT
        );
    """
    # Rows are immutable: an UPDATE must instead be a DELETE + INSERT.
    trigger_sql = """
        CREATE TRIGGER shard_range_update BEFORE UPDATE ON shard_range
        BEGIN
            SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
        END;
    """
    for statement in (schema_sql, trigger_sql):
        conn.execute(statement)
class TestContainerBrokerBeforeShardRangeReportedColumn(
        ContainerBrokerMigrationMixin, TestContainerBroker):
    """
    Tests for ContainerBroker against databases created before the
    'reported' column was added to the shard_range table.

    Re-runs the whole TestContainerBroker suite with the old schema in
    place, exercising the on-demand column migration.
    """
    # *grumble grumble* This should include container_info/policy_stat :-/
    expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object',
                          'sqlite_sequence', 'container_stat', 'shard_range'}

    def setUp(self):
        super(TestContainerBrokerBeforeShardRangeReportedColumn,
              self).setUp()
        # Patch in the pre-'reported' table creator so every broker made
        # by these tests starts from the old schema.
        # NOTE(review): assumes ContainerBrokerMigrationMixin restores the
        # original create_shard_range_table on tearDown — confirm.
        ContainerBroker.create_shard_range_table = \
            pre_reported_create_shard_range_table

        # Sanity check: the old schema really lacks the column.
        broker = ContainerBroker(':memory:', account='a', container='c')
        broker.initialize(Timestamp('1').internal, 0)
        with self.assertRaises(sqlite3.DatabaseError) as raised, \
                broker.get() as conn:
            conn.execute('''SELECT reported
                            FROM shard_range''')
        self.assertIn('no such column: reported', str(raised.exception))

    def tearDown(self):
        super(TestContainerBrokerBeforeShardRangeReportedColumn,
              self).tearDown()
        # Sanity check: once un-patched, new brokers have the column again.
        broker = ContainerBroker(':memory:', account='a', container='c')
        broker.initialize(Timestamp('1').internal, 0)
        with broker.get() as conn:
            conn.execute('''SELECT reported
                            FROM shard_range''')
class TestUpdateNewItemFromExisting(unittest.TestCase):
# TODO: add test scenarios that have swift_bytes in content_type
t0 = '1234567890.00000'

View File

@ -4189,6 +4189,7 @@ class TestSharder(BaseTestSharder):
def capture_send(conn, data):
bodies.append(data)
self.assertFalse(broker.get_own_shard_range().reported) # sanity
with self._mock_sharder() as sharder:
with mocked_http_conn(204, 204, 204,
give_send=capture_send) as mock_conn:
@ -4198,6 +4199,7 @@ class TestSharder(BaseTestSharder):
self.assertEqual('PUT', req['method'])
self.assertEqual([expected_sent] * 3,
[json.loads(b) for b in bodies])
self.assertTrue(broker.get_own_shard_range().reported)
def test_update_root_container_own_range(self):
broker = self._make_broker()
@ -4230,6 +4232,32 @@ class TestSharder(BaseTestSharder):
with annotate_failure(state):
check_only_own_shard_range_sent(state)
def test_update_root_container_already_reported(self):
    """
    _update_root_container() must send nothing when the broker's own
    shard range is already marked as reported.
    """
    broker = self._make_broker()

    def check_already_reported_not_sent(state):
        own_shard_range = broker.get_own_shard_range()

        own_shard_range.reported = True
        self.assertTrue(own_shard_range.update_state(
            state, state_timestamp=next(self.ts_iter)))
        # Check that updating state clears the flag
        self.assertFalse(own_shard_range.reported)

        # If we claim to have already updated...
        own_shard_range.reported = True
        broker.merge_shard_ranges([own_shard_range])

        # ... then there's nothing to send
        with self._mock_sharder() as sharder:
            # no status codes given: any HTTP request would error out
            with mocked_http_conn() as mock_conn:
                sharder._update_root_container(broker)

        self.assertFalse(mock_conn.requests)

    # the latch must hold regardless of which state the range is in
    for state in ShardRange.STATES:
        with annotate_failure(state):
            check_already_reported_not_sent(state)
def test_update_root_container_all_ranges(self):
broker = self._make_broker()
other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))