Merge aodh tempest tests and configuration in ceilometer

* In order to achieve the tempest plugin split goal, we are merging
  the aodh tempest tests and config in to ceilometer, then we can move
  all the telemetry tests in a single repo.

Change-Id: I4e0952487d0fe73992e8fe9d6ddda3f98054ec60
This commit is contained in:
Chandan Kumar 2017-12-07 12:50:34 +05:30
parent 64fc0cc53f
commit 34ff2a21bf
9 changed files with 379 additions and 1 deletions

View File

@ -0,0 +1,64 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
import tempest.test
from ceilometer.tests.tempest.aodh.service import client
CONF = config.CONF
class BaseAlarmingTest(tempest.test.BaseTestCase):
"""Base test case class for all Alarming API tests."""
credentials = ['primary']
client_manager = client.Manager
def skip_checks(cls):
super(BaseAlarmingTest, cls).skip_checks()
if not CONF.service_available.aodh_plugin:
raise cls.skipException("Aodh support is required")
def setup_clients(cls):
super(BaseAlarmingTest, cls).setup_clients()
cls.alarming_client = cls.os_primary.alarming_client
def resource_setup(cls):
super(BaseAlarmingTest, cls).resource_setup()
cls.alarm_ids = []
def create_alarm(cls, **kwargs):
body = cls.alarming_client.create_alarm(
type='threshold', **kwargs)
return body
def cleanup_resources(method, list_of_ids):
for resource_id in list_of_ids:
except lib_exc.NotFound:
def resource_cleanup(cls):
cls.cleanup_resources(cls.alarming_client.delete_alarm, cls.alarm_ids)
super(BaseAlarmingTest, cls).resource_cleanup()

View File

@ -0,0 +1,94 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from ceilometer.tests.tempest.aodh.api import base
class TelemetryAlarmingAPITest(base.BaseAlarmingTest):
def resource_setup(cls):
super(TelemetryAlarmingAPITest, cls).resource_setup()
cls.rule = {'meter_name': 'cpu_util',
'comparison_operator': 'gt',
'threshold': 80.0,
'period': 70}
for i in range(2):
def test_alarm_list(self):
# List alarms
alarm_list = self.alarming_client.list_alarms()
# Verify created alarm in the list
fetched_ids = [a['alarm_id'] for a in alarm_list]
missing_alarms = [a for a in self.alarm_ids if a not in fetched_ids]
self.assertEqual(0, len(missing_alarms),
"Failed to find the following created alarm(s)"
" in a fetched list: %s" %
', '.join(str(a) for a in missing_alarms))
def test_create_update_get_delete_alarm(self):
# Create an alarm
alarm_name = data_utils.rand_name('telemetry_alarm')
body = self.alarming_client.create_alarm(
name=alarm_name, type='threshold', threshold_rule=self.rule)
self.assertEqual(alarm_name, body['name'])
alarm_id = body['alarm_id']
self.assertDictContainsSubset(self.rule, body['threshold_rule'])
# Update alarm with new rule and new name
new_rule = {'meter_name': 'cpu',
'comparison_operator': 'eq',
'threshold': 70.0,
'period': 60}
alarm_name_updated = data_utils.rand_name('telemetry-alarm-update')
body = self.alarming_client.update_alarm(
self.assertEqual(alarm_name_updated, body['name'])
self.assertDictContainsSubset(new_rule, body['threshold_rule'])
# Get and verify details of an alarm after update
body = self.alarming_client.show_alarm(alarm_id)
self.assertEqual(alarm_name_updated, body['name'])
self.assertDictContainsSubset(new_rule, body['threshold_rule'])
# Get history for the alarm and verify the same
body = self.alarming_client.show_alarm_history(alarm_id)
self.assertEqual("rule change", body[0]['type'])
self.assertIn(alarm_name_updated, body[0]['detail'])
self.assertEqual("creation", body[1]['type'])
self.assertIn(alarm_name, body[1]['detail'])
# Delete alarm and verify if deleted
self.alarming_client.show_alarm, alarm_id)
def test_set_get_alarm_state(self):
alarm_states = ['ok', 'alarm', 'insufficient data']
alarm = self.create_alarm(threshold_rule=self.rule)
# Set alarm state and verify
new_state =\
[elem for elem in alarm_states if elem != alarm['state']][0]
state = self.alarming_client.alarm_set_state(alarm['alarm_id'],
# Get alarm state and verify
state = self.alarming_client.show_alarm_state(alarm['alarm_id'])

View File

@ -0,0 +1,71 @@
# Copyright 2015 GlobalLogic. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from ceilometer.tests.tempest.aodh.api import base
class TelemetryAlarmingNegativeTest(base.BaseAlarmingTest):
"""Negative tests for show_alarm, update_alarm, show_alarm_history tests
** show non-existent alarm
** show the deleted alarm
** delete deleted alarm
** update deleted alarm
def test_get_non_existent_alarm(self):
# get the non-existent alarm
non_existent_id = uuidutils.generate_uuid()
self.assertRaises(lib_exc.NotFound, self.alarming_client.show_alarm,
def test_get_update_show_history_delete_deleted_alarm(self):
# get, update and delete the deleted alarm
alarm_name = data_utils.rand_name('telemetry_alarm')
rule = {'meter_name': 'cpu',
'comparison_operator': 'eq',
'threshold': 100.0,
'period': 90}
body = self.alarming_client.create_alarm(
alarm_id = body['alarm_id']
# get the deleted alarm
self.assertRaises(lib_exc.NotFound, self.alarming_client.show_alarm,
# update the deleted alarm
updated_alarm_name = data_utils.rand_name('telemetry_alarm_updated')
updated_rule = {'meter_name': 'cpu_new',
'comparison_operator': 'eq',
'threshold': 70,
'period': 50}
self.assertRaises(lib_exc.NotFound, self.alarming_client.update_alarm,
alarm_id, threshold_rule=updated_rule,
# delete the deleted alarm
self.assertRaises(lib_exc.NotFound, self.alarming_client.delete_alarm,

View File

@ -0,0 +1,126 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves.urllib import parse as urllib
from tempest import config
from tempest.lib.common import rest_client
from tempest import manager
import ujson
CONF = config.CONF
class AlarmingClient(rest_client.RestClient):
version = '2'
uri_prefix = "v2"
def deserialize(self, body):
return ujson.loads(body.replace("\n", ""))
def serialize(self, body):
return ujson.dumps(body)
def list_alarms(self, query=None):
uri = '%s/alarms' % self.uri_prefix
uri_dict = {}
if query:
uri_dict = {'q.field': query[0],
'q.op': query[1],
'q.value': query[2]}
if uri_dict:
uri += "?%s" % urllib.urlencode(uri_dict)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBodyList(resp, body)
def show_alarm(self, alarm_id):
uri = '%s/alarms/%s' % (self.uri_prefix, alarm_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBody(resp, body)
def show_alarm_history(self, alarm_id):
uri = "%s/alarms/%s/history" % (self.uri_prefix, alarm_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBodyList(resp, body)
def delete_alarm(self, alarm_id):
uri = "%s/alarms/%s" % (self.uri_prefix, alarm_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
if body:
body = self.deserialize(body)
return rest_client.ResponseBody(resp, body)
def create_alarm(self, **kwargs):
uri = "%s/alarms" % self.uri_prefix
body = self.serialize(kwargs)
resp, body =, body)
self.expected_success(201, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBody(resp, body)
def update_alarm(self, alarm_id, **kwargs):
uri = "%s/alarms/%s" % (self.uri_prefix, alarm_id)
body = self.serialize(kwargs)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBody(resp, body)
def show_alarm_state(self, alarm_id):
uri = "%s/alarms/%s/state" % (self.uri_prefix, alarm_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBodyData(resp, body)
def alarm_set_state(self, alarm_id, state):
uri = "%s/alarms/%s/state" % (self.uri_prefix, alarm_id)
body = self.serialize(state)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = self.deserialize(body)
return rest_client.ResponseBodyData(resp, body)
class Manager(manager.Manager):
default_params = {
'ca_certs': CONF.identity.ca_certificates_file,
'trace_requests': CONF.debug.trace_requests
alarming_params = {
'service': CONF.alarming_plugin.catalog_type,
'region': CONF.identity.region,
'endpoint_type': CONF.alarming_plugin.endpoint_type,
def __init__(self, credentials=None, service=None):
super(Manager, self).__init__(credentials)
def set_alarming_client(self):
self.alarming_client = AlarmingClient(self.auth_provider,

View File

@ -23,6 +23,10 @@ service_option = [cfg.BoolOpt('ceilometer',
help="Whether or not Panko is expected to be"
help="Whether or not Aodh is expected to be"
telemetry_group = cfg.OptGroup(name='telemetry',
@ -31,6 +35,9 @@ telemetry_group = cfg.OptGroup(name='telemetry',
event_group = cfg.OptGroup(name='event',
title='Event Service Options')
alarming_group = cfg.OptGroup(name='alarming_plugin',
title='Alarming Service Options')
TelemetryGroup = [
@ -57,3 +64,14 @@ event_opts = [
'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the event service."),
AlarmingGroup = [
help="Catalog type of the Alarming service."),
choices=['public', 'admin', 'internal',
'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the alarming service."),

View File

@ -41,6 +41,9 @@ class CeilometerTempestPlugin(plugins.TempestPlugin):
conf, tempest_config.event_group,
conf, tempest_config.alarming_group,
def get_opt_lists(self):
return [(,
@ -48,4 +51,6 @@ class CeilometerTempestPlugin(plugins.TempestPlugin):