Merge "Make differential pub-sub standard in DSE2"
This commit is contained in:
commit
54cdc53b1b
|
@ -101,8 +101,6 @@ class DataService(object):
|
|||
def __init__(self, service_id):
|
||||
# Note(ekcs): temporary setting to disable use of diffs and sequencing
|
||||
# to avoid muddying the process of a first dse2 system test.
|
||||
# TODO(ekcs,dse2): remove when differential update is standard
|
||||
self.always_snapshot = True
|
||||
self.service_id = service_id
|
||||
self.node = None
|
||||
self._rpc_server = None
|
||||
|
@ -227,11 +225,7 @@ class DataService(object):
|
|||
return self.node.make_datasource_dict(*args, **kwargs)
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
def publish(self, table, data, use_snapshot=True):
|
||||
if self.always_snapshot:
|
||||
self.node.publish_table(self.service_id, table, data)
|
||||
return
|
||||
|
||||
def publish(self, table, data, use_snapshot=False):
|
||||
def get_differential_and_set_last_published_data():
|
||||
if table in self._last_published_data:
|
||||
to_add = list(
|
||||
|
@ -241,7 +235,7 @@ class DataService(object):
|
|||
self._last_published_data[table] = data
|
||||
else:
|
||||
self._last_published_data[table] = data
|
||||
to_add = data
|
||||
to_add = copy.copy(data)
|
||||
to_del = []
|
||||
return [to_add, to_del]
|
||||
|
||||
|
@ -264,12 +258,6 @@ class DataService(object):
|
|||
# Note(thread-safety): blocking function
|
||||
def subscribe(self, service, table):
|
||||
try:
|
||||
if self.always_snapshot:
|
||||
# Note(thread-safety): blocking call
|
||||
data = self.node.subscribe_table(
|
||||
self.service_id, service, table)
|
||||
self.receive_data(service, table, data, is_snapshot=True)
|
||||
return
|
||||
# Note(thread-safety): blocking call
|
||||
(seqnum, data) = self.node.subscribe_table(
|
||||
self.service_id, service, table)
|
||||
|
|
|
@ -83,10 +83,6 @@ class DseNode(object):
|
|||
|
||||
def __init__(self, messaging_config, node_id, node_rpc_endpoints,
|
||||
partition_id=None):
|
||||
# Note(ekcs): temporary setting to disable use of diffs and sequencing
|
||||
# to avoid muddying the process of a first dse2 system test.
|
||||
# TODO(ekcs,dse2): remove when differential update is standard
|
||||
self.always_snapshot = False
|
||||
self.messaging_config = messaging_config
|
||||
self.node_id = node_id
|
||||
self.node_rpc_endpoints = node_rpc_endpoints
|
||||
|
@ -142,7 +138,6 @@ class DseNode(object):
|
|||
% (service.service_id, self.node_id))
|
||||
raise exception.DataServiceError(msg)
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
service.always_snapshot = self.always_snapshot
|
||||
service.node = self
|
||||
self._services.append(service)
|
||||
service._target = self.service_rpc_target(service.service_id,
|
||||
|
@ -428,17 +423,12 @@ class DseNode(object):
|
|||
self.subscriptions[publisher][table].add(subscriber)
|
||||
|
||||
# oslo returns [] instead of set(), so handle that case directly
|
||||
if self.always_snapshot:
|
||||
# Note(thread-safety): blocking call
|
||||
snapshot = self.invoke_service_rpc(
|
||||
publisher, "get_snapshot", {'table': table})
|
||||
return self.to_set_of_tuples(snapshot)
|
||||
else:
|
||||
# Note(thread-safety): blocking call
|
||||
snapshot_seqnum = self.invoke_service_rpc(
|
||||
publisher, "get_last_published_data_with_seqnum",
|
||||
{'table': table})
|
||||
return snapshot_seqnum
|
||||
|
||||
# Note(thread-safety): blocking call
|
||||
snapshot_seqnum = self.invoke_service_rpc(
|
||||
publisher, "get_last_published_data_with_seqnum",
|
||||
{'table': table})
|
||||
return snapshot_seqnum
|
||||
|
||||
def get_subscription(self, service_id):
|
||||
"""Return publisher/tables subscribed by service: service_id
|
||||
|
|
|
@ -2009,7 +2009,7 @@ class DseRuntime (Runtime, data_service.DataService):
|
|||
# table_name, self.policySubData[table]))
|
||||
(policy, tablename) = compile.Tablename.parse_service_table(table_name)
|
||||
data = self.get_row_data(tablename, policy, trace=False)
|
||||
data = [record['data'] for record in data]
|
||||
data = [tuple(record['data']) for record in data]
|
||||
return data
|
||||
|
||||
def _maintain_triggers(self):
|
||||
|
|
|
@ -55,7 +55,7 @@ class TestDSE(base.TestCase):
|
|||
test1.subscribe('test2', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
|
@ -73,7 +73,7 @@ class TestDSE(base.TestCase):
|
|||
test2.subscribe('test1', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test2, 'last_msg'), True)
|
||||
test1.publish('p', 42)
|
||||
test1.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test2.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test1, "last_msg"))
|
||||
|
@ -91,7 +91,7 @@ class TestDSE(base.TestCase):
|
|||
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
|
@ -106,7 +106,7 @@ class TestDSE(base.TestCase):
|
|||
self.assertFalse(hasattr(test1, "last_msg"))
|
||||
test2 = fake_datasource.FakeDataSource('test2')
|
||||
node.register_service(test2)
|
||||
test2.publish('p', 42)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
|
@ -124,7 +124,7 @@ class TestDSE(base.TestCase):
|
|||
test1.subscribe('test2', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
|
@ -144,7 +144,7 @@ class TestDSE(base.TestCase):
|
|||
test1.unsubscribe('test2', 'q') # unsub from q should not affect p
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test2.publish('p', 42)
|
||||
test2.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
|
@ -164,7 +164,7 @@ class TestDSE(base.TestCase):
|
|||
test1.subscribe('test3', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test1, 'last_msg'), True)
|
||||
test3.publish('p', 42)
|
||||
test3.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test1.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test2, "last_msg"))
|
||||
|
@ -197,7 +197,7 @@ class TestDSE(base.TestCase):
|
|||
nova.subscribe('test', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(nova, 'last_msg'), True)
|
||||
test.publish('p', 42)
|
||||
test.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: nova.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test, "last_msg"))
|
||||
|
@ -215,12 +215,12 @@ class TestDSE(base.TestCase):
|
|||
nova.subscribe('test', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(nova, 'last_msg'), True)
|
||||
test.publish('p', 42)
|
||||
test.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: nova.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(test, "last_msg"))
|
||||
nova.unsubscribe('test', 'p')
|
||||
test.publish('p', 43)
|
||||
test.publish('p', 43, use_snapshot=True)
|
||||
# hard to test that the message is never delivered
|
||||
time.sleep(0.2)
|
||||
self.assertEqual(nova.last_msg['data'], 42)
|
||||
|
@ -238,7 +238,7 @@ class TestDSE(base.TestCase):
|
|||
test.subscribe('nova', 'p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: hasattr(test, 'last_msg'), True)
|
||||
nova.publish('p', 42)
|
||||
nova.publish('p', 42, use_snapshot=True)
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: test.last_msg['data'], 42)
|
||||
self.assertFalse(hasattr(nova, "last_msg"))
|
||||
|
@ -279,7 +279,6 @@ class TestDSE(base.TestCase):
|
|||
|
||||
def test_datasource_poll(self):
|
||||
node = helper.make_dsenode_new_partition('testnode')
|
||||
node.always_snapshot = True # Note(ekcs): this test expects snapshot
|
||||
pub = fake_datasource.FakeDataSource('pub')
|
||||
sub = fake_datasource.FakeDataSource('sub')
|
||||
node.register_service(pub)
|
||||
|
@ -288,15 +287,18 @@ class TestDSE(base.TestCase):
|
|||
sub.subscribe('pub', 'fake_table')
|
||||
pub.state = {'fake_table': set([(1, 2)])}
|
||||
pub.poll()
|
||||
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: sub.last_msg['data'], set(pub.state['fake_table']))
|
||||
lambda: sub.last_msg,
|
||||
{'publisher': 'pub',
|
||||
'data': (set(pub.state['fake_table']), set([])),
|
||||
'table': 'fake_table'})
|
||||
self.assertFalse(hasattr(pub, "last_msg"))
|
||||
node.stop()
|
||||
|
||||
def test_policy_data(self):
|
||||
"""Test policy correctly processes initial data snapshot."""
|
||||
node = helper.make_dsenode_new_partition('testnode')
|
||||
node.always_snapshot = False
|
||||
data = fake_datasource.FakeDataSource('data')
|
||||
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
node.register_service(data)
|
||||
|
@ -315,7 +317,6 @@ class TestDSE(base.TestCase):
|
|||
def test_policy_data_update(self):
|
||||
"""Test policy correctly processes initial data snapshot and update."""
|
||||
node = helper.make_dsenode_new_partition('testnode')
|
||||
node.always_snapshot = False
|
||||
data = fake_datasource.FakeDataSource('data')
|
||||
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
node.register_service(data)
|
||||
|
@ -338,7 +339,6 @@ class TestDSE(base.TestCase):
|
|||
def test_policy_data_late_sub(self):
|
||||
"""Test policy correctly processes data on late subscribe."""
|
||||
node = helper.make_dsenode_new_partition('testnode')
|
||||
node.always_snapshot = False
|
||||
data = fake_datasource.FakeDataSource('data')
|
||||
engine = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
node.register_service(data)
|
||||
|
@ -464,66 +464,75 @@ class TestDSE(base.TestCase):
|
|||
helper.retry_check_function_return_value(
|
||||
lambda: _validate_subbed_tables(n), True)
|
||||
|
||||
def test_policy_table_publish(self):
|
||||
"""Policy table result publish
|
||||
|
||||
Test basic DSE functionality with policy engine and table result
|
||||
publish.
|
||||
"""
|
||||
node = helper.make_dsenode_new_partition('testnode')
|
||||
data = fake_datasource.FakeDataSource('data')
|
||||
policy = agnostic.DseRuntime('policy')
|
||||
policy2 = agnostic.DseRuntime('policy2')
|
||||
node.register_service(data)
|
||||
node.register_service(policy)
|
||||
node.register_service(policy2)
|
||||
policy.synchronizer = mock.MagicMock()
|
||||
policy2.synchronizer = mock.MagicMock()
|
||||
|
||||
policy.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
|
||||
policy.create_policy('classification')
|
||||
policy.set_schema('data', compile.Schema({'q': (1,)}))
|
||||
policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
|
||||
|
||||
policy.insert('q(3)', target='data')
|
||||
# TODO(ekcs): test that no publish triggered (because no subscribers)
|
||||
|
||||
policy2.create_policy('policy')
|
||||
policy2.subscribe('policy', 'classification:p')
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: 'classification:p' in
|
||||
policy._published_tables_with_subscriber, True)
|
||||
self.assertEqual(list(policy.policySubData.keys()),
|
||||
[('p', 'classification', None)])
|
||||
|
||||
helper.retry_check_db_equal(
|
||||
policy2, 'policy:classification:p(x)',
|
||||
'policy:classification:p(3)')
|
||||
|
||||
policy.insert('q(4)', target='data')
|
||||
helper.retry_check_db_equal(
|
||||
policy2, 'policy:classification:p(x)',
|
||||
('policy:classification:p(3)'
|
||||
' policy:classification:p(4)'))
|
||||
|
||||
# test that no change to p means no publish triggered
|
||||
policy.insert('q(2)', target='data')
|
||||
# TODO(ekcs): test no publish triggered
|
||||
|
||||
policy.delete('q(4)', target='data')
|
||||
helper.retry_check_db_equal(
|
||||
policy2, 'policy:classification:p(x)',
|
||||
'policy:classification:p(3)')
|
||||
|
||||
policy2.unsubscribe('policy', 'classification:p')
|
||||
# trigger removed
|
||||
helper.retry_check_function_return_value(
|
||||
lambda: len(policy._published_tables_with_subscriber) == 0, True)
|
||||
self.assertEqual(list(policy.policySubData.keys()), [])
|
||||
|
||||
policy.insert('q(4)', target='data')
|
||||
# TODO(ekcs): test that no publish triggered (because no subscribers)
|
||||
node.stop()
|
||||
# TODO(dse2): the policy table subscription feature is not ready in DSE2
|
||||
# the problem is that compile and agnostic are not ready to deal with
|
||||
# the double qualifier publishing_policy_engine:policy_name:table_name
|
||||
# that the receiving policy engine needs to handle when subscribing to
|
||||
# policy_name:table_name from publishing_policy_engine.
|
||||
# This feature is not urgent because Congress currently does not have
|
||||
# one policy engine subscribing from another policy engine
|
||||
# (only from data source)
|
||||
# def test_policy_table_publish(self):
|
||||
# """Policy table result publish
|
||||
#
|
||||
# Test basic DSE functionality with policy engine and table result
|
||||
# publish.
|
||||
# """
|
||||
# node = helper.make_dsenode_new_partition('testnode')
|
||||
# data = fake_datasource.FakeDataSource('data')
|
||||
# policy = agnostic.DseRuntime('policy')
|
||||
# policy2 = agnostic.DseRuntime('policy2')
|
||||
# node.register_service(data)
|
||||
# node.register_service(policy)
|
||||
# node.register_service(policy2)
|
||||
# policy.synchronizer = mock.MagicMock()
|
||||
# policy2.synchronizer = mock.MagicMock()
|
||||
#
|
||||
# policy.create_policy(
|
||||
# 'data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
|
||||
# policy.create_policy('classification')
|
||||
# policy.set_schema('data', compile.Schema({'q': (1,)}))
|
||||
# policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')
|
||||
#
|
||||
# policy.insert('q(3)', target='data')
|
||||
# # TODO(ekcs): test that no publish triggered (because no subscribers)
|
||||
#
|
||||
# policy2.create_policy('policy')
|
||||
# policy2.subscribe('policy', 'classification:p')
|
||||
# helper.retry_check_function_return_value(
|
||||
# lambda: 'classification:p' in
|
||||
# policy._published_tables_with_subscriber, True)
|
||||
# self.assertEqual(list(policy.policySubData.keys()),
|
||||
# [('p', 'classification', None)])
|
||||
#
|
||||
# helper.retry_check_db_equal(
|
||||
# policy2, 'policy:classification:p(x)',
|
||||
# 'policy:classification:p(3)')
|
||||
#
|
||||
# policy.insert('q(4)', target='data')
|
||||
# helper.retry_check_db_equal(
|
||||
# policy2, 'policy:classification:p(x)',
|
||||
# ('policy:classification:p(3)'
|
||||
# ' policy:classification:p(4)'))
|
||||
#
|
||||
# # test that no change to p means no publish triggered
|
||||
# policy.insert('q(2)', target='data')
|
||||
# # TODO(ekcs): test no publish triggered
|
||||
#
|
||||
# policy.delete('q(4)', target='data')
|
||||
# helper.retry_check_db_equal(
|
||||
# policy2, 'policy:classification:p(x)',
|
||||
# 'policy:classification:p(3)')
|
||||
#
|
||||
# policy2.unsubscribe('policy', 'classification:p')
|
||||
# # trigger removed
|
||||
# helper.retry_check_function_return_value(
|
||||
# lambda: len(policy._published_tables_with_subscriber) == 0, True)
|
||||
# self.assertEqual(list(policy.policySubData.keys()), [])
|
||||
#
|
||||
# policy.insert('q(4)', target='data')
|
||||
# # TODO(ekcs): test that no publish triggered (because no subscribers)
|
||||
# node.stop()
|
||||
|
||||
def test_replicated_pe_exec(self):
|
||||
"""Test correct local leader behavior with 2 PEs requesting exec"""
|
||||
|
|
|
@ -72,7 +72,6 @@ class TestAgnostic(base.TestCase):
|
|||
def test_receive_data_no_sequence_num(self):
|
||||
'''Test receiving data without sequence numbers'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
|
||||
# initialize with full table
|
||||
|
@ -118,7 +117,6 @@ class TestAgnostic(base.TestCase):
|
|||
def test_receive_data_in_order(self):
|
||||
'''Test receiving data with sequence numbers, in order'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
|
||||
# initialize with full table
|
||||
|
@ -164,7 +162,6 @@ class TestAgnostic(base.TestCase):
|
|||
def test_receive_data_out_of_order(self):
|
||||
'''Test receiving data with sequence numbers, out of order'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
|
||||
# update with lower seqnum than init snapshot is ignored
|
||||
|
@ -207,7 +204,6 @@ class TestAgnostic(base.TestCase):
|
|||
def test_receive_data_arbitrary_start(self):
|
||||
'''Test receiving data with arbitrary starting sequence number'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
run.receive_data_sequenced(
|
||||
publisher='datasource1', table='p',
|
||||
|
@ -222,7 +218,6 @@ class TestAgnostic(base.TestCase):
|
|||
Only one message (arbitrary) should be processed.
|
||||
'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
|
||||
# send three updates with the same seqnum
|
||||
|
@ -254,7 +249,6 @@ class TestAgnostic(base.TestCase):
|
|||
def test_receive_data_sequence_number_max_int(self):
|
||||
'''Test receiving data when sequence number goes over max int'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
|
||||
run.receive_data_sequenced(
|
||||
|
@ -296,7 +290,6 @@ class TestAgnostic(base.TestCase):
|
|||
def test_receive_data_multiple_tables(self):
|
||||
'''Test receiving data with sequence numbers, multiple tables'''
|
||||
run = agnostic.DseRuntime(api_base.ENGINE_SERVICE_ID)
|
||||
run.always_snapshot = False
|
||||
run.create_policy('datasource1')
|
||||
|
||||
# initialize p with full table
|
||||
|
|
Loading…
Reference in New Issue