Avoid coreference between current state and _last_published_data

Previously, all data source drivers replace the entire state on each
poll or push update (doctor). The DSE publish code relied on this
situation to optimize performance by skipping a copy when setting
_last_published_data. The coreference introduced did not matter
because the original state was never edited, only replaced.

But beginning with the webhook drivers
(monasca webhook and vitrage), the drivers edit the current state.
This a bug was introduced where sometimes an update would not be
published to messaging bus because of an incorrect _last_published_data
due to coreference.

This patch fixes the bug. In addition, the _last_published_data
elements are stored as set rather than list to save the need to
convert to set each time a diff is needed. The times when
conversion back to list is needed to publish on the messaging bus
(used for snapshot publish only)
is expected to be far fewer than the times the conversion to set is
needed.

A few additionalogging lines are also introbuced for improved
debuggability.

Change-Id: I332a844beaf8dec4450bb1ade6ce7e41cc3d4d91
This commit is contained in:
Eric K 2018-12-17 17:31:27 -08:00
parent e75c7bd893
commit 5b9ae41024
2 changed files with 39 additions and 30 deletions

View File

@ -226,17 +226,23 @@ class DataService(object):
# Note(thread-safety): blocking function
def publish(self, table, data, use_snapshot=False):
LOG.debug('Publishing table %s', table)
LOG.trace('Parameters: table: %s, data: %s, use_snapshot: %s',
table, data, use_snapshot)
LOG.trace('Last published data %s', self._last_published_data)
data = set(data) # make a copy to avoid co-reference
def get_differential_and_set_last_published_data():
if table in self._last_published_data:
to_add = list(
set(data) - set(self._last_published_data[table]))
to_del = list(
set(self._last_published_data[table]) - set(data))
LOG.trace('Diff against last published data %s',
self._last_published_data[table])
to_add = list(data - self._last_published_data[table])
to_del = list(self._last_published_data[table] - data)
self._last_published_data[table] = data
else:
self._last_published_data[table] = data
to_add = copy.copy(data)
to_add = list(data)
to_del = []
self._last_published_data[table] = data
return [to_add, to_del]
def increment_get_seqnum():
@ -248,6 +254,7 @@ class DataService(object):
if not use_snapshot:
data = get_differential_and_set_last_published_data()
LOG.debug('Differential data to publish %s', data)
if len(data[0]) == 0 and len(data[1]) == 0:
return
@ -445,8 +452,10 @@ class DataService(object):
"""Method that returns the current seqnum & data for given table."""
if table not in self.sender_seqnums:
self.sender_seqnums[table] = 0
self._last_published_data[table] = self.get_snapshot(table)
return (self.sender_seqnums[table], self._last_published_data[table])
self._last_published_data[table] = set(self.get_snapshot(table))
# make a copy to avoid co-reference
return (self.sender_seqnums[table],
list(self._last_published_data[table]))
def get_snapshot(self, table):
"""Method that returns the current data for the given table.

View File

@ -55,9 +55,9 @@ class TestDSE(base.TestCase):
test1.subscribe('test2', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42, use_snapshot=True)
test2.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
lambda: test1.last_msg['data'], [42])
self.assertFalse(hasattr(test2, "last_msg"))
node.stop()
@ -73,9 +73,9 @@ class TestDSE(base.TestCase):
test2.subscribe('test1', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test2, 'last_msg'), True)
test1.publish('p', 42, use_snapshot=True)
test1.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test2.last_msg['data'], 42)
lambda: test2.last_msg['data'], [42])
self.assertFalse(hasattr(test1, "last_msg"))
node.stop()
@ -91,9 +91,9 @@ class TestDSE(base.TestCase):
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42, use_snapshot=True)
test2.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
lambda: test1.last_msg['data'], [42])
self.assertFalse(hasattr(test2, "last_msg"))
node.stop()
@ -106,9 +106,9 @@ class TestDSE(base.TestCase):
self.assertFalse(hasattr(test1, "last_msg"))
test2 = fake_datasource.FakeDataSource('test2')
node.register_service(test2)
test2.publish('p', 42, use_snapshot=True)
test2.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
lambda: test1.last_msg['data'], [42])
self.assertFalse(hasattr(test2, "last_msg"))
node.stop()
node.wait()
@ -124,9 +124,9 @@ class TestDSE(base.TestCase):
test1.subscribe('test2', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42, use_snapshot=True)
test2.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
lambda: test1.last_msg['data'], [42])
self.assertFalse(hasattr(test2, "last_msg"))
node1.stop()
node2.stop()
@ -144,9 +144,9 @@ class TestDSE(base.TestCase):
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42, use_snapshot=True)
test2.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
lambda: test1.last_msg['data'], [42])
self.assertFalse(hasattr(test2, "last_msg"))
node1.stop()
node2.stop()
@ -164,9 +164,9 @@ class TestDSE(base.TestCase):
test1.subscribe('test3', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test3.publish('p', 42, use_snapshot=True)
test3.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
lambda: test1.last_msg['data'], [42])
self.assertFalse(hasattr(test2, "last_msg"))
self.assertFalse(hasattr(test3, "last_msg"))
node1.stop()
@ -197,9 +197,9 @@ class TestDSE(base.TestCase):
nova.subscribe('test', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(nova, 'last_msg'), True)
test.publish('p', 42, use_snapshot=True)
test.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: nova.last_msg['data'], 42)
lambda: nova.last_msg['data'], [42])
self.assertFalse(hasattr(test, "last_msg"))
node.stop()
@ -215,15 +215,15 @@ class TestDSE(base.TestCase):
nova.subscribe('test', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(nova, 'last_msg'), True)
test.publish('p', 42, use_snapshot=True)
test.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: nova.last_msg['data'], 42)
lambda: nova.last_msg['data'], [42])
self.assertFalse(hasattr(test, "last_msg"))
nova.unsubscribe('test', 'p')
test.publish('p', 43, use_snapshot=True)
test.publish('p', [43], use_snapshot=True)
# hard to test that the message is never delivered
time.sleep(0.2)
self.assertEqual(nova.last_msg['data'], 42)
self.assertEqual(nova.last_msg['data'], [42])
node.stop()
@mock.patch.object(nova_client, 'Client', spec_set=True, autospec=True)
@ -238,9 +238,9 @@ class TestDSE(base.TestCase):
test.subscribe('nova', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test, 'last_msg'), True)
nova.publish('p', 42, use_snapshot=True)
nova.publish('p', [42], use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test.last_msg['data'], 42)
lambda: test.last_msg['data'], [42])
self.assertFalse(hasattr(nova, "last_msg"))
node.stop()