Merge "Avoid coreference between current state and _last_published_data"
This commit is contained in:
commit
21604e9834
|
@ -226,17 +226,23 @@ class DataService(object):
|
|||
|
||||
# Note(thread-safety): blocking function
|
||||
def publish(self, table, data, use_snapshot=False):
|
||||
LOG.debug('Publishing table %s', table)
|
||||
LOG.trace('Parameters: table: %s, data: %s, use_snapshot: %s',
|
||||
table, data, use_snapshot)
|
||||
LOG.trace('Last published data %s', self._last_published_data)
|
||||
data = set(data) # make a copy to avoid co-reference
|
||||
|
||||
def get_differential_and_set_last_published_data():
|
||||
if table in self._last_published_data:
|
||||
to_add = list(
|
||||
set(data) - set(self._last_published_data[table]))
|
||||
to_del = list(
|
||||
set(self._last_published_data[table]) - set(data))
|
||||
LOG.trace('Diff against last published data %s',
|
||||
self._last_published_data[table])
|
||||
to_add = list(data - self._last_published_data[table])
|
||||
to_del = list(self._last_published_data[table] - data)
|
||||
self._last_published_data[table] = data
|
||||
else:
|
||||
self._last_published_data[table] = data
|
||||
to_add = copy.copy(data)
|
||||
to_add = list(data)
|
||||
to_del = []
|
||||
self._last_published_data[table] = data
|
||||
return [to_add, to_del]
|
||||
|
||||
def increment_get_seqnum():
|
||||
|
@ -248,6 +254,7 @@ class DataService(object):
|
|||
|
||||
if not use_snapshot:
|
||||
data = get_differential_and_set_last_published_data()
|
||||
LOG.debug('Differential data to publish %s', data)
|
||||
if len(data[0]) == 0 and len(data[1]) == 0:
|
||||
return
|
||||
|
||||
|
@ -445,8 +452,10 @@ class DataService(object):
|
|||
"""Method that returns the current seqnum & data for given table."""
|
||||
if table not in self.sender_seqnums:
|
||||
self.sender_seqnums[table] = 0
|
||||
self._last_published_data[table] = self.get_snapshot(table)
|
||||
return (self.sender_seqnums[table], self._last_published_data[table])
|
||||
self._last_published_data[table] = set(self.get_snapshot(table))
|
||||
# make a copy to avoid co-reference
|
||||
return (self.sender_seqnums[table],
|
||||
list(self._last_published_data[table]))
|
||||
|
||||
def get_snapshot(self, table):
|
||||
"""Method that returns the current data for the given table.
|
||||
|
|
|
@ -55,9 +55,9 @@ class TestDSE(base.TestCase):
|
|||
test1.subscribe('test2', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
|
@ -73,9 +73,9 @@ class TestDSE(base.TestCase):
|
|||
test2.subscribe('test1', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test2, 'last_msg'), True)
|
||||
test1.publish('p', 42, use_snapshot=True)
|
||||
test1.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test2.last_msg['data'], 42)
|
||||
lambda: test2.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test1, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
|
@ -91,9 +91,9 @@ class TestDSE(base.TestCase):
|
|||
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
|
@ -106,9 +106,9 @@ class TestDSE(base.TestCase):
|
|||
self.assertFalse(hasattr(test1, "last_msg"))
|
||||
test2 = fake_datasource.FakeDataSource('test2')
|
||||
node.register_service(test2)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node.stop()
|
||||
node.wait()
|
||||
|
@ -124,9 +124,9 @@ class TestDSE(base.TestCase):
|
|||
test1.subscribe('test2', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node1.stop()
|
||||
node2.stop()
|
||||
|
@ -144,9 +144,9 @@ class TestDSE(base.TestCase):
|
|||
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
test2.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
node1.stop()
|
||||
node2.stop()
|
||||
|
@ -164,9 +164,9 @@ class TestDSE(base.TestCase):
|
|||
test1.subscribe('test3', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test3.publish('p', 42, use_snapshot=True)
|
||||
test3.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
lambda: test1.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
self.assertFalse(hasattr(test3, "last_msg"))
|
||||
node1.stop()
|
||||
|
@ -197,9 +197,9 @@ class TestDSE(base.TestCase):
|
|||
nova.subscribe('test', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(nova, 'last_msg'), True)
|
||||
test.publish('p', 42, use_snapshot=True)
|
||||
test.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: nova.last_msg['data'], 42)
|
||||
lambda: nova.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
|
@ -215,15 +215,15 @@ class TestDSE(base.TestCase):
|
|||
nova.subscribe('test', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(nova, 'last_msg'), True)
|
||||
test.publish('p', 42, use_snapshot=True)
|
||||
test.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: nova.last_msg['data'], 42)
|
||||
lambda: nova.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(test, "last_msg"))
|
||||
nova.unsubscribe('test', 'p')
|
||||
test.publish('p', 43, use_snapshot=True)
|
||||
test.publish('p', [43], use_snapshot=True)
|
||||
# hard to test that the message is never delivered
|
||||
time.sleep(0.2)
|
||||
self.assertEqual(nova.last_msg['data'], 42)
|
||||
self.assertEqual(nova.last_msg['data'], [42])
|
||||
node.stop()
|
||||
|
||||
@mock.patch.object(nova_client, 'Client', spec_set=True, autospec=True)
|
||||
|
@ -238,9 +238,9 @@ class TestDSE(base.TestCase):
|
|||
test.subscribe('nova', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test, 'last_msg'), True)
|
||||
nova.publish('p', 42, use_snapshot=True)
|
||||
nova.publish('p', [42], use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test.last_msg['data'], 42)
|
||||
lambda: test.last_msg['data'], [42])
|
||||
self.assertFalse(hasattr(nova, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
|
|
Loading…
Reference in New Issue