Migrate harness to DSE2

Pull API models off the bus and pass them a reference to the bus so
they can still make RPC calls.  This is necessary because in DSE2 all
the API models are part of a single service on the bus (and they never
needed to be on the bus in DSE1).

Introduce a new creation process that bootstraps the system using DSE2.

Change-Id: Ic91e2f7918970766fad8e2aff4a0ef88fb5777e2
Tim Hinrichs 2016-02-15 12:06:09 -08:00
parent d7f122db21
commit 2354bf4c58
24 changed files with 993 additions and 216 deletions
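
A minimal sketch of the pattern this commit introduces (illustrative code, not lines from the diff; the Bus class here is a stand-in for the DSE2 node the API service is registered on):

# Sketch: API models become plain objects that hold a reference to the
# bus and delegate RPC through it, instead of being bus services.
class Bus(object):
    """Stand-in for the DSE2 node/service the API models are attached to."""
    def rpc(self, caller, name, kwargs):
        # A real bus would dispatch over oslo.messaging; here we just
        # echo the call so the sketch is runnable on its own.
        print("rpc -> %s.%s(%r)" % (caller, name, kwargs))

class APIModel(object):
    """Base-class shape after the migration: no deepSix parent."""
    def __init__(self, name, policy_engine=None, bus=None):
        self.name = name
        self.engine = policy_engine
        self.bus = bus

    def invoke_rpc(self, caller, name, kwargs):
        # In the distributed (DSE2) architecture the model is off the bus,
        # so it uses the bus it was handed at creation time.
        return self.bus.rpc(caller, name, kwargs)

model = APIModel('api-policy', policy_engine='engine', bus=Bus())
model.invoke_rpc('engine', 'persistent_get_policies', {})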

View File

@ -29,12 +29,6 @@ def d6service(name, keys, inbox, datapath, args):
class ActionsModel(base.APIModel):
"""Model for handling API requests about Actions."""
def __init__(self, name, keys='', inbox=None, dataPath=None,
policy_engine=None, datasource_mgr=None):
super(ActionsModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath,
policy_engine=policy_engine,
datasource_mgr=datasource_mgr)
def get_items(self, params, context=None):
"""Retrieve items from this model.

View File

@ -24,9 +24,11 @@ import webob
import webob.dec
from congress.api import webservice
from congress.dse2 import data_service
LOG = logging.getLogger(__name__)
API_SERVICE_NAME = '__api'
class ApiApplication(object):
@ -64,7 +66,7 @@ class ApiApplication(object):
return response
class ResourceManager(object):
class ResourceManager(data_service.DataService):
"""A container for REST API resources.
This container is meant to be called from one or more wsgi servers/ports.
@ -75,6 +77,7 @@ class ResourceManager(object):
def __init__(self):
self.handlers = []
super(ResourceManager, self).__init__(API_SERVICE_NAME)
def register_handler(self, handler, search_index=None):
"""Register a new resource handler.

View File

@ -19,27 +19,24 @@ from __future__ import absolute_import
from oslo_config import cfg
# Use new deepsix when appropriate
if getattr(cfg.CONF, 'distributed_architecture', None):
from congress.dse2 import deepsix2 as deepsix
else:
from congress.dse import deepsix
from congress import exception
class APIModel(deepsix.deepSix):
class APIModel(object):
"""Base Class for handling API requests."""
def __init__(self, name, keys='', inbox=None, dataPath=None,
policy_engine=None, datasource_mgr=None):
super(APIModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath)
policy_engine=None, datasource_mgr=None, bus=None):
# super(APIModel, self).__init__(name, keys, inbox=inbox,
# dataPath=dataPath)
self.engine = policy_engine
self.datasource_mgr = datasource_mgr
self.bus = bus
self.name = name
def invoke_rpc(self, caller, name, kwargs):
if getattr(cfg.CONF, 'distributed_architecture', None):
return self.rpc(caller, name, kwargs)
return self.bus.rpc(caller, name, kwargs)
else:
func = getattr(caller, name, None)
if func:

View File

@ -39,11 +39,13 @@ def d6service(name, keys, inbox, datapath, args):
class DatasourceModel(base.APIModel):
"""Model for handling API requests about Datasources."""
def __init__(self, name, keys='', inbox=None, dataPath=None,
policy_engine=None, datasource_mgr=None, synchronizer=None):
policy_engine=None, datasource_mgr=None, bus=None,
synchronizer=None):
super(DatasourceModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath,
policy_engine=policy_engine,
datasource_mgr=datasource_mgr)
datasource_mgr=datasource_mgr,
bus=bus)
self.synchronizer = synchronizer
self.dist_arch = getattr(cfg.CONF, 'distributed_architecture', False)
@ -60,7 +62,7 @@ class DatasourceModel(base.APIModel):
dict will also be rendered for the user.
"""
if self.dist_arch:
self.datasource_mgr = self
self.datasource_mgr = self.bus
results = self.datasource_mgr.get_datasources(filter_secret=True)
@ -91,7 +93,7 @@ class DatasourceModel(base.APIModel):
obj = None
try:
if self.dist_arch:
obj = self.add_datasource(item=item)
obj = self.bus.add_datasource(item=item)
# Get the schema for the datasource using service_id
schema = self.invoke_rpc(obj['name'], 'get_datasource_schema',
{'source_id': obj['name']})
@ -117,11 +119,11 @@ class DatasourceModel(base.APIModel):
ds_id = context.get('ds_id')
try:
if self.dist_arch:
datasource = self.get_datasource(ds_id)
datasource = self.bus.get_datasource(ds_id)
args = {'name': datasource['name'],
'disallow_dangling_refs': True}
self.invoke_rpc(self.engine, 'delete_policy', args)
self.delete_datasource(datasource)
self.bus.delete_datasource(datasource)
else:
self.datasource_mgr.delete_datasource(ds_id)
except (exception.DatasourceNotFound,

View File

@ -34,12 +34,6 @@ def d6service(name, keys, inbox, datapath, args):
class RowModel(base.APIModel):
"""Model for handling API requests about Rows."""
def __init__(self, name, keys='', inbox=None, dataPath=None,
policy_engine=None, datasource_mgr=None):
super(RowModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath,
policy_engine=policy_engine,
datasource_mgr=datasource_mgr)
# TODO(thinrichs): No rows have IDs right now. Maybe eventually
# could make ID the hash of the row, but then might as well
@ -76,7 +70,6 @@ class RowModel(base.APIModel):
# Get the caller, it should be either policy or datasource
caller, source_id = api_utils.get_id_from_context(
context, self.datasource_mgr, self.engine)
table_id = context['table_id']
try:
args = {'table_id': table_id, 'source_id': source_id,

View File

@ -33,11 +33,6 @@ def d6service(name, keys, inbox, datapath, args):
class SchemaModel(base.APIModel):
"""Model for handling API requests about Schemas."""
def __init__(self, name, keys, inbox=None, dataPath=None,
datasource_mgr=None):
super(SchemaModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath,
datasource_mgr=datasource_mgr)
def get_item(self, id_, params, context=None):
"""Retrieve item with id id_ from model.

View File

@ -34,12 +34,6 @@ def d6service(name, keys, inbox, datapath, args):
class StatusModel(base.APIModel):
"""Model for handling API requests about Statuses."""
def __init__(self, name, keys='', inbox=None, dataPath=None,
policy_engine=None, datasource_mgr=None):
super(StatusModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath,
policy_engine=policy_engine,
datasource_mgr=datasource_mgr)
def get_item(self, id_, params, context=None):
"""Retrieve item with id id_ from model.

View File

@ -20,8 +20,8 @@ from __future__ import absolute_import
from oslo_log import log as logging
from congress.api import api_utils
from congress.api import base
from congress.api import webservice
from congress.dse import deepsix
from congress import exception
@ -33,12 +33,12 @@ def d6service(name, keys, inbox, datapath, args):
dataPath=datapath, **args)
class DatasourceDriverModel(deepsix.deepSix):
class DatasourceDriverModel(base.APIModel):
"""Model for handling API requests about DatasourceDriver."""
def __init__(self, name, keys, inbox=None, dataPath=None,
datasource_mgr=None):
def __init__(self, name, keys='', inbox=None, dataPath=None,
datasource_mgr=None, bus=None):
super(DatasourceDriverModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath)
dataPath=dataPath, bus=bus)
self.datasource_mgr = datasource_mgr
def rpc(self, caller, name, *args, **kwargs):

View File

@ -33,12 +33,6 @@ def d6service(name, keys, inbox, datapath, args):
class TableModel(base.APIModel):
"""Model for handling API requests about Tables."""
def __init__(self, name, keys='', inbox=None, dataPath=None,
policy_engine=None, datasource_mgr=None):
super(TableModel, self).__init__(name, keys, inbox=inbox,
dataPath=dataPath,
policy_engine=policy_engine,
datasource_mgr=datasource_mgr)
def get_item(self, id_, params, context=None):
"""Retrieve item with id id_ from model.

View File

@ -26,13 +26,28 @@ import sys
from oslo_config import cfg
from oslo_log import log as logging
from congress.api import action_model
from congress.api import application
from congress.api import datasource_model
from congress.api import policy_model
from congress.api import router
from congress.api import row_model
from congress.api import rule_model
from congress.api import schema_model
from congress.api import status_model
from congress.api.system import driver_model
from congress.api import table_model
from congress.datalog import base
from congress.dse import d6cage
from congress.dse2 import dse_node
from congress import exception
from congress.managers import datasource as datasource_manager
from congress.policy_engines.agnostic import Dse2Runtime
from congress.tests import helper
LOG = logging.getLogger(__name__)
ENGINE_SERVICE_NAME = 'engine'
def create(rootdir, config_override=None):
@ -252,6 +267,183 @@ def create(rootdir, config_override=None):
return cage
def create2(config_override=None, node=None):
"""Get Congress up and running when src is installed in rootdir.
i.e. ROOTDIR=/path/to/congress/congress.
CONFIG_OVERRIDE is a dictionary of dictionaries with configuration
values that overrides those provided in CONFIG_FILE. The top-level
dictionary has keys for the CONFIG_FILE sections, and the second-level
dictionaries store values for that section.
:param node is a DseNode
"""
LOG.debug("Starting Congress with config_override=%s",
config_override)
# create services
services = {}
services[ENGINE_SERVICE_NAME] = create_policy_engine()
services['api'], services['api_service'] = create_api(
services[ENGINE_SERVICE_NAME])
services['datasources'] = create_datasources(services[ENGINE_SERVICE_NAME])
# create message bus and attach services
if node:
bus = node
else:
messaging_config = helper.generate_messaging_config()
bus = dse_node.DseNode(messaging_config, "root", [])
bus.config = config_override or {}
bus.register_service(services[ENGINE_SERVICE_NAME])
for ds in services['datasources']:
bus.register_service(ds)
bus.register_service(services['api_service'])
# TODO(dse2): Need this?
# initialize_policy_engine(services[ENGINE_SERVICE_NAME], services['api'])
# TODO(dse2): Figure out what to do about the synchronizer
# # Start datasource synchronizer after explicitly starting the
# # datasources, because the explicit call to create a datasource
# # will crash if the synchronizer creates the datasource first.
# synchronizer_path = os.path.join(src_path, "synchronizer.py")
# LOG.info("main::start() synchronizer: %s", synchronizer_path)
# cage.loadModule("Synchronizer", synchronizer_path)
# cage.createservice(
# name="synchronizer",
# moduleName="Synchronizer",
# description="DB synchronizer instance",
# args={'poll_time': cfg.CONF.datasource_sync_period})
# synchronizer = cage.service_object('synchronizer')
# engine.set_synchronizer(synchronizer)
return services
def create_api(policy_engine):
"""Return service that encapsulates api logic for DSE2."""
# ResourceManager inherits from DataService
api_resource_mgr = application.ResourceManager()
models = create_api_models(policy_engine, api_resource_mgr)
router.APIRouterV1(api_resource_mgr, models)
return models, api_resource_mgr
def create_api_models(policy_engine, bus):
"""Create all the API models and return as a dictionary for DSE2."""
policy_engine = policy_engine.name
datasource_mgr = None
res = {}
res['api-policy'] = policy_model.PolicyModel(
'api-policy', policy_engine=policy_engine, bus=bus)
res['api-rule'] = rule_model.RuleModel(
'api-rule', policy_engine=policy_engine, bus=bus)
res['api-row'] = row_model.RowModel(
'api-row', policy_engine=policy_engine,
datasource_mgr=datasource_mgr, bus=bus)
# TODO(dse2): migrate this to DSE2 and then reenable
res['api-datasource'] = datasource_model.DatasourceModel(
'api-datasource', policy_engine=policy_engine, bus=bus)
res['api-schema'] = schema_model.SchemaModel(
'api-schema', datasource_mgr=datasource_mgr, bus=bus)
res['api-table'] = table_model.TableModel(
'api-table', policy_engine=policy_engine,
datasource_mgr=datasource_mgr, bus=bus)
res['api-status'] = status_model.StatusModel(
'api-status', policy_engine=policy_engine,
datasource_mgr=datasource_mgr, bus=bus)
res['api-action'] = action_model.ActionsModel(
'api-action', policy_engine=policy_engine,
datasource_mgr=datasource_mgr, bus=bus)
res['api-system'] = driver_model.DatasourceDriverModel(
'api-system', datasource_mgr=datasource_mgr, bus=bus)
return res
def create_policy_engine():
"""Create policy engine and initialize it using the api models."""
engine = Dse2Runtime(ENGINE_SERVICE_NAME)
engine.initialize_table_subscriptions()
engine.debug_mode() # should take this out for production
return engine
def initialize_policy_engine(engine, api):
"""Initialize the policy engine using the API."""
# Load policies from database
engine.persistent_load_policies()
# TODO(dse2): check that we can move this here, now that we
# have flexible schema handling. If so, remove following
# comment.
# Insert rules. Needs to be done after datasources are loaded
# so that we can compile away column references at read time.
# If datasources loaded after this, we don't have schemas.
engine.persistent_load_rules()
# if this is the first time we are running Congress, need
# to create the default theories (which cannot be deleted)
api_policy = api['api-policy']
engine.DEFAULT_THEORY = 'classification'
engine.builtin_policy_names.add(engine.DEFAULT_THEORY)
try:
api_policy.add_item({'name': engine.DEFAULT_THEORY,
'description': 'default policy'}, {})
except KeyError:
pass
engine.ACTION_THEORY = 'action'
engine.builtin_policy_names.add(engine.ACTION_THEORY)
try:
api_policy.add_item({'kind': base.ACTION_POLICY_TYPE,
'name': engine.ACTION_THEORY,
'description': 'default action policy'},
{})
except KeyError:
pass
# TODO(dse2): delete this subscription and the associated tests.
# Don't want 2 paths for updating policy.
engine.subscribe('api-rule', 'policy-update',
callback=engine.receive_policy_update)
def create_datasources(engine):
"""Create datasource services, modify engine, and return datasources."""
# TODO(dse2): port this code to DSE2. There were never any tests for
# this code, so write those too. In particular, should be able to
# remove the datasourceManager entirely.
datasource_mgr = datasource_manager.DataSourceManager()
datasources = []
for datasource in datasource_mgr.get_datasources():
if not datasource['enabled']:
LOG.info("module %s not enabled, skip loading", datasource['name'])
continue
driver_info = datasource_mgr.get_driver_info(datasource['driver'])
engine.create_policy(datasource['name'],
kind=base.DATASOURCE_POLICY_TYPE)
try:
ds = datasource_mgr.createservice(
name=datasource['name'],
moduleName=driver_info['module'],
args=datasource['config'],
module_driver=True,
type_='datasource_driver',
id_=datasource['id'])
datasources.append(ds)
except datasource_mgr.DataServiceError:
# FIXME(arosen): If createservice raises congress-server
# dies here. So we catch this exception so the server does
# not die. We need to refactor the dse code so it just
# keeps retrying the driver gracefully...
continue
engine.set_schema(ds.name, ds.get_schema())
return datasources
def load_data_service(service_name, config, cage, rootdir, id_):
"""Load service.

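For orientation, a hedged usage sketch of the create2() bootstrap shown above; the DseNode construction mirrors the in-tree test path and may differ in a real deployment:

# Sketch: bootstrap Congress on a DSE2 node via harness.create2().
# Service names ('api', 'api_service', 'datasources') and the helper
# calls are taken from the diff above; anything else is illustrative.
from congress import harness
from congress.dse2 import dse_node
from congress.tests import helper

messaging_config = helper.generate_messaging_config()
bus = dse_node.DseNode(messaging_config, "root", [])

services = harness.create2(node=bus)            # registers services on the node
engine = services[harness.ENGINE_SERVICE_NAME]  # Dse2Runtime policy engine
api_models = services['api']                    # dict keyed 'api-policy', 'api-rule', ...
api_service = services['api_service']           # ResourceManager (a DataService) for wsgi
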
View File

@ -492,7 +492,7 @@ class Runtime (object):
def initialize_datasource(self, name, schema):
"""Initializes datasource by creating policy and setting schema. """
try:
self.create_policy(name, kind=base.DATABASE_POLICY_TYPE)
self.create_policy(name, kind=base.DATASOURCE_POLICY_TYPE)
except KeyError:
raise exception.DatasourceNameInUse(value=name)
try:

View File

@ -55,12 +55,16 @@ def congress_app_factory(global_conf, **local_conf):
# replated with new API model creation method. If All API models can
# be generated without any argument, we don't need to make dict here
# and API process instantiate all API model in APIRouterV1().
cage = harness.create(root_path, data_path)
api_process_dict = dict([[name, service_obj['object']]
for name, service_obj
in cage.getservices().items()
if 'object' in service_obj])
if getattr(cfg.CONF, "distributed_architecture", False):
services = harness.create2(root_path, data_path)
return application.ApiApplication(services['api_service'])
else:
cage = harness.create(root_path, data_path)
api_process_dict = dict([[name, service_obj['object']]
for name, service_obj
in cage.getservices().items()
if 'object' in service_obj])
api_resource_mgr = application.ResourceManager()
router.APIRouterV1(api_resource_mgr, api_process_dict)
return application.ApiApplication(api_resource_mgr)
api_resource_mgr = application.ResourceManager()
router.APIRouterV1(api_resource_mgr, api_process_dict)
return application.ApiApplication(api_resource_mgr)

View File

@ -237,8 +237,8 @@ class TestDsePerformance(testbase.SqlTestCase):
policy = self.engine.DEFAULT_THEORY
formula = compile.parse1(
'q(1) :- data:p(1, 2.3, "foo", "bar", 1, %s)' % ('a'*100 + '1'))
self.api['rule'].publish(
'policy-update', [compile.Event(formula, target=policy)])
self.engine.process_policy_update(
[compile.Event(formula, target=policy)])
# Poll data and wait til it arrives at engine
driver.poll()
@ -283,8 +283,8 @@ class TestDsePerformance(testbase.SqlTestCase):
formula = compile.parse1(
'q(1) :- data:p(1, 2.3, "foo", "bar", 1, %s)' % ('a'*100 + '1'))
LOG.info("publishing rule")
self.api['rule'].publish(
'policy-update', [compile.Event(formula, target=policy)])
self.engine.process_policy_update(
[compile.Event(formula, target=policy)])
# Poll data and wait til it arrives at engine
driver.poll()

View File

@ -35,7 +35,6 @@ from oslo_log import log as logging
from congress.api import webservice
from congress.common import config
from congress.datalog import base as datalog_base
from congress.datalog import compile
from congress import harness
from congress.tests import base
import congress.tests.datasources.test_neutron_driver as test_neutron
@ -162,15 +161,6 @@ class TestCongress(base.SqlTestCase):
args = ['--config-file', helper.etcdir('congress.conf.test')]
config.init(args)
def test_startup(self):
"""Test that everything is properly loaded at startup."""
engine = self.engine
api = self.api
helper.retry_check_subscriptions(
engine, [(api['rule'].name, 'policy-update')])
helper.retry_check_subscribers(
api['rule'], [(engine.name, 'policy-update')])
def test_synchronize_policy_no_erratic_change(self):
"""Test that synchronize_policies does not changes init state"""
with mock.patch.object(self.engine, 'delete_policy') as d:
@ -180,65 +170,6 @@ class TestCongress(base.SqlTestCase):
d.assert_not_called()
c.assert_not_called()
def test_policy_subscriptions(self):
"""Test that policy engine subscriptions adjust to policy changes."""
engine = self.engine
api = self.api
cage = self.cage
policy = engine.DEFAULT_THEORY
# Send formula
formula = test_neutron.create_network_group('p')
LOG.debug("Sending formula: %s", formula)
api['rule'].publish(
'policy-update', [compile.Event(formula, target=policy)])
# check we have the proper subscriptions
self.assertTrue('neutron' in cage.services)
neutron = cage.service_object('neutron')
helper.retry_check_subscriptions(engine, [('neutron', 'networks')])
helper.retry_check_subscribers(neutron, [(engine.name, 'networks')])
def test_neutron(self):
"""Test polling and publishing of neutron updates."""
engine = self.engine
api = self.api
cage = self.cage
policy = engine.DEFAULT_THEORY
# Send formula
formula = test_neutron.create_network_group('p')
LOG.debug("Sending formula: %s", formula)
api['rule'].publish(
'policy-update', [compile.Event(formula, target=policy)])
helper.retry_check_nonempty_last_policy_change(engine)
LOG.debug("All services: %s", cage.services.keys())
neutron = cage.service_object('neutron')
neutron.poll()
ans = ('p("240ff9df-df35-43ae-9df5-27fae87f2492") ')
helper.retry_check_db_equal(engine, 'p(x)', ans, target=policy)
def test_multiple(self):
"""Test polling and publishing of multiple neutron instances."""
api = self.api
cage = self.cage
engine = self.engine
policy = engine.DEFAULT_THEORY
# Send formula
formula = test_neutron.create_networkXnetwork_group('p')
api['rule'].publish(
'policy-update', [compile.Event(formula, target=policy)])
helper.retry_check_nonempty_last_policy_change(engine)
# poll datasources
neutron = cage.service_object('neutron')
neutron2 = cage.service_object('neutron2')
neutron.poll()
neutron2.poll()
# check answer
ans = ('p("240ff9df-df35-43ae-9df5-27fae87f2492", '
' "240ff9df-df35-43ae-9df5-27fae87f2492") ')
helper.retry_check_db_equal(engine, 'p(x,y)', ans, target=policy)
def test_datasource_api_model(self):
"""Test the datasource api model.

View File

@ -15,12 +15,18 @@
from oslo_config import cfg
from congress.policy_engines.agnostic import Dse2Runtime
from congress import harness
from congress.tests import fake_datasource
from congress.tests import helper
def setup_config(services=[]):
def setup_config(with_fake_datasource=True):
"""Setup DseNode for testing.
:param services is an array of DataServices
:param api is a dictionary mapping api name to API model instance
"""
cfg.CONF.set_override('distributed_architecture', True)
# Load the fake driver.
cfg.CONF.set_override(
@ -28,13 +34,23 @@ def setup_config(services=[]):
['congress.tests.fake_datasource.FakeDataSource'])
node = helper.make_dsenode_new_partition("testnode")
engine = Dse2Runtime('engine')
data = fake_datasource.FakeDataSource('data')
services = harness.create2(node=node)
node.register_service(engine)
node.register_service(data)
# Always register engine and fake datasource
# engine = Dse2Runtime('engine')
# node.register_service(engine)
data = None
if with_fake_datasource:
data = fake_datasource.FakeDataSource('data')
node.register_service(data)
for service in services:
node.register_service(service)
# Register provided apis (and no others)
# (ResourceManager inherits from DataService)
# api_map = {a.name: a for a in api}
# api_resource_mgr = application.ResourceManager()
# router.APIRouterV1(api_resource_mgr, api)
# node.register_service(api_resource_mgr)
return {'node': node, 'engine': engine, 'data': data}
engine = services[harness.ENGINE_SERVICE_NAME]
api = services['api']
return {'node': node, 'engine': engine, 'data': data, 'api': api}

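A short usage sketch of the reworked test helper above, matching how the tests2 setUp methods in the following diffs consume it (illustrative only):

# Sketch: tests no longer construct API models themselves; they pull
# them out of the services dict built by harness.create2().
from congress.tests2.api import base as api_base

services = api_base.setup_config()              # with_fake_datasource=True by default
policy_model = services['api']['api-policy']
rule_model = services['api']['api-rule']
engine = services['engine']
node = services['node']
data = services['data']                         # FakeDataSource, or None if disabled
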
View File

@ -21,7 +21,6 @@ from __future__ import absolute_import
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import action_model
from congress.api import webservice
from congress.tests import base
from congress.tests2.api import base as api_base
@ -30,9 +29,8 @@ from congress.tests2.api import base as api_base
class TestActionModel(base.SqlTestCase):
def setUp(self):
super(TestActionModel, self).setUp()
self.action_model = action_model.ActionsModel(
'api-action', policy_engine='engine')
services = api_base.setup_config([self.action_model])
services = api_base.setup_config()
self.action_model = services['api']['api-action']
self.datasource = services['data']
def test_get_datasource_actions(self):

View File

@ -20,7 +20,6 @@ from __future__ import absolute_import
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import datasource_model
from congress.api import webservice
from congress import exception
from congress.tests import base
@ -30,12 +29,11 @@ from congress.tests2.api import base as api_base
class TestDatasourceModel(base.SqlTestCase):
def setUp(self):
super(TestDatasourceModel, self).setUp()
self.datasource_model = datasource_model.DatasourceModel(
'test_datasource', policy_engine='engine')
self.config = api_base.setup_config([self.datasource_model])
self.data = self.config['data']
self.node = self.config['node']
self.engine = self.config['engine']
services = api_base.setup_config()
self.datasource_model = services['api']['api-datasource']
self.data = services['data']
self.node = services['node']
self.engine = services['engine']
self.datasource = self._get_datasource_request()
self.node.add_datasource(self.datasource)

View File

@ -25,8 +25,6 @@ import mock
from oslo_utils import uuidutils
from congress.api import error_codes
from congress.api import policy_model
from congress.api import rule_model
from congress.api import webservice
from congress.tests import base
from congress.tests import helper
@ -37,11 +35,9 @@ class TestPolicyModel(base.SqlTestCase):
def setUp(self):
super(TestPolicyModel, self).setUp()
self.policy_model = policy_model.PolicyModel('api-policy',
policy_engine='engine')
self.rule_api = rule_model.RuleModel('api-rule',
policy_engine='engine')
services = api_base.setup_config([self.policy_model, self.rule_api])
services = api_base.setup_config()
self.policy_model = services['api']['api-policy']
self.rule_api = services['api']['api-rule']
self.node = services['node']
self.engine = services['engine']
self.initial_policies = set(self.engine.policy_names())

View File

@ -20,9 +20,6 @@ from __future__ import absolute_import
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import policy_model
from congress.api import row_model
from congress.api import rule_model
from congress.api import webservice
from congress.tests import base
from congress.tests2.api import base as api_base
@ -32,15 +29,12 @@ class TestRowModel(base.SqlTestCase):
def setUp(self):
super(TestRowModel, self).setUp()
self.policy_model = policy_model.PolicyModel(
'api-policy', policy_engine='engine')
self.rule_model = rule_model.RuleModel('api-rule',
policy_engine='engine')
self.row_model = row_model.RowModel('api-row', policy_engine='engine')
result = api_base.setup_config([self.policy_model, self.rule_model,
self.row_model])
self.node = result['node']
self.data = result['data']
services = api_base.setup_config()
self.policy_model = services['api']['api-policy']
self.rule_model = services['api']['api-rule']
self.row_model = services['api']['api-row']
self.node = services['node']
self.data = services['data']
def test_get_items_datasource_row(self):
# adjust datasource to have required value

View File

@ -21,7 +21,6 @@ import mock
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import policy_model
from congress.api import rule_model
from congress.api import webservice
from congress.tests import base
@ -32,12 +31,11 @@ class TestRuleModel(base.SqlTestCase):
def setUp(self):
super(TestRuleModel, self).setUp()
self.rule_model = rule_model.RuleModel('api-rule',
policy_engine='engine')
self.policy_model = policy_model.PolicyModel('api-policy',
policy_engine='engine')
result = api_base.setup_config([self.policy_model, self.rule_model])
self.node = result['node']
services = api_base.setup_config()
self.policy_model = services['api']['api-policy']
self.rule_model = services['api']['api-rule']
self.node = services['node']
self.policy_model.add_item({'name': 'classification'}, {})
self.action_policy = self._add_action_policy()
self.context = {'policy_id': self.action_policy["name"]}

View File

@ -21,7 +21,6 @@ from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import api_utils
from congress.api import schema_model
from congress.api import webservice
from congress.tests import base
from congress.tests2.api import base as api_base
@ -30,9 +29,9 @@ from congress.tests2.api import base as api_base
class TestSchemaModel(base.TestCase):
def setUp(self):
super(TestSchemaModel, self).setUp()
self.schema_model = schema_model.SchemaModel("test_schema", {})
self.config = api_base.setup_config([self.schema_model])
self.data = self.config['data']
services = api_base.setup_config()
self.schema_model = services['api']['api-schema']
self.data = services['data']
def test_get_item_all_table(self):
context = {'ds_id': self.data.service_id}

View File

@ -22,9 +22,6 @@ import uuid
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import policy_model
from congress.api import rule_model
from congress.api import status_model
from congress.api import webservice
from congress.tests import base
from congress.tests2.api import base as api_base
@ -33,16 +30,12 @@ from congress.tests2.api import base as api_base
class TestStatusModel(base.SqlTestCase):
def setUp(self):
super(TestStatusModel, self).setUp()
self.policy_model = policy_model.PolicyModel('api-policy',
policy_engine='engine')
self.rule_model = rule_model.RuleModel('api-rule',
policy_engine='engine')
self.status_model = status_model.StatusModel('api-status',
policy_engine='engine')
result = api_base.setup_config([self.policy_model, self.rule_model,
self.status_model])
self.node = result['node']
self.datasource = result['data']
services = api_base.setup_config()
self.policy_model = services['api']['api-policy']
self.rule_model = services['api']['api-rule']
self.status_model = services['api']['api-status']
self.node = services['node']
self.datasource = services['data']
def test_get_datasource_status(self):
context = {'ds_id': self.datasource.service_id}

View File

@ -20,9 +20,6 @@ from __future__ import absolute_import
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from congress.api import policy_model
from congress.api import rule_model
from congress.api import table_model
from congress.api import webservice
from congress.tests import base
from congress.tests2.api import base as api_base
@ -31,22 +28,13 @@ from congress.tests2.api import base as api_base
class TestTableModel(base.SqlTestCase):
def setUp(self):
super(TestTableModel, self).setUp()
# Here we load the fake driver
cfg.CONF.set_override(
'drivers',
['congress.tests.fake_datasource.FakeDataSource'])
self.table_model = table_model.TableModel('api-table',
policy_engine='engine')
self.api_rule = rule_model.RuleModel('api-rule',
policy_engine='engine')
self.policy_model = policy_model.PolicyModel('api-policy',
policy_engine='engine')
result = api_base.setup_config([self.table_model, self.api_rule,
self.policy_model])
self.node = result['node']
self.engine = result['engine']
self.data = result['data']
services = api_base.setup_config()
self.policy_model = services['api']['api-policy']
self.table_model = services['api']['api-table']
self.api_rule = services['api']['api-rule']
self.node = services['node']
self.engine = services['engine']
self.data = services['data']
# create test policy
self._create_test_policy()

View File

@ -0,0 +1,698 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_congress
----------------------------------
Tests for `congress` module.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from oslo_config import cfg
cfg.CONF.distributed_architecture = True
from oslo_log import log as logging
from congress.common import config
from congress import harness
from congress.tests import base
from congress.tests import helper
from congress.tests2.api import base as api_base
LOG = logging.getLogger(__name__)
class TestCongress(base.SqlTestCase):
def setUp(self):
"""Setup tests that use multiple mock neutron instances."""
super(TestCongress, self).setUp()
self.services = api_base.setup_config(with_fake_datasource=False)
self.api = self.services['api']
self.node = self.services['node']
def setup_config(self):
args = ['--config-file', helper.etcdir('congress.conf.test')]
config.init(args)
def test_startup(self):
self.assertIsNotNone(self.services['api'])
self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME])
self.assertIsNotNone(self.services[harness.ENGINE_SERVICE_NAME].node)
def test_policy(self):
self.create_policy('alpha')
self.insert_rule('q(1, 2) :- true', 'alpha')
self.insert_rule('q(2, 3) :- true', 'alpha')
helper.retry_check_function_return_value(
lambda: self.query('q', 'alpha'),
{'results': [{'data': (1, 2)}, {'data': (2, 3)}]})
def test_policy_datasource(self):
self.create_policy('alpha')
self.create_fake_datasource('fake')
data = self.node.service_object('fake')
data.state = {'fake_table': set([(1, 2)])}
data.poll()
self.insert_rule('q(x) :- fake:fake_table(x,y)', 'alpha')
helper.retry_check_function_return_value(
lambda: self.query('q', 'alpha'), {'results': [{'data': (1,)}]})
# TODO(dse2): enable rules to be inserted before data created.
# Maybe just have subscription handle errors gracefull when
# asking for a snapshot and return [].
# self.insert_rule('p(x) :- fake:fake_table(x)', 'alpha')
def create_policy(self, name):
self.api['api-policy'].add_item({'name': name}, {})
def insert_rule(self, rule, policy):
context = {'policy_id': policy}
return self.api['api-rule'].add_item(
{'rule': rule}, {}, context=context)
def create_fake_datasource(self, name):
item = {'name': name,
'driver': 'fake_datasource',
'description': 'hello world!',
'enabled': True,
'type': None,
'config': {'auth_url': 'foo',
'username': 'armax',
'password': '<hidden>',
'tenant_name': 'armax'}}
return self.api['api-datasource'].add_item(item, params={})
def query(self, tablename, policyname):
context = {'policy_id': policyname,
'table_id': tablename}
return self.api['api-row'].get_items({}, context)
# TODO(dse2): port this test
# class TestCongress(base.SqlTestCase):
# def setUp(self):
# """Setup tests that use multiple mock neutron instances."""
# super(TestCongress, self).setUp()
# # create neutron mock and tell cage to use that mock
# # https://code.google.com/p/pymox/wiki/MoxDocumentation
# mock_factory = mox.Mox()
# neutron_mock = mock_factory.CreateMock(
# neutronclient.v2_0.client.Client)
# neutron_mock2 = mock_factory.CreateMock(
# neutronclient.v2_0.client.Client)
# config_override = {'neutron2': {'username': 'demo', 'tenant_name':
# 'demo', 'password': 'password',
# 'auth_url':
# 'http://127.0.0.1:5000/v2.0',
# 'module':
# 'datasources/neutron_driver.py'},
# 'nova': {'username': 'demo',
# 'tenant_name': 'demo',
# 'password': 'password',
# 'auth_url': 'http://127.0.0.1:5000/v2.0',
# 'module': 'datasources/nova_driver.py'},
# 'neutron': {'username': 'demo',
# 'tenant_name': 'demo',
# 'password': 'password',
# 'auth_url':
# 'http://127.0.0.1:5000/v2.0',
# 'module':
# 'datasources/neutron_driver.py'}}
# cage = harness.create2(helper.root_path(), config_override)
# engine = cage.service_object('engine')
# api = {'policy': cage.service_object('api-policy'),
# 'rule': cage.service_object('api-rule'),
# 'table': cage.service_object('api-table'),
# 'row': cage.service_object('api-row'),
# 'datasource': cage.service_object('api-datasource'),
# 'status': cage.service_object('api-status'),
# 'schema': cage.service_object('api-schema')}
# config = {'username': 'demo',
# 'auth_url': 'http://127.0.0.1:5000/v2.0',
# 'tenant_name': 'demo',
# 'password': 'password',
# 'module': 'datasources/neutron_driver.py',
# 'poll_time': 0}
# engine.create_policy('neutron')
# engine.create_policy('neutron2')
# engine.create_policy('nova')
# harness.load_data_service(
# 'neutron', config, cage,
# os.path.join(helper.root_path(), "congress"), 1)
# service = cage.service_object('neutron')
# engine.set_schema('neutron', service.get_schema())
# harness.load_data_service(
# 'neutron2', config, cage,
# os.path.join(helper.root_path(), "congress"), 2)
# engine.set_schema('neutron2', service.get_schema())
# config['module'] = 'datasources/nova_driver.py'
# harness.load_data_service(
# 'nova', config, cage,
# os.path.join(helper.root_path(), "congress"), 3)
# engine.set_schema('nova', service.get_schema())
# cage.service_object('neutron').neutron = neutron_mock
# cage.service_object('neutron2').neutron = neutron_mock2
# # delete all policies that aren't builtin, so we have clean slate
# names = set(engine.policy_names()) - engine.builtin_policy_names
# for name in names:
# try:
# api['policy'].delete_item(name, {})
# except KeyError:
# pass
# # Turn off schema checking
# engine.module_schema = None
# # initialize neutron_mocks
# network1 = test_neutron.network_response
# port_response = test_neutron.port_response
# router_response = test_neutron.router_response
# sg_group_response = test_neutron.security_group_response
# neutron_mock.list_networks().InAnyOrder().AndReturn(network1)
# neutron_mock.list_ports().InAnyOrder().AndReturn(port_response)
# neutron_mock.list_routers().InAnyOrder().AndReturn(router_response)
# neutron_mock.list_security_groups().InAnyOrder().AndReturn(
# sg_group_response)
# neutron_mock2.list_networks().InAnyOrder().AndReturn(network1)
# neutron_mock2.list_ports().InAnyOrder().AndReturn(port_response)
# neutron_mock2.list_routers().InAnyOrder().AndReturn(router_response)
# neutron_mock2.list_security_groups().InAnyOrder().AndReturn(
# sg_group_response)
# mock_factory.ReplayAll()
# self.cage = cage
# self.engine = engine
# self.api = api
# def setup_config(self):
# args = ['--config-file', helper.etcdir('congress.conf.test')]
# config.init(args)
# def test_startup(self):
# """Test that everything is properly loaded at startup."""
# engine = self.engine
# api = self.api
# helper.retry_check_subscriptions(
# engine, [(api['rule'].name, 'policy-update')])
# helper.retry_check_subscribers(
# api['rule'], [(engine.name, 'policy-update')])
# def test_policy_subscriptions(self):
# """Test that policy engine subscriptions adjust to policy changes."""
# engine = self.engine
# api = self.api
# cage = self.cage
# policy = engine.DEFAULT_THEORY
# # Send formula
# formula = test_neutron.create_network_group('p')
# LOG.debug("Sending formula: %s", formula)
# api['rule'].publish(
# 'policy-update', [compile.Event(formula, target=policy)])
# # check we have the proper subscriptions
# self.assertTrue('neutron' in cage.services)
# neutron = cage.service_object('neutron')
# helper.retry_check_subscriptions(engine, [('neutron', 'networks')])
# helper.retry_check_subscribers(neutron, [(engine.name, 'networks')])
# def test_neutron(self):
# """Test polling and publishing of neutron updates."""
# engine = self.engine
# api = self.api
# cage = self.cage
# policy = engine.DEFAULT_THEORY
# # Send formula
# formula = test_neutron.create_network_group('p')
# LOG.debug("Sending formula: %s", formula)
# api['rule'].publish(
# 'policy-update', [compile.Event(formula, target=policy)])
# helper.retry_check_nonempty_last_policy_change(engine)
# LOG.debug("All services: %s", cage.services.keys())
# neutron = cage.service_object('neutron')
# neutron.poll()
# ans = ('p("240ff9df-df35-43ae-9df5-27fae87f2492") ')
# helper.retry_check_db_equal(engine, 'p(x)', ans, target=policy)
# def test_multiple(self):
# """Test polling and publishing of multiple neutron instances."""
# api = self.api
# cage = self.cage
# engine = self.engine
# policy = engine.DEFAULT_THEORY
# # Send formula
# formula = test_neutron.create_networkXnetwork_group('p')
# api['rule'].publish(
# 'policy-update', [compile.Event(formula, target=policy)])
# helper.retry_check_nonempty_last_policy_change(engine)
# # poll datasources
# neutron = cage.service_object('neutron')
# neutron2 = cage.service_object('neutron2')
# neutron.poll()
# neutron2.poll()
# # check answer
# ans = ('p("240ff9df-df35-43ae-9df5-27fae87f2492", '
# ' "240ff9df-df35-43ae-9df5-27fae87f2492") ')
# helper.retry_check_db_equal(engine, 'p(x,y)', ans, target=policy)
# def test_datasource_api_model(self):
# """Test the datasource api model.
# Same as test_multiple except we use the api interface
# instead of the DSE interface.
# """
# self.skipTest("Move to test/api/api_model and use fake driver...")
# # FIXME(arosen): we should break out these tests into
# # congress/tests/api/test_datasource.py
# with mock.patch(
# "congress.managers.datasource.DataSourceDriverManager."
# "get_datasource_drivers_info") as get_info:
# get_info.return_value = [{'datasource_driver': 'neutron'},
# {'datasource_driver': 'neutron2'},
# {'datasource_driver': 'nova'}]
# api = self.api
# engine = self.engine
# # Insert formula (which creates neutron services)
# net_formula = test_neutron.create_networkXnetwork_group('p')
# LOG.debug("Sending formula: %s", net_formula)
# context = {'policy_id': engine.DEFAULT_THEORY}
# api['rule'].add_item(
# {'rule': str(net_formula)}, {}, context=context)
# datasources = api['datasource'].get_items({})['results']
# datasources = [d['datasource_driver'] for d in datasources]
# self.assertEqual(set(datasources),
# set(['neutron', 'neutron2', 'nova']))
# def test_row_api_model(self):
# """Test the row api model."""
# self.skipTest("Move to test/api/test_row_api_model..")
# api = self.api
# engine = self.engine
# # add some rules defining tables
# context = {'policy_id': engine.DEFAULT_THEORY}
# api['rule'].add_item(
# {'rule': 'p(x) :- q(x)'},
# {}, context=context)
# api['rule'].add_item(
# {'rule': 'p(x) :- r(x)'},
# {}, context=context)
# api['rule'].add_item(
# {'rule': 'q(x) :- r(x)'},
# {}, context=context)
# api['rule'].add_item(
# {'rule': 'r(1) :- true'},
# {}, context=context)
# # without tracing
# context['table_id'] = 'p'
# ans = api['row'].get_items({}, context=context)
# s = frozenset([tuple(x['data']) for x in ans['results']])
# t = frozenset([(1,)])
# self.assertEqual(s, t, "Rows without tracing")
# self.assertTrue('trace' not in ans, "Rows should have no Trace")
# self.assertEqual(len(ans['results']), 1) # no duplicates
# # with tracing
# ans = api['row'].get_items({'trace': 'true'}, context=context)
# s = frozenset([tuple(x['data']) for x in ans['results']])
# t = frozenset([(1,)])
# self.assertEqual(s, t, "Rows with tracing")
# self.assertTrue('trace' in ans, "Rows should have trace")
# self.assertEqual(len(ans['trace'].split('\n')), 16)
# # unknown policy table
# context = {'policy_id': engine.DEFAULT_THEORY,
# 'table_id': 'unktable'}
# ans = api['row'].get_items({}, context=context)
# self.assertEqual(len(ans['results']), 0)
# # unknown policy
# context = {'policy_id': 'unkpolicy', 'table_id': 'unktable'}
# ans = api['row'].get_items({}, context=context)
# self.assertEqual(len(ans['results']), 0)
# # unknown datasource table
# context = {'ds_id': 'neutron', 'table_id': 'unktable'}
# ans = api['row'].get_items({}, context=context)
# self.assertEqual(len(ans['results']), 0)
# # unknown datasource
# context = {'ds_id': 'unkds', 'table_id': 'unktable'}
# ans = api['row'].get_items({}, context=context)
# self.assertEqual(len(ans['results']), 0)
# def test_policy_api_model_execute(self):
# def _execute_api(client, action, action_args):
# LOG.info("_execute_api called on %s and %s", action, action_args)
# positional_args = action_args['positional']
# named_args = action_args['named']
# method = reduce(getattr, action.split('.'), client)
# method(*positional_args, **named_args)
# class NovaClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# def _get_testkey(self):
# return self.testkey
# def disconnectNetwork(self, arg1, arg2, arg3):
# self.testkey = "arg1=%s arg2=%s arg3=%s" % (arg1, arg2, arg3)
# nova_client = NovaClient("testing")
# nova = self.cage.service_object('nova')
# nova._execute_api = _execute_api
# nova.nova_client = nova_client
# api = self.api
# body = {'name': 'nova:disconnectNetwork',
# 'args': {'positional': ['value1', 'value2'],
# 'named': {'arg3': 'value3'}}}
# request = helper.FakeRequest(body)
# result = api['policy'].execute_action({}, {}, request)
# self.assertEqual(result, {})
# expected_result = "arg1=value1 arg2=value2 arg3=value3"
# f = nova.nova_client._get_testkey
# helper.retry_check_function_return_value(f, expected_result)
# def test_rule_insert_delete(self):
# self.api['policy'].add_item({'name': 'alice'}, {})
# context = {'policy_id': 'alice'}
# (id1, _) = self.api['rule'].add_item(
# {'rule': 'p(x) :- plus(y, 1, x), q(y)'}, {}, context=context)
# ds = self.api['rule'].get_items({}, context)['results']
# self.assertEqual(len(ds), 1)
# self.api['rule'].delete_item(id1, {}, context)
# ds = self.engine.policy_object('alice').content()
# self.assertEqual(len(ds), 0)
# # TODO(thinrichs): Clean up this file. In particular, make it possible
# # to group all of the policy-execute tests into their own class.
# # Execute[...] tests
# def test_policy_execute(self):
# class NovaClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# def disconnectNetwork(self, arg1):
# LOG.info("disconnectNetwork called on %s", arg1)
# self.testkey = "arg1=%s" % arg1
# nova_client = NovaClient(None)
# nova = self.cage.service_object('nova')
# nova.nova_client = nova_client
# # insert rule and data
# self.api['policy'].add_item({'name': 'alice'}, {})
# (id1, _) = self.api['rule'].add_item(
# {'rule': 'execute[nova:disconnectNetwork(x)] :- q(x)'}, {},
# context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 0)
# (id2, _) = self.api['rule'].add_item(
# {'rule': 'q(1)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# ans = "arg1=1"
# f = lambda: nova.nova_client.testkey
# helper.retry_check_function_return_value(f, ans)
# # insert more data
# self.api['rule'].add_item(
# {'rule': 'q(2)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 2)
# ans = "arg1=2"
# f = lambda: nova.nova_client.testkey
# helper.retry_check_function_return_value(f, ans)
# # insert irrelevant data
# self.api['rule'].add_item(
# {'rule': 'r(3)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 2)
# # delete relevant data
# self.api['rule'].delete_item(
# id2, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 2)
# # delete policy rule
# self.api['rule'].delete_item(
# id1, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 2)
# def test_policy_execute_data_first(self):
# class NovaClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# def disconnectNetwork(self, arg1):
# LOG.info("disconnectNetwork called on %s", arg1)
# self.testkey = "arg1=%s" % arg1
# nova_client = NovaClient(None)
# nova = self.cage.service_object('nova')
# nova.nova_client = nova_client
# # insert rule and data
# self.api['policy'].add_item({'name': 'alice'}, {})
# self.api['rule'].add_item(
# {'rule': 'q(1)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 0)
# self.api['rule'].add_item(
# {'rule': 'execute[nova:disconnectNetwork(x)] :- q(x)'}, {},
# context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# ans = "arg1=1"
# f = lambda: nova.nova_client.testkey
# helper.retry_check_function_return_value(f, ans)
# def test_policy_execute_dotted(self):
# class NovaClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# self.servers = ServersClass()
# class ServersClass(object):
# def __init__(self):
# self.ServerManager = ServerManagerClass()
# class ServerManagerClass(object):
# def __init__(self):
# self.testkey = None
# def pause(self, id_):
# self.testkey = "arg1=%s" % id_
# nova_client = NovaClient(None)
# nova = self.cage.service_object('nova')
# nova.nova_client = nova_client
# self.api['policy'].add_item({'name': 'alice'}, {})
# self.api['rule'].add_item(
# {'rule': 'execute[nova:servers.ServerManager.pause(x)] :- q(x)'},
# {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 0)
# self.api['rule'].add_item(
# {'rule': 'q(1)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# ans = "arg1=1"
# f = lambda: nova.nova_client.servers.ServerManager.testkey
# helper.retry_check_function_return_value(f, ans)
# def test_policy_execute_no_args(self):
# class NovaClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# def disconnectNetwork(self):
# LOG.info("disconnectNetwork called")
# self.testkey = "noargs"
# nova_client = NovaClient(None)
# nova = self.cage.service_object('nova')
# nova.nova_client = nova_client
# # Note: this probably isn't the behavior we really want.
# # But at least we have a test documenting that behavior.
# # insert rule and data
# self.api['policy'].add_item({'name': 'alice'}, {})
# (id1, rule1) = self.api['rule'].add_item(
# {'rule': 'execute[nova:disconnectNetwork()] :- q(x)'}, {},
# context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 0)
# (id2, rule2) = self.api['rule'].add_item(
# {'rule': 'q(1)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# ans = "noargs"
# f = lambda: nova.nova_client.testkey
# helper.retry_check_function_return_value(f, ans)
# # insert more data (which DOES NOT cause an execution)
# (id3, rule3) = self.api['rule'].add_item(
# {'rule': 'q(2)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# # delete all data
# self.api['rule'].delete_item(
# id2, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# self.api['rule'].delete_item(
# id3, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# # insert data (which now DOES cause an execution)
# (id4, rule3) = self.api['rule'].add_item(
# {'rule': 'q(3)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 2)
# ans = "noargs"
# f = lambda: nova.nova_client.testkey
# helper.retry_check_function_return_value(f, ans)
# # delete policy rule
# self.api['rule'].delete_item(
# id1, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 2)
# def test_datasource_request_refresh(self):
# # Remember that neutron does not poll automatically here, which
# # is why this test actually testing request_refresh
# neutron = self.cage.service_object('neutron')
# LOG.info("neutron.state: %s", neutron.state)
# self.assertEqual(len(neutron.state['ports']), 0)
# # TODO(thinrichs): Seems we can't test the datasource API at all.
# # api['datasource'].request_refresh_action(
# # {}, context, helper.FakeRequest({}))
# neutron.request_refresh()
# f = lambda: len(neutron.state['ports'])
# helper.retry_check_function_return_value_not_eq(f, 0)
# def test_neutron_policy_execute(self):
# class NeutronClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# def disconnectNetwork(self, arg1):
# LOG.info("disconnectNetwork called on %s", arg1)
# self.testkey = "arg1=%s" % arg1
# neutron_client = NeutronClient(None)
# neutron = self.cage.service_object('neutron')
# neutron.neutron = neutron_client
# # insert rule and data
# self.api['policy'].add_item({'name': 'alice'}, {})
# (id1, _) = self.api['rule'].add_item(
# {'rule': 'execute[neutron:disconnectNetwork(x)] :- q(x)'}, {},
# context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 0)
# (id2, _) = self.api['rule'].add_item(
# {'rule': 'q(1)'}, {}, context={'policy_id': 'alice'})
# self.assertEqual(len(self.engine.logger.messages), 1)
# ans = "arg1=1"
# f = lambda: neutron.neutron.testkey
# helper.retry_check_function_return_value(f, ans)
# def test_datasource_api_model_execute(self):
# def _execute_api(client, action, action_args):
# positional_args = action_args.get('positional', [])
# named_args = action_args.get('named', {})
# method = reduce(getattr, action.split('.'), client)
# method(*positional_args, **named_args)
# class NovaClient(object):
# def __init__(self, testkey):
# self.testkey = testkey
# def _get_testkey(self):
# return self.testkey
# def disconnect(self, arg1, arg2, arg3):
# self.testkey = "arg1=%s arg2=%s arg3=%s" % (arg1, arg2, arg3)
# def disconnect_all(self):
# self.testkey = "action_has_no_args"
# nova_client = NovaClient("testing")
# nova = self.cage.service_object('nova')
# nova._execute_api = _execute_api
# nova.nova_client = nova_client
# execute_action = self.api['datasource'].execute_action
# # Positive test: valid body args, ds_id
# context = {'ds_id': 'nova'}
# body = {'name': 'disconnect',
# 'args': {'positional': ['value1', 'value2'],
# 'named': {'arg3': 'value3'}}}
# request = helper.FakeRequest(body)
# result = execute_action({}, context, request)
# self.assertEqual(result, {})
# expected_result = "arg1=value1 arg2=value2 arg3=value3"
# f = nova.nova_client._get_testkey
# helper.retry_check_function_return_value(f, expected_result)
# # Positive test: no body args
# context = {'ds_id': 'nova'}
# body = {'name': 'disconnect_all'}
# request = helper.FakeRequest(body)
# result = execute_action({}, context, request)
# self.assertEqual(result, {})
# expected_result = "action_has_no_args"
# f = nova.nova_client._get_testkey
# helper.retry_check_function_return_value(f, expected_result)
# # Negative test: invalid ds_id
# context = {'ds_id': 'unknown_ds'}
# self.assertRaises(webservice.DataModelException, execute_action,
# {}, context, request)
# # Negative test: no ds_id
# context = {}
# self.assertRaises(webservice.DataModelException, execute_action,
# {}, context, request)
# # Negative test: empty body
# context = {'ds_id': 'nova'}
# bad_request = helper.FakeRequest({})
# self.assertRaises(webservice.DataModelException, execute_action,
# {}, context, bad_request)
# # Negative test: no body name/action
# context = {'ds_id': 'nova'}
# body = {'args': {'positional': ['value1', 'value2'],
# 'named': {'arg3': 'value3'}}}
# bad_request = helper.FakeRequest(body)
# self.assertRaises(webservice.DataModelException, execute_action,
# {}, context, bad_request)