mistral tempest test

Depends-On: Ibc00eeee4af7f8594cb36cc33d35dc396469d8cf

Change-Id: Idd054ec2e88639e86d9df532d8baa43db801fdbe
This commit is contained in:
Eric K 2018-01-26 13:05:21 -08:00
parent 5ab8c3ed6a
commit c43ecf4dcd
2 changed files with 208 additions and 0 deletions

View File

@ -0,0 +1,77 @@
# Copyright 2017 VMware Corporation. All rights reserved.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from congress_tempest_plugin.tests.scenario import manager_congress
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestMistralDriver(manager_congress.DatasourceDriverTestBase):
def skip_checks(cls):
super(TestMistralDriver, cls).skip_checks()
if not getattr(CONF.service_available, 'mistral', False):
msg = ("%s skipped because mistral service is not configured" %
raise cls.skipException(msg)
def setUp(self):
super(TestMistralDriver, self).setUp()
self.datasource_name = 'mistral'
self.datasource_id = manager_congress.get_datasource_id(
self.os_admin.congress_client, self.datasource_name)
def test_mistral_workflows_table(self):
table_name = 'workflows'
service_data_fetch_func = (
lambda: self.os_admin.mistral_client.get_list_obj(
table_name, service_data_fetch_func,
missing_attributes_allowed=['description', 'updated_at'])
def test_mistral_actions_table(self):
table_name = 'actions'
service_data_fetch_func = (
lambda: self.os_admin.mistral_client.get_list_obj(
table_name, service_data_fetch_func)
# FIXME(ekcs): enable when we figure out how to use admin project in
# tempest test setup to populate executions with dummy data.
# @decorators.attr(type='smoke')
# def test_mistral_workflow_executions_table(self):
# table_name = 'workflow_executions'
# service_data_fetch_func = lambda: self.service_client.get_list_obj(
# 'executions')[1]['executions']
# self.check_service_data_against_congress_table(
# table_name, service_data_fetch_func)
# @decorators.attr(type='smoke')
# def test_mistral_action_executions_table(self):
# table_name = 'action_executions'
# service_data_fetch_func = lambda: self.service_client.get_list_obj(
# 'action_executions')[1]['action_executions']
# self.check_service_data_against_congress_table(
# table_name, service_data_fetch_func)

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import random
import re
@ -22,6 +23,9 @@ from oslo_log import log as logging
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import manager as tempestmanager
from congress_tempest_plugin.services.congress_network import qos_client
@ -76,6 +80,13 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
CONF.alarming_plugin.catalog_type, CONF.identity.region,
# Get mistral client
if getattr(CONF.service_available, 'mistral', False):
import mistral_tempest_tests.services.\
v2.mistral_client as mistral_client
cls.os_admin.mistral_client = mistral_client.MistralClientV2(
auth_prov, 'workflowv2')
def _setup_network_and_servers(self):
self.security_group = self._create_security_group()
self.network, self.subnet, self.router = self.create_networks()
@ -275,3 +286,123 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
raise Exception('Failed to create policy rule (%s, %s)'
% (policy_name, rule))
class DatasourceDriverTestBase(ScenarioPolicyBase):
def check_service_data_against_congress_table(
self, table_name, service_data_fetch_func, check_nonempty=True,
if missing_attributes_allowed is None:
missing_attributes_allowed = []
table_schema = (
self.datasource_id, table_name)['columns'])
table_id_col = next(i for i, c in enumerate(table_schema)
if c['name'] == 'id')
def _check_data():
# Fetch data each time, because test may go before service has data
service_data = service_data_fetch_func()
if check_nonempty and len(service_data) == 0:
LOG.debug('Congress %s table source service data is empty. '
'Unable to check data.', table_name)
return False
LOG.debug('Congress %s table source service data: %s',
table_name, service_data)
table_data = (
self.datasource_id, table_name)['results'])
LOG.debug('Congress %s table data: %s', table_name, table_data)
# check same cardinality
if len(service_data) != len(table_data):
LOG.debug('Cardinality mismatch between congress %s '
'table and service data', table_name)
return False
# construct map from id to service data items
service_data_map = {}
for data_item in service_data:
service_data_map[data_item['id']] = data_item
for row in table_data:
service_item = service_data_map[row['data'][table_id_col]]
except KeyError:
return False
for index in range(len(table_schema)):
# case: key is not present in service_item, allow
# if it is expected (sometimes an objects won't have key
# when the value is not present, e.g. description not set)
if (str(row['data'][index]) == 'None' and
'name'] in missing_attributes_allowed and
table_schema[index]['name'] not in service_item):
return True
# normal case: service_item value must equal
# congress table value
if (str(row['data'][index]) !=
return False
return True
if not test_utils.call_until_true(
func=_check_data, duration=100, sleep_for=4):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
def check_service_data_against_congress_subtable(
self, table_name, service_data_fetch_func,
def _check_data():
# Fetch data each time, because test may go before service has data
service_data = service_data_fetch_func()
LOG.debug('Congress %s table source service data: %s',
table_name, service_data)
table_data = (
self.datasource_id, table_name)['results'])
LOG.debug('Congress %s table data: %s', table_name, table_data)
# construct map from id to service data items
service_data_map = {}
for data_item in service_data:
service_data_map[data_item['id']] = data_item[
expected_number_of_rows = 0
for row in table_data:
row_id, row_data = row['data'][0], row['data'][1]
service_subdata = service_data_map.get(row_id)
if not service_subdata or row_data not in service_subdata:
# congress table has item not in service data.
LOG.debug('Congress %s table has row (%s, %s) not in '
'service data', table_name, row_id, row_data)
return False
expected_number_of_rows += len(service_subdata)
# check cardinality
if expected_number_of_rows != len(table_data):
LOG.debug('Cardinality mismatch between congress %s '
'table and service data', table_name)
return False
return True
if not test_utils.call_until_true(
duration=100, sleep_for=5):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
def test_update_no_error(self):
if not test_utils.call_until_true(
func=lambda: self.check_datasource_no_error(
duration=30, sleep_for=5):
raise exceptions.TimeoutException('Datasource could not poll '
'without error.')