Merge "Make differential pub-sub standard in DSE2"

This commit is contained in:
Zuul 2018-03-28 18:28:46 +00:00 committed by Gerrit Code Review
commit 54cdc53b1b
5 changed files with 94 additions and 114 deletions

View File

@ -101,8 +101,6 @@ class DataService(object):
def __init__(self, service_id):
# Note(ekcs): temporary setting to disable use of diffs and sequencing
# to avoid muddying the process of a first dse2 system test.
# TODO(ekcs,dse2): remove when differential update is standard
self.always_snapshot = True
self.service_id = service_id
self.node = None
self._rpc_server = None
@ -227,11 +225,7 @@ class DataService(object):
return self.node.make_datasource_dict(*args, **kwargs)
# Note(thread-safety): blocking function
def publish(self, table, data, use_snapshot=True):
if self.always_snapshot:
self.node.publish_table(self.service_id, table, data)
return
def publish(self, table, data, use_snapshot=False):
def get_differential_and_set_last_published_data():
if table in self._last_published_data:
to_add = list(
@ -241,7 +235,7 @@ class DataService(object):
self._last_published_data[table] = data
else:
self._last_published_data[table] = data
to_add = data
to_add = copy.copy(data)
to_del = []
return [to_add, to_del]
@ -264,12 +258,6 @@ class DataService(object):
# Note(thread-safety): blocking function
def subscribe(self, service, table):
try:
if self.always_snapshot:
# Note(thread-safety): blocking call
data = self.node.subscribe_table(
self.service_id, service, table)
self.receive_data(service, table, data, is_snapshot=True)
return
# Note(thread-safety): blocking call
(seqnum, data) = self.node.subscribe_table(
self.service_id, service, table)

View File

@ -83,10 +83,6 @@ class DseNode(object):
def __init__(self, messaging_config, node_id, node_rpc_endpoints,
partition_id=None):
# Note(ekcs): temporary setting to disable use of diffs and sequencing
# to avoid muddying the process of a first dse2 system test.
# TODO(ekcs,dse2): remove when differential update is standard
self.always_snapshot = False
self.messaging_config = messaging_config
self.node_id = node_id
self.node_rpc_endpoints = node_rpc_endpoints
@ -142,7 +138,6 @@ class DseNode(object):
% (service.service_id, self.node_id))
raise exception.DataServiceError(msg)
access_policy = dispatcher.DefaultRPCAccessPolicy
service.always_snapshot = self.always_snapshot
service.node = self
self._services.append(service)
service._target = self.service_rpc_target(service.service_id,
@ -428,17 +423,12 @@ class DseNode(object):
self.subscriptions[publisher][table].add(subscriber)
# oslo returns [] instead of set(), so handle that case directly
if self.always_snapshot:
# Note(thread-safety): blocking call
snapshot = self.invoke_service_rpc(
publisher, "get_snapshot", {'table': table})
return self.to_set_of_tuples(snapshot)
else:
# Note(thread-safety): blocking call
snapshot_seqnum = self.invoke_service_rpc(
publisher, "get_last_published_data_with_seqnum",
{'table': table})
return snapshot_seqnum
# Note(thread-safety): blocking call
snapshot_seqnum = self.invoke_service_rpc(
publisher, "get_last_published_data_with_seqnum",
{'table': table})
return snapshot_seqnum
def get_subscription(self, service_id):
"""Return publisher/tables subscribed by service: service_id

View File

@ -2009,7 +2009,7 @@ class DseRuntime (Runtime, data_service.DataService):
# table_name, self.policySubData[table]))
(policy, tablename) = compile.Tablename.parse_service_table(table_name)
data = self.get_row_data(tablename, policy, trace=False)
data = [record['data'] for record in data]
data = [tuple(record['data']) for record in data]
return data
def _maintain_triggers(self):

View File

@ -55,7 +55,7 @@ class TestDSE(base.TestCase):
test1.subscribe('test2', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42)
test2.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
@ -73,7 +73,7 @@ class TestDSE(base.TestCase):
test2.subscribe('test1', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test2, 'last_msg'), True)
test1.publish('p', 42)
test1.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test2.last_msg['data'], 42)
self.assertFalse(hasattr(test1, "last_msg"))
@ -91,7 +91,7 @@ class TestDSE(base.TestCase):
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42)
test2.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
@ -106,7 +106,7 @@ class TestDSE(base.TestCase):
self.assertFalse(hasattr(test1, "last_msg"))
test2 = fake_datasource.FakeDataSource('test2')
node.register_service(test2)
test2.publish('p', 42)
test2.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
@ -124,7 +124,7 @@ class TestDSE(base.TestCase):
test1.subscribe('test2', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42)
test2.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
@ -144,7 +144,7 @@ class TestDSE(base.TestCase):
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test2.publish('p', 42)
test2.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
@ -164,7 +164,7 @@ class TestDSE(base.TestCase):
test1.subscribe('test3', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test1, 'last_msg'), True)
test3.publish('p', 42)
test3.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
@ -197,7 +197,7 @@ class TestDSE(base.TestCase):
nova.subscribe('test', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(nova, 'last_msg'), True)
test.publish('p', 42)
test.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: nova.last_msg['data'], 42)
self.assertFalse(hasattr(test, "last_msg"))
@ -215,12 +215,12 @@ class TestDSE(base.TestCase):
nova.subscribe('test', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(nova, 'last_msg'), True)
test.publish('p', 42)
test.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: nova.last_msg['data'], 42)
self.assertFalse(hasattr(test, "last_msg"))
nova.unsubscribe('test', 'p')
test.publish('p', 43)
test.publish('p', 43, use_snapshot=True)
# hard to test that the message is never delivered
time.sleep(0.2)
self.assertEqual(nova.last_msg['data'], 42)
@ -238,7 +238,7 @@ class TestDSE(base.TestCase):
test.subscribe('nova', 'p')
helper.retry_check_function_return_value(
lambda: hasattr(test, 'last_msg'), True)
nova.publish('p', 42)
nova.publish('p', 42, use_snapshot=True)
helper.retry_check_function_return_value(
lambda: test.last_msg['data'], 42)
self.assertFalse(hasattr(nova, "last_msg"))
@ -279,7 +279,6 @@ class TestDSE(base.TestCase):
def test_datasource_poll(self):
node = helper.make_dsenode_new_partition('testnode')
node.always_snapshot = True # Note(ekcs): this test expects snapshot
pub = fake_datasource.FakeDataSource('pub')
sub = fake_datasource.FakeDataSource('sub')
node.register_service(pub)
@ -288,15 +287,18 @@ class TestDSE(base.TestCase):
sub.subscribe('pub', 'fake_table')
pub.state = {'fake_table': set([(1, 2)])}
pub.poll()
helper.retry_check_function_return_value(
lambda: sub.last_msg['data'], set(pub.state['fake_table']))
lambda: sub.last_msg,
{'publisher': 'pub',
'data': (set(pub.state['fake_table']), set([])),
'table': 'fake_table'})
self.assertFalse(hasattr(pub, "last_msg"))
node.stop()
def test_policy_data(self):
"""Test policy correctly processes initial data snapshot."""
node = helper.make_dsenode_new_partition('testnode')
node.always_snapshot = False
data = fake_datasource.FakeDataSource('data')
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
node.register_service(data)
@ -315,7 +317,6 @@ class TestDSE(base.TestCase):
def test_policy_data_update(self):
"""Test policy correctly processes initial data snapshot and update."""
node = helper.make_dsenode_new_partition('testnode')
node.always_snapshot = False
data = fake_datasource.FakeDataSource('data')
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
node.register_service(data)
@ -338,7 +339,6 @@ class TestDSE(base.TestCase):
def test_policy_data_late_sub(self):
"""Test policy correctly processes data on late subscribe."""
node = helper.make_dsenode_new_partition('testnode')
node.always_snapshot = False
data = fake_datasource.FakeDataSource('data')
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
node.register_service(data)
@ -464,66 +464,75 @@ class TestDSE(base.TestCase):
helper.retry_check_function_return_value(
lambda: _validate_subbed_tables(n), True)
def test_policy_table_publish(self):
"""Policy table result publish
Test basic DSE functionality with policy engine and table result
publish.
"""
node = helper.make_dsenode_new_partition('testnode')
data = fake_datasource.FakeDataSource('data')
policy = agnostic.DseRuntime('policy')
policy2 = agnostic.DseRuntime('policy2')
node.register_service(data)
node.register_service(policy)
node.register_service(policy2)
policy.synchronizer = mock.MagicMock()
policy2.synchronizer = mock.MagicMock()
policy.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
policy.create_policy('classification')
policy.set_schema('data', compile.Schema({'q': (1,)}))
policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
policy.insert('q(3)', target='data')
# TODO(ekcs): test that no publish triggered (because no subscribers)
policy2.create_policy('policy')
policy2.subscribe('policy', 'classification:p')
helper.retry_check_function_return_value(
lambda: 'classification:p' in
policy._published_tables_with_subscriber, True)
self.assertEqual(list(policy.policySubData.keys()),
[('p', 'classification', None)])
helper.retry_check_db_equal(
policy2, 'policy:classification:p(x)',
'policy:classification:p(3)')
policy.insert('q(4)', target='data')
helper.retry_check_db_equal(
policy2, 'policy:classification:p(x)',
('policy:classification:p(3)'
' policy:classification:p(4)'))
# test that no change to p means no publish triggered
policy.insert('q(2)', target='data')
# TODO(ekcs): test no publish triggered
policy.delete('q(4)', target='data')
helper.retry_check_db_equal(
policy2, 'policy:classification:p(x)',
'policy:classification:p(3)')
policy2.unsubscribe('policy', 'classification:p')
# trigger removed
helper.retry_check_function_return_value(
lambda: len(policy._published_tables_with_subscriber) == 0, True)
self.assertEqual(list(policy.policySubData.keys()), [])
policy.insert('q(4)', target='data')
# TODO(ekcs): test that no publish triggered (because no subscribers)
node.stop()
# TODO(dse2): The policy table subscription feature is not ready in DSE2.
# The problem is that the compile and agnostic modules are not ready to
# deal with the double qualifier
# publishing_policy_engine:policy_name:table_name that the receiving policy
# engine needs to handle when subscribing to policy_name:table_name from
# publishing_policy_engine.
# This feature is not urgent because Congress currently does not have
# one policy engine subscribing to another policy engine
# (only to data sources).
# def test_policy_table_publish(self):
# """Policy table result publish
#
# Test basic DSE functionality with policy engine and table result
# publish.
# """
# node = helper.make_dsenode_new_partition('testnode')
# data = fake_datasource.FakeDataSource('data')
# policy = agnostic.DseRuntime('policy')
# policy2 = agnostic.DseRuntime('policy2')
# node.register_service(data)
# node.register_service(policy)
# node.register_service(policy2)
# policy.synchronizer = mock.MagicMock()
# policy2.synchronizer = mock.MagicMock()
#
# policy.create_policy(
# 'data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
# policy.create_policy('classification')
# policy.set_schema('data', compile.Schema({'q': (1,)}))
# policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
#
# policy.insert('q(3)', target='data')
# # TODO(ekcs): test that no publish triggered (because no subscribers)
#
# policy2.create_policy('policy')
# policy2.subscribe('policy', 'classification:p')
# helper.retry_check_function_return_value(
# lambda: 'classification:p' in
# policy._published_tables_with_subscriber, True)
# self.assertEqual(list(policy.policySubData.keys()),
# [('p', 'classification', None)])
#
# helper.retry_check_db_equal(
# policy2, 'policy:classification:p(x)',
# 'policy:classification:p(3)')
#
# policy.insert('q(4)', target='data')
# helper.retry_check_db_equal(
# policy2, 'policy:classification:p(x)',
# ('policy:classification:p(3)'
# ' policy:classification:p(4)'))
#
# # test that no change to p means no publish triggered
# policy.insert('q(2)', target='data')
# # TODO(ekcs): test no publish triggered
#
# policy.delete('q(4)', target='data')
# helper.retry_check_db_equal(
# policy2, 'policy:classification:p(x)',
# 'policy:classification:p(3)')
#
# policy2.unsubscribe('policy', 'classification:p')
# # trigger removed
# helper.retry_check_function_return_value(
# lambda: len(policy._published_tables_with_subscriber) == 0, True)
# self.assertEqual(list(policy.policySubData.keys()), [])
#
# policy.insert('q(4)', target='data')
# # TODO(ekcs): test that no publish triggered (because no subscribers)
# node.stop()
def test_replicated_pe_exec(self):
"""Test correct local leader behavior with 2 PEs requesting exec"""

View File

@ -72,7 +72,6 @@ class TestAgnostic(base.TestCase):
def test_receive_data_no_sequence_num(self):
'''Test receiving data without sequence numbers'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
# initialize with full table
@ -118,7 +117,6 @@ class TestAgnostic(base.TestCase):
def test_receive_data_in_order(self):
'''Test receiving data with sequence numbers, in order'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
# initialize with full table
@ -164,7 +162,6 @@ class TestAgnostic(base.TestCase):
def test_receive_data_out_of_order(self):
'''Test receiving data with sequence numbers, out of order'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
# update with lower seqnum than init snapshot is ignored
@ -207,7 +204,6 @@ class TestAgnostic(base.TestCase):
def test_receive_data_arbitrary_start(self):
'''Test receiving data with arbitrary starting sequence number'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
run.receive_data_sequenced(
publisher='datasource1', table='p',
@ -222,7 +218,6 @@ class TestAgnostic(base.TestCase):
Only one message (arbitrary) should be processed.
'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
# send three updates with the same seqnum
@ -254,7 +249,6 @@ class TestAgnostic(base.TestCase):
def test_receive_data_sequence_number_max_int(self):
'''Test receiving data when sequence number goes over max int'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
run.receive_data_sequenced(
@ -296,7 +290,6 @@ class TestAgnostic(base.TestCase):
def test_receive_data_multiple_tables(self):
'''Test receiving data with sequence numbers, multiple tables'''
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
run.always_snapshot = False
run.create_policy('datasource1')
# initialize p with full table