Allow subscription before data service exists

If service does not respond, initialize to empty data.
When the service comes into existence,
the data will flow as normal.

Preserves pre-DSE2 behavior.

Closes-Bug: 1637172

Change-Id: Ic4910525a1c0bc13d79b3773854abdf6fe447908
This commit is contained in:
Eric K 2016-11-29 15:44:15 -08:00
parent 987ccb15c8
commit 5beae4bd26
4 changed files with 74 additions and 22 deletions

View File

@ -11,22 +11,22 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
LOG = logging.getLogger(__name__)
import copy
from oslo_config import cfg
import six
import six # thirdparty import first because needed in import of Queue/queue
import time
if six.PY2:
import Queue as queue_package
else:
import queue as queue_package
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
from congress import exception
LOG = logging.getLogger(__name__)
class DataServiceInfo(object):
"""Metadata for DataService on the DSE.
@ -262,16 +262,20 @@ class DataService(object):
# Note(thread-safety): blocking function
def subscribe(self, service, table):
if self.always_snapshot:
try:
if self.always_snapshot:
# Note(thread-safety): blocking call
data = self.node.subscribe_table(
self.service_id, service, table)
self.receive_data(service, table, data, is_snapshot=True)
return
# Note(thread-safety): blocking call
data = self.node.subscribe_table(self.service_id, service, table)
self.receive_data(service, table, data, is_snapshot=True)
return
# Note(thread-safety): blocking call
(seqnum, data) = self.node.subscribe_table(
self.service_id, service, table)
self.receive_data_sequenced(
service, table, data, seqnum, is_snapshot=True)
(seqnum, data) = self.node.subscribe_table(
self.service_id, service, table)
self.receive_data_sequenced(
service, table, data, seqnum, is_snapshot=True)
except exception.NotFound:
self.receive_data(service, table, set(), is_snapshot=True)
def unsubscribe(self, service, table):
# Note(thread-safety): it is important to make sure there are no

View File

@ -95,6 +95,19 @@ class TestRuleModel(base.SqlTestCase):
# {'rule': 'p(x) :- beta:q(name=x)'},
# {}, context=self.context)
def test_add_rule_with_cross_policy_table(self):
test_rule = {
"rule": "p(x) :- classification:q(x)",
"name": "test-rule-cross",
"comment": "test-comment"
}
test_rule_id, obj = self.rule_model.add_item(test_rule, {},
context=self.context)
test_rule['id'] = test_rule_id
ret = self.rule_model.get_item(test_rule_id, {},
context=self.context)
self.assertEqual(test_rule, ret)
def test_get_items(self):
ret = self.rule_model.get_items({}, context=self.context)
self.assertTrue(all(p in ret['results']

View File

@ -25,6 +25,7 @@ cfg.CONF.datasource_sync_period = 0
from oslo_messaging import conffixture
from congress.api import base as api_base
from congress.datalog import base as datalog_base
from congress.datalog import compile
from congress.datasources import nova_driver
from congress import exception as congressException
@ -95,6 +96,23 @@ class TestDSE(base.TestCase):
self.assertFalse(hasattr(test2, "last_msg"))
node.stop()
def test_sub_before_service_exists(self):
node = helper.make_dsenode_new_partition('testnode')
test1 = fake_datasource.FakeDataSource('test1')
node.register_service(test1)
test1.subscribe('test2', 'p')
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], set())
test2 = fake_datasource.FakeDataSource('test2')
node.register_service(test2)
test2.publish('p', 42)
helper.retry_check_function_return_value(
lambda: test1.last_msg['data'], 42)
self.assertFalse(hasattr(test2, "last_msg"))
node.stop()
node.wait()
def test_internode_pubsub(self):
node1 = helper.make_dsenode_new_partition('testnode1')
test1 = fake_datasource.FakeDataSource('test1')
@ -283,7 +301,7 @@ class TestDSE(base.TestCase):
node.register_service(engine)
engine.create_policy('policy1')
engine.create_policy('data')
engine.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
data.state = {'fake_table': set([(1,), (2,)])}
data.poll()
@ -302,7 +320,7 @@ class TestDSE(base.TestCase):
node.register_service(engine)
engine.create_policy('policy1')
engine.create_policy('data')
engine.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
data.state = {'fake_table': set([(1,), (2,)])}
data.poll()
@ -325,7 +343,7 @@ class TestDSE(base.TestCase):
node.register_service(engine)
engine.create_policy('policy1')
engine.create_policy('data')
engine.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
data.state = {'fake_table': set([(1,), (2,)])}
data.poll()
self.insert_rule(engine, 'p(x) :- data:fake_table(x)', 'policy1')
@ -458,7 +476,7 @@ class TestDSE(base.TestCase):
node.register_service(policy)
node.register_service(policy2)
policy.create_policy('data')
policy.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)
policy.create_policy('classification')
policy.set_schema('data', compile.Schema({'q': (1,)}))
policy.insert('p(x):-data:q(x),gt(x,2)', target='classification')

View File

@ -134,6 +134,23 @@ class TestRuntime(base.TestCase):
'p(1)',
'Multipolicy deletion select')
def test_cross_policy_rule(self):
"""Test rule that refer to table from another policy."""
run = agnostic.Runtime()
run.create_policy('test1')
run.create_policy('test2')
run.create_policy('test3')
run.insert(
'p(x) :- test1:q(x),test2:q(x),test3:q(x),q(x) q(1) q(2) q(3)',
'test3')
run.insert('q(1)', 'test1')
run.insert('q(1) q(2)', 'test2')
self.assertEqual(
run.select('p(x)', 'test3'),
'p(1)',
'Cross-policy rule select')
def test_policy_types(self):
"""Test types for multiple policies."""
# policy types