# congress/congress/tests/helper.py
# Copyright (c) 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import json
import os
import time

from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging import conffixture
import tenacity

from congress.datalog import compile
from congress.datalog import unify
from congress.dse2 import dse_node
from congress.policy_engines import agnostic

LOG = logging.getLogger(__name__)
ROOTDIR = os.path.dirname(__file__)
ETCDIR = os.path.join(ROOTDIR, 'etc')
# single, global variable used to ensure different tests from
# different subclasses of TestCase all can get a unique ID
# so that the tests do not interact on oslo-messaging
partition_counter = 0
def make_dsenode_new_partition(node_id,
messaging_config=None,
node_rpc_endpoints=None):
"""Get new DseNode in it's own new DSE partition."""
messaging_config = messaging_config or generate_messaging_config()
node_rpc_endpoints = node_rpc_endpoints or []
return dse_node.DseNode(messaging_config, node_id, node_rpc_endpoints,
partition_id=get_new_partition())
def make_dsenode_same_partition(existing,
node_id,
messaging_config=None,
node_rpc_endpoints=None):
"""Get new DseNode in the same DSE partition as existing (node or part)."""
partition_id = (existing.partition_id if
isinstance(existing, dse_node.DseNode) else existing)
messaging_config = messaging_config or generate_messaging_config()
node_rpc_endpoints = node_rpc_endpoints or []
return dse_node.DseNode(
messaging_config, node_id, node_rpc_endpoints, partition_id)
def get_new_partition():
"""Create a new partition number, unique within each process."""
global partition_counter
old = partition_counter
partition_counter += 1
return old
def generate_messaging_config():
mc_fixture = conffixture.ConfFixture(cfg.CONF)
mc_fixture.conf.transport_url = 'kombu+memory://'
messaging_config = mc_fixture.conf
messaging_config.rpc_response_timeout = 10
return messaging_config
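

# A minimal usage sketch for the DSE node helpers above (illustrative only,
# not called by the test suite; 'node1' and 'node2' are hypothetical ids):
def _example_two_nodes_same_partition():
    cfg = generate_messaging_config()
    node1 = make_dsenode_new_partition('node1', cfg)
    node2 = make_dsenode_same_partition(node1, 'node2', cfg)
    # both nodes carry the same partition_id, so they share a DSE partition
    assert node1.partition_id == node2.partition_id
    return node1, node2
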
def etcdir(*p):
return os.path.join(ETCDIR, *p)
def root_path():
"""Return path to root of source code."""
x = os.path.realpath(__file__)
x, y = os.path.split(x) # drop "helper.py"
x, y = os.path.split(x) # drop "tests"
x, y = os.path.split(x) # drop "congress"
return x
def source_path():
"""Return path to root of source code."""
x = os.path.realpath(__file__)
x, y = os.path.split(x) # drop "helper.py"
x, y = os.path.split(x) # drop "tests"
return x
def data_module_path(file):
"""Return path to dataservice module with given FILEname."""
path = source_path()
path = os.path.join(path, "datasources")
path = os.path.join(path, file)
return path
def policy_module_path():
"""Return path to policy engine module."""
path = source_path()
path = os.path.join(path, "policy_engines")
path = os.path.join(path, "agnostic.py")
return path
def api_module_path():
"""Return path to api module."""
path = source_path()
path = os.path.join(path, "datasources")
path = os.path.join(path, "test_driver.py")
return path
def test_path(file=None):
"""Return path to root of top-level tests. Joined with file if provided."""
path = source_path()
path = os.path.join(path, "tests")
if file is not None:
path = os.path.join(path, file)
return path
def datasource_config_path():
"""Return path to configuration info for datasources."""
path = test_path()
path = os.path.join(path, "datasources.conf")
return path
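

# Illustrative sketch of how the path helpers above compose (not called by
# the test suite; 'fakes.py' is a hypothetical module name):
def _example_paths():
    # e.g. <source root>/congress/datasources/fakes.py
    driver = data_module_path('fakes.py')
    # e.g. <source root>/congress/tests/datasources.conf
    config = datasource_config_path()
    return driver, config
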
def datasource_openstack_args():
"""Return basic args for creating an openstack datasource."""
return {'username': '',
'password': '',
'auth_url': '',
'tenant_name': '',
'poll_time': 1}
def pause(factor=1):
"""Timeout so other threads can run."""
time.sleep(factor * 1)
def datalog_same(actual_code, correct_code, msg=None):
return datalog_equal(
actual_code, correct_code, msg=msg,
equal=lambda x, y: unify.same(x, y) is not None)
def datalog_equal(actual_code, correct_code,
msg=None, equal=None, theories=None,
output_diff=True):
"""Check equality.
Check if the strings given by actual_code
and CORRECT_CODE represent the same datalog.
"""
def minus(iter1, iter2, invert=False):
extra = []
for i1 in iter1:
found = False
for i2 in iter2:
# for asymmetric equality checks
if invert:
test_result = equal(i2, i1)
else:
test_result = equal(i1, i2)
if test_result:
found = True
break
if not found:
extra.append(i1)
return extra
if equal is None:
equal = lambda x, y: x == y
LOG.debug("** Checking equality: %s **", msg)
actual = compile.parse(actual_code, theories=theories)
correct = compile.parse(correct_code, theories=theories)
extra = minus(actual, correct)
# in case EQUAL is asymmetric, always supply actual as the first arg
# and set INVERT to true
missing = minus(correct, actual, invert=True)
if output_diff:
output_diffs(extra, missing, msg)
LOG.debug("** Finished equality: %s **", msg)
is_equal = len(extra) == 0 and len(missing) == 0
if not is_equal:
LOG.debug('datalog_equal failed, extras: %s, missing: %s', extra,
missing)
return is_equal
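

# A short sketch of the datalog comparison helper (illustrative only; the
# policy text below is made up for the example):
def _example_datalog_equal():
    # the same facts listed in a different order compare as equal
    return datalog_equal('q(1) q(2)', 'q(2) q(1)',
                         msg='order-insensitive comparison')
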
def db_equal(actual_string, correct_string, output_diff=True):
"""Check if two strings representing data theories are the same."""
actual = agnostic.string_to_database(actual_string)
correct = agnostic.string_to_database(correct_string)
return check_db_diffs(actual, correct, output_diff=output_diff)
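

# Sketch of comparing two database strings (illustrative only; the facts
# below are made up for the example):
def _example_db_equal():
    # both strings describe the same set of ground facts
    return db_equal('p(1) p(2)', 'p(2) p(1)')
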
def check_db_diffs(actual, correct, output_diff=True):
extra = actual - correct
missing = correct - actual
extra = [e for e in extra if not e[0].startswith("___")]
missing = [m for m in missing if not m[0].startswith("___")]
if output_diff:
output_diffs(extra, missing, actual=actual)
return len(extra) == 0 and len(missing) == 0
def output_diffs(extra, missing, actual=None):
if len(extra) > 0:
print("Extra tuples")
print(", ".join([str(x) for x in extra]))
if len(missing) > 0:
print("Missing tuples")
print(", ".join([str(x) for x in missing]))
if len(extra) > 0 or len(missing) > 0:
print("Resulting database: {}".format(str(actual)))
def str2form(formula_string, theories=None):
return compile.parse1(formula_string, theories=theories)
def str2pol(policy_string, theories=None):
return compile.parse(policy_string, theories=theories)
def pol2str(policy):
return " ".join(str(x) for x in policy)
def form2str(formula):
return str(formula)
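

# Quick sketch of the string/formula conversion helpers (illustrative only):
def _example_str_roundtrip():
    rule = str2form('p(x) :- q(x)')
    # form2str renders the parsed rule back into datalog text
    return form2str(rule)
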
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_for_last_message(obj):
if not hasattr(obj, "last_msg"):
raise AttributeError("Missing 'last_msg' attribute")
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_for_message_to_arrive(obj):
if not hasattr(obj.msg, "body"):
raise AttributeError("Missing 'body' attribute")
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_for_message_data(obj, data):
if not hasattr(obj.msg, "body"):
raise AttributeError("Missing 'body' attribute")
if obj.get_msg_data() != data:
raise TestFailureException("Missing expected data in msg")
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_nonempty_last_policy_change(obj):
if not hasattr(obj, "last_policy_change"):
raise AttributeError("Missing 'last_policy_change' attribute")
if obj.last_policy_change is None:
raise TestFailureException("last_policy_change == None")
    if len(obj.last_policy_change) == 0:
        raise TestFailureException("len(last_policy_change) == 0")
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_empty_last_policy_change(obj):
if not hasattr(obj, "last_policy_change"):
raise AttributeError("Missing 'last_policy_change' attribute")
    if len(obj.last_policy_change) != 0:
        raise TestFailureException("len(last_policy_change) != 0")
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_db_equal(policy, query, correct, target=None):
if not hasattr(policy, "select"):
raise AttributeError("Missing 'select' attribute")
if target is None:
actual = policy.select(query)
else:
actual = policy.select(query, target=target)
if not db_equal(actual, correct, output_diff=False):
raise TestFailureException(
"Query {} produces {}, should produce {}".format(
str(query), str(actual), str(correct)))
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_number_of_updates(deepsix, value):
if not hasattr(deepsix, "number_of_updates"):
raise AttributeError("Missing 'number_of_updates' attribute")
if deepsix.number_of_updates != value:
raise TestFailureException("number_of_updates is {}, not {}".format(
deepsix.number_of_updates, value))
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_subscriptions(deepsix, subscription_list):
if not check_subscriptions(deepsix, subscription_list):
raise TestFailureException(
"{} does not have subscription list {}".format(
deepsix.name, str(subscription_list)))
def check_subscriptions(deepsix, subscription_list):
"""Check subscriptions.
Check that the instance DEEPSIX is subscribed to all of the
(key, dataindex) pairs in KEY_DATAINDEX_LIST. Return True if
all subscriptions exists; otherwise returns False.
"""
actual = set([(value.key, value.dataindex)
for value in deepsix.subdata.values()])
correct = set(subscription_list)
missing = correct - actual
if missing:
LOG.debug("Missing key/dataindex subscriptions: %s", missing)
return not missing
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_subscribers(deepsix, subscriber_list):
if not check_subscribers(deepsix, subscriber_list):
raise TestFailureException(
"{} does not have subscriber list {}".format(
deepsix.name, str(subscriber_list)))
@tenacity.retry(stop=tenacity.stop_after_attempt(1000),
wait=tenacity.wait_fixed(0.1))
def retry_check_no_subscribers(deepsix, subscriber_list):
"""Check that deepsix has none of the subscribers in subscriber_list"""
if check_subscribers(deepsix, subscriber_list, any_=True):
raise TestFailureException(
"{} still has some subscribers in list {}".format(
deepsix.name, str(subscriber_list)))
def check_subscribers(deepsix, subscriber_list, any_=False):
"""Check subscribers.
Check that the instance DEEPSIX includes subscriptions for all of
the (name, dataindex) pairs in SUBSCRIBER_LIST. Return True if
all subscribers exist; otherwise returns False.
If any_=True, then return True if ANY subscribers exist in subscriber_list
"""
actual = set([(name, pubdata.dataindex)
for pubdata in deepsix.pubdata.copy().values()
for name in pubdata.subscribers])
correct = set(subscriber_list)
missing = correct - actual
if missing:
LOG.debug("Missing name/dataindex subscribers: %s", missing)
    if any_:
        # True when at least one pair in SUBSCRIBER_LIST is actually present
        return len(missing) < len(correct)
return not missing
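

# Sketch of the (name, dataindex) pair format the subscriber checks expect
# (illustrative only; the service and table names are hypothetical):
def _example_subscriber_check(deepsix):
    return check_subscribers(deepsix, [('policy-engine', 'p')])
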
@tenacity.retry(stop=tenacity.stop_after_attempt(20),
wait=tenacity.wait_fixed(1))
def retry_check_function_return_value(f, expected_value):
"""Check if function f returns expected key."""
result = f()
if result != expected_value:
raise TestFailureException(
"Expected value '%s' not received. "
"Got %s instead." % (expected_value, result))
@tenacity.retry(stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_fixed(0.5))
def retry_check_function_return_value_not_eq(f, value):
"""Check if function f does not return expected value."""
result = f()
if result == value:
raise TestFailureException(
"Actual value '%s' should be different "
"from '%s'" % (result, value))
@tenacity.retry(stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_fixed(0.5))
def retry_til_exception(expected_exception, f):
"""Check if function f does not return expected value."""
try:
val = f()
raise TestFailureException("No exception thrown; received %s" % val)
except expected_exception:
return
except Exception as e:
raise TestFailureException("Wrong exception thrown: %s" % e)
@tenacity.retry(stop=tenacity.stop_after_attempt(20),
wait=tenacity.wait_fixed(1))
def retry_check_function_return_value_table(f, expected_values):
"""Check if function f returns expected table."""
result = f()
actual = set(tuple(x) for x in result)
correct = set(tuple(x) for x in expected_values)
extra = actual - correct
missing = correct - actual
if len(extra) > 0 or len(missing) > 0:
s = "Actual: %s\nExpected: %s\n" % (result, expected_values)
if len(extra) > 0:
s += "Extra: %s\n" % extra
if len(missing) > 0:
s += "Missing: %s\n" % missing
raise TestFailureException(s)
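

# Illustrative sketch: comparing a table-producing callable against expected
# rows; row order is ignored (the data is made up for the example):
def _example_table_check():
    def rows():
        return [('vm1', 'ACTIVE'), ('vm2', 'ERROR')]
    retry_check_function_return_value_table(
        rows, [('vm2', 'ERROR'), ('vm1', 'ACTIVE')])
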
class FakeRequest(object):
def __init__(self, body):
self.body = json.dumps(body)
class FakeServiceObj(object):
def __init__(self):
self.state = {}
class TestFailureException(Exception):
"""Custom exception thrown on test failure
Facilitates using assertRaises to check for failure on retry tests
(generic Exception in assertRaises disallowed by pep8 check/gate)
"""
def __init__(self, *args, **kwargs):
Exception.__init__(self, *args, **kwargs)
def supported_drivers():
"""Get list of supported drivers by congress"""
results = [
{"id": "monasca",
"description": "Datasource driver that interfaces with monasca."},
{"id": "plexxi",
"description": "Datasource driver that interfaces with PlexxiCore."},
{"id": "doctor",
"description": "Datasource driver that allows external systems "
"to push data in accordance with OPNFV Doctor "
"Inspector southbound interface specification."},
{"id": "aodh",
"description": "Datasource driver that interfaces with aodh."},
{"id": "neutronv2_qos",
"description": "Datasource driver that interfaces with QoS "
"extension of OpenStack Networking aka Neutron."},
{"id": "cloudfoundryv2",
"description": "Datasource driver that interfaces with cloudfoundry"},
{"id": "heat",
"description": "Datasource driver that interfaces with OpenStack "
"orchestration aka heat."},
{"id": "nova",
"description": "Datasource driver that interfaces with OpenStack "
"Compute aka nova."},
{"id": "murano",
"description": "Datasource driver that interfaces with murano"},
{"id": "neutronv2",
"description": "Datasource driver that interfaces with OpenStack "
"Networking aka Neutron."},
{"id": "swift",
"description": "Datasource driver that interfaces with swift."},
{"id": "ironic",
"description": "Datasource driver that interfaces with OpenStack "
"bare metal aka ironic."},
{"id": "cinder",
"description": "Datasource driver that interfaces with OpenStack "
"cinder."},
{"id": "fake_datasource",
"description": "This is a fake driver used for testing"},
{"id": "config",
"description": "Datasource driver that allows OS configs retrieval."},
{"id": "glancev2",
"description": "Datasource driver that interfaces with OpenStack "
"Images aka Glance."},
{"id": "vcenter",
"description": "Datasource driver that interfaces with vcenter"},
{"id": "keystonev3",
"description": "Datasource driver that interfaces with keystone."},
{"id": "keystone",
"description": "Datasource driver that interfaces with keystone."},
{"id": "mistral",
"description": "Datasource driver that interfaces with Mistral."},
{"id": "vitrage",
"description": "Datasource driver that accepts Vitrage "
"webhook alarm notifications."},
{"id": "monasca_webhook_driver",
"description": "Datasource driver that accepts Monasca webhook "
"alarm notifications."}]
return results