Change tempest.conf in devstack

Change auth in tempest.conf to use dynamic credentials

Change-Id: Ib163cbcb8b853f3f543fb9264dfa2ec18c47ac22
This commit is contained in:
kaiyan-sheng 2016-01-06 13:08:56 -07:00 committed by Kaiyan Sheng
parent 4ebe49861e
commit ede4f99e82
4 changed files with 533 additions and 532 deletions

View File

@ -35,5 +35,4 @@ uri_v3 = http://127.0.0.1:35357/v3/
[auth]
allow_tenant_isolation = true
tempest_roles = monasca-user
use_dynamic_credentials = true

View File

@ -22,13 +22,15 @@ source $BASE/new/tempest/.venv/bin/activate
sudo pip install nose
sudo pip install numpy
(cd $BASE/new/tempest/; oslo-config-generator --config-file etc/config-generator.tempest.conf --output-file etc/tempest.conf)
sudo cat $BASE/new/monasca-api/devstack/files/tempest/tempest.conf >> $BASE/new/tempest/etc/tempest.conf
(cd $BASE/new/tempest/; sudo oslo-config-generator --config-file etc/config-generator.tempest.conf --output-file etc/tempest.conf)
(cd $BASE/new/; sudo sh -c 'cat monasca-api/devstack/files/tempest/tempest.conf >> tempest/etc/tempest.conf')
sudo cp $BASE/new/tempest/etc/logging.conf.sample $BASE/new/tempest/etc/logging.conf
(cd $BASE/new/monasca-api/; sudo pip install -r requirements.txt -r test-requirements.txt)
(cd $BASE/new/monasca-api/; sudo python setup.py install)
(cd $BASE/new/tempest/; sudo testr init)
(cd $BASE/new/tempest/; sudo sh -c 'testr list-tests monasca_tempest_tests | grep gate > monasca_tempest_tests_gate')
(cd $BASE/new/tempest/; sudo testr init; sudo sh -c 'testr run --subunit --load-list=monasca_tempest_tests_gate | subunit-trace --fails')
(cd $BASE/new/tempest/; sudo sh -c 'testr run --subunit --load-list=monasca_tempest_tests_gate | subunit-trace --fails')

View File

@ -34,180 +34,180 @@ class TestAlarms(base.BaseMonascaTest):
def resource_cleanup(cls):
super(TestAlarms, cls).resource_cleanup()
# @test.attr(type="gate")
# def test_list_alarms(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# resp, response_body = self.monasca_client.list_alarms()
# self.assertEqual(200, resp.status)
# for element in response_body['elements']:
# self._verify_alarm_keys(element)
# metric = element['metrics'][0]
# if metric['name'] == expected_metric['name']:
# self._verify_metric_in_alarm(metric, expected_metric)
# return
# self.fail("Failed test_list_alarms: cannot find the alarm just "
# "created.")
@test.attr(type="gate")
def test_list_alarms(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
resp, response_body = self.monasca_client.list_alarms()
self.assertEqual(200, resp.status)
for element in response_body['elements']:
self._verify_alarm_keys(element)
metric = element['metrics'][0]
if metric['name'] == expected_metric['name']:
self._verify_metric_in_alarm(metric, expected_metric)
return
self.fail("Failed test_list_alarms: cannot find the alarm just "
"created.")
# @test.attr(type="gate")
# def test_list_alarms_by_alarm_definition_id(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# element = response_body['elements'][0]
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
@test.attr(type="gate")
def test_list_alarms_by_alarm_definition_id(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
# @test.attr(type="gate")
# def test_list_alarms_by_metric_name(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_parms = '?metric_name=' + expected_metric['name']
# resp, response_body = self.monasca_client.list_alarms(query_parms)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# element = response_body['elements'][0]
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
# self.assertEqual(alarm_definition_ids[0],
# element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_metric_name(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_parms = '?metric_name=' + expected_metric['name']
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0], element[
'alarm_definition']['id'])
# @test.attr(type="gate")
# def test_list_alarms_by_metric_dimensions(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# for key in expected_metric['dimensions']:
# value = expected_metric['dimensions'][key]
# query_parms = '?metric_dimensions=' + key + ':' + value
# resp, response_body = self.monasca_client.list_alarms(query_parms)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# element = response_body['elements'][0]
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
# self.assertEqual(alarm_definition_ids[0],
# element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_metric_dimensions(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
for key in expected_metric['dimensions']:
value = expected_metric['dimensions'][key]
query_parms = '?metric_dimensions=' + key + ':' + value
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
# @test.attr(type="gate")
# def test_list_alarms_by_state(self):
# helpers.delete_alarm_definitions(self.monasca_client)
# self._create_alarms_for_test_alarms(num=3)
# resp, response_body = self.monasca_client.list_alarms()
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=3)
# elements = response_body['elements']
# len0 = len(elements)
# query_parms = '?state=UNDETERMINED'
# resp, response_body1 = self.monasca_client.list_alarms(query_parms)
# len1 = len(response_body1['elements'])
# self.assertEqual(200, resp.status)
# query_parms = '?state=OK'
# resp, response_body2 = self.monasca_client.list_alarms(query_parms)
# len2 = len(response_body2['elements'])
# self.assertEqual(200, resp.status)
# query_parms = '?state=ALARM'
# resp, response_body3 = self.monasca_client.list_alarms(query_parms)
# len3 = len(response_body3['elements'])
# self.assertEqual(200, resp.status)
# self.assertEqual(len0, len1 + len2 + len3)
@test.attr(type="gate")
def test_list_alarms_by_state(self):
helpers.delete_alarm_definitions(self.monasca_client)
self._create_alarms_for_test_alarms(num=3)
resp, response_body = self.monasca_client.list_alarms()
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=3)
elements = response_body['elements']
len0 = len(elements)
query_parms = '?state=UNDETERMINED'
resp, response_body1 = self.monasca_client.list_alarms(query_parms)
len1 = len(response_body1['elements'])
self.assertEqual(200, resp.status)
query_parms = '?state=OK'
resp, response_body2 = self.monasca_client.list_alarms(query_parms)
len2 = len(response_body2['elements'])
self.assertEqual(200, resp.status)
query_parms = '?state=ALARM'
resp, response_body3 = self.monasca_client.list_alarms(query_parms)
len3 = len(response_body3['elements'])
self.assertEqual(200, resp.status)
self.assertEqual(len0, len1 + len2 + len3)
# @test.attr(type="gate")
# def test_list_alarms_by_lifecycle_state(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# alarm_id = response_body['elements'][0]['id']
# self.monasca_client.patch_alarm(id=alarm_id, lifecycle_state="OPEN")
# query_parms = '?alarm_definition_id=' + str(
# alarm_definition_ids[0]) + '&lifecycle_state=OPEN'
# resp, response_body = self.monasca_client.list_alarms(query_parms)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# element = response_body['elements'][0]
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
# self.assertEqual(alarm_definition_ids[0],
# element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_lifecycle_state(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
alarm_id = response_body['elements'][0]['id']
self.monasca_client.patch_alarm(id=alarm_id, lifecycle_state="OPEN")
query_parms = '?alarm_definition_id=' + str(
alarm_definition_ids[0]) + '&lifecycle_state=OPEN'
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
# @test.attr(type="gate")
# def test_list_alarms_by_link(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# alarm_id = response_body['elements'][0]['id']
# self.monasca_client.patch_alarm(
# id=alarm_id, link="http://somesite.com/this-alarm-info")
# query_parms = '?link=http://somesite.com/this-alarm-info'
# resp, response_body = self.monasca_client.list_alarms(query_parms)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# element = response_body['elements'][0]
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
# self.assertEqual(alarm_definition_ids[0],
# element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_link(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
alarm_id = response_body['elements'][0]['id']
self.monasca_client.patch_alarm(
id=alarm_id, link="http://somesite.com/this-alarm-info")
query_parms = '?link=http://somesite.com/this-alarm-info'
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
# @test.attr(type="gate")
# def test_list_alarms_by_state_updated_start_time(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# element = response_body['elements'][0]
# state_updated_start_time = element['state_updated_timestamp']
# query_parms = '?alarm_definition_id=' + str(alarm_definition_ids[0])\
# + '&state_updated_timestamp=' + \
# str(state_updated_start_time)
# resp, response_body = self.monasca_client.list_alarms(query_parms)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# first_element = response_body['elements'][0]
# self.assertEqual(element, first_element)
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
# self.assertEqual(alarm_definition_ids[0],
# element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_state_updated_start_time(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
element = response_body['elements'][0]
state_updated_start_time = element['state_updated_timestamp']
query_parms = '?alarm_definition_id=' + str(alarm_definition_ids[0])\
+ '&state_updated_timestamp=' + \
str(state_updated_start_time)
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
first_element = response_body['elements'][0]
self.assertEqual(element, first_element)
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
# @test.attr(type="gate")
# def test_list_alarms_by_offset_limit(self):
# helpers.delete_alarm_definitions(self.monasca_client)
# self._create_alarms_for_test_alarms(num=2)
# resp, response_body = self.monasca_client.list_alarms()
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=2)
# elements = response_body['elements']
# first_element = elements[0]
# next_element = elements[1]
# id_first_element = first_element['id']
# query_parms = '?offset=' + str(id_first_element) + '&limit=1'
# resp, response_body1 = self.monasca_client.list_alarms(query_parms)
# elements = response_body1['elements']
# self.assertEqual(1, len(elements))
# self.assertEqual(elements[0]['id'], next_element['id'])
# self.assertEqual(elements[0], next_element)
@test.attr(type="gate")
def test_list_alarms_by_offset_limit(self):
helpers.delete_alarm_definitions(self.monasca_client)
self._create_alarms_for_test_alarms(num=2)
resp, response_body = self.monasca_client.list_alarms()
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=2)
elements = response_body['elements']
first_element = elements[0]
next_element = elements[1]
id_first_element = first_element['id']
query_parms = '?offset=' + str(id_first_element) + '&limit=1'
resp, response_body1 = self.monasca_client.list_alarms(query_parms)
elements = response_body1['elements']
self.assertEqual(1, len(elements))
self.assertEqual(elements[0]['id'], next_element['id'])
self.assertEqual(elements[0], next_element)
# @test.attr(type="gate")
# def test_get_alarm(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# element = response_body['elements'][0]
# alarm_id = element['id']
# resp, response_body = self.monasca_client.get_alarm(alarm_id)
# self.assertEqual(200, resp.status)
# self._verify_alarm_keys(response_body)
# metric = element['metrics'][0]
# self._verify_metric_in_alarm(metric, expected_metric)
@test.attr(type="gate")
def test_get_alarm(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
element = response_body['elements'][0]
alarm_id = element['id']
resp, response_body = self.monasca_client.get_alarm(alarm_id)
self.assertEqual(200, resp.status)
self._verify_alarm_keys(response_body)
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
@test.attr(type="gate")
@test.attr(type=['negative'])
@ -216,176 +216,176 @@ class TestAlarms(base.BaseMonascaTest):
self.assertRaises(exceptions.NotFound, self.monasca_client.get_alarm,
alarm_id)
# @test.attr(type="gate")
# def test_update_alarm(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# element = response_body['elements'][0]
# alarm_id = element['id']
# updated_state = "ALARM"
# updated_lifecycle_state = "OPEN"
# updated_link = "http://somesite.com"
# resp, response_body = self.monasca_client.update_alarm(
# id=alarm_id, state=updated_state,
# lifecycle_state=updated_lifecycle_state, link=updated_link)
# self.assertEqual(200, resp.status)
# self._verify_alarm_keys(response_body)
# # Validate fields updated
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# element = response_body['elements'][0]
# self.assertEqual(updated_state, element['state'])
# self.assertEqual(updated_lifecycle_state, element['lifecycle_state'])
# self.assertEqual(updated_link, element['link'])
@test.attr(type="gate")
def test_update_alarm(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
element = response_body['elements'][0]
alarm_id = element['id']
updated_state = "ALARM"
updated_lifecycle_state = "OPEN"
updated_link = "http://somesite.com"
resp, response_body = self.monasca_client.update_alarm(
id=alarm_id, state=updated_state,
lifecycle_state=updated_lifecycle_state, link=updated_link)
self.assertEqual(200, resp.status)
self._verify_alarm_keys(response_body)
# Validate fields updated
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
self.assertEqual(updated_state, element['state'])
self.assertEqual(updated_lifecycle_state, element['lifecycle_state'])
self.assertEqual(updated_link, element['link'])
# @test.attr(type="gate")
# def test_patch_alarm(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# elements = response_body['elements']
# alarm_id = elements[0]['id']
# patch_link = "http://somesite.com"
# resp, response_body = self.monasca_client.patch_alarm(
# id=alarm_id, link=patch_link)
# self.assertEqual(200, resp.status)
# self._verify_alarm_keys(response_body)
# # Validate the field patched
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# self.assertEqual(patch_link, response_body['elements'][0]['link'])
@test.attr(type="gate")
def test_patch_alarm(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
elements = response_body['elements']
alarm_id = elements[0]['id']
patch_link = "http://somesite.com"
resp, response_body = self.monasca_client.patch_alarm(
id=alarm_id, link=patch_link)
self.assertEqual(200, resp.status)
self._verify_alarm_keys(response_body)
# Validate the field patched
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
self.assertEqual(patch_link, response_body['elements'][0]['link'])
# @test.attr(type="gate")
# def test_delete_alarm(self):
# alarm_definition_ids, expected_metric \
# = self._create_alarms_for_test_alarms(num=1)
# query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self.assertEqual(200, resp.status)
# elements = response_body['elements']
# alarm_id = elements[0]['id']
# resp, response_body = self.monasca_client.delete_alarm(alarm_id)
# self.assertEqual(204, resp.status)
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=0)
@test.attr(type="gate")
def test_delete_alarm(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
elements = response_body['elements']
alarm_id = elements[0]['id']
resp, response_body = self.monasca_client.delete_alarm(alarm_id)
self.assertEqual(204, resp.status)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=0)
# @test.attr(type="gate")
# @test.attr(type=['negative'])
# def test_delete_alarm_with_invalid_id(self):
# id = data_utils.rand_name()
# self.assertRaises(exceptions.NotFound,
# self.monasca_client.delete_alarm, id)
@test.attr(type="gate")
@test.attr(type=['negative'])
def test_delete_alarm_with_invalid_id(self):
id = data_utils.rand_name()
self.assertRaises(exceptions.NotFound,
self.monasca_client.delete_alarm, id)
# @test.attr(type="gate")
# def test_create_alarms_with_match_by(self):
# # Create an alarm definition with no match_by
# name = data_utils.rand_name('alarm_definition_1')
# expression = "max(cpu.idle_perc{service=monitoring}) < 20"
# alarm_definition = helpers.create_alarm_definition(
# name=name, description="description", expression=expression)
# resp, response_body = self.monasca_client.create_alarm_definitions(
# alarm_definition)
# alarm_definition_id = response_body['id']
# self._create_metrics_for_match_by(
# num=1, alarm_definition_id=alarm_definition_id)
# query_param = '?alarm_definition_id=' + str(alarm_definition_id)
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=1)
# elements = response_body['elements']
# metrics = elements[0]['metrics']
# self.assertEqual(len(metrics), 2)
# self.assertNotEqual(metrics[0], metrics[1])
@test.attr(type="gate")
def test_create_alarms_with_match_by(self):
# Create an alarm definition with no match_by
name = data_utils.rand_name('alarm_definition_1')
expression = "max(cpu.idle_perc{service=monitoring}) < 20"
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
self._create_metrics_for_match_by(
num=1, alarm_definition_id=alarm_definition_id)
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
elements = response_body['elements']
metrics = elements[0]['metrics']
self.assertEqual(len(metrics), 2)
self.assertNotEqual(metrics[0], metrics[1])
# # Create an alarm definition with match_by
# name = data_utils.rand_name('alarm_definition_2')
# expression = "max(cpu.idle_perc{service=monitoring}) < 20"
# match_by = ['hostname']
# alarm_definition = helpers.create_alarm_definition(
# name=name, description="description", expression=expression,
# match_by=match_by)
# resp, response_body = self.monasca_client.create_alarm_definitions(
# alarm_definition)
# alarm_definition_id = response_body['id']
# # create some metrics
# self._create_metrics_for_match_by(
# num=2, alarm_definition_id=alarm_definition_id)
# query_param = '?alarm_definition_id=' + str(alarm_definition_id)
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=2)
# elements = response_body['elements']
# self.assertEqual(len(elements[0]['metrics']), 1)
# self.assertEqual(len(elements[1]['metrics']), 1)
# self.assertNotEqual(elements[0]['metrics'], elements[1]['metrics'])
# Create an alarm definition with match_by
name = data_utils.rand_name('alarm_definition_2')
expression = "max(cpu.idle_perc{service=monitoring}) < 20"
match_by = ['hostname']
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression,
match_by=match_by)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
# create some metrics
self._create_metrics_for_match_by(
num=2, alarm_definition_id=alarm_definition_id)
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=2)
elements = response_body['elements']
self.assertEqual(len(elements[0]['metrics']), 1)
self.assertEqual(len(elements[1]['metrics']), 1)
self.assertNotEqual(elements[0]['metrics'], elements[1]['metrics'])
# @test.attr(type="gate")
# def test_create_alarms_with_sub_expressions_and_match_by(self):
# # Create an alarm definition with sub-expressions and match_by
# name = data_utils.rand_name('alarm_definition_3')
# expression = "max(cpu.idle_perc{service=monitoring}) < 10 or " \
# "max(cpu.user_perc{service=monitoring}) > 60"
# match_by = ['hostname']
# alarm_definition = helpers.create_alarm_definition(
# name=name, description="description", expression=expression,
# match_by=match_by)
# resp, response_body = self.monasca_client.create_alarm_definitions(
# alarm_definition)
# alarm_definition_id = response_body['id']
# self._create_metrics_for_match_by_sub_expressions(
# num=2, alarm_definition_id=alarm_definition_id)
# query_param = '?alarm_definition_id=' + str(alarm_definition_id)
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=2)
# elements = response_body['elements']
# hostnames = []
# for i in xrange(2):
# self.assertEqual(len(elements[i]['metrics']), 2)
# for i in xrange(2):
# for j in xrange(2):
# hostnames.append(elements[i]['metrics'][j]['dimensions'][
# 'hostname'])
# self.assertEqual(hostnames[0], hostnames[1])
# self.assertEqual(hostnames[2], hostnames[3])
# self.assertNotEqual(hostnames[0], hostnames[2])
@test.attr(type="gate")
def test_create_alarms_with_sub_expressions_and_match_by(self):
# Create an alarm definition with sub-expressions and match_by
name = data_utils.rand_name('alarm_definition_3')
expression = "max(cpu.idle_perc{service=monitoring}) < 10 or " \
"max(cpu.user_perc{service=monitoring}) > 60"
match_by = ['hostname']
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression,
match_by=match_by)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
self._create_metrics_for_match_by_sub_expressions(
num=2, alarm_definition_id=alarm_definition_id)
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=2)
elements = response_body['elements']
hostnames = []
for i in xrange(2):
self.assertEqual(len(elements[i]['metrics']), 2)
for i in xrange(2):
for j in xrange(2):
hostnames.append(elements[i]['metrics'][j]['dimensions'][
'hostname'])
self.assertEqual(hostnames[0], hostnames[1])
self.assertEqual(hostnames[2], hostnames[3])
self.assertNotEqual(hostnames[0], hostnames[2])
# @test.attr(type="gate")
# def test_create_alarms_with_match_by_list(self):
# # Create an alarm definition with match_by as a list
# name = data_utils.rand_name('alarm_definition')
# expression = "max(cpu.idle_perc{service=monitoring}) < 10"
# match_by = ['hostname', 'device']
# alarm_definition = helpers.create_alarm_definition(
# name=name, description="description", expression=expression,
# match_by=match_by)
# resp, response_body = self.monasca_client.create_alarm_definitions(
# alarm_definition)
# alarm_definition_id = response_body['id']
# query_param = '?alarm_definition_id=' + str(alarm_definition_id)
# # create some metrics
# self._create_metrics_for_match_by_sub_expressions_list(
# num=4, alarm_definition_id=alarm_definition_id)
# resp, response_body = self.monasca_client.list_alarms(query_param)
# self._verify_list_alarms_elements(resp, response_body,
# expect_num_elements=4)
# elements = response_body['elements']
# dimensions = []
# for i in xrange(4):
# self.assertEqual(len(elements[i]['metrics']), 1)
# dimensions.append(elements[i]['metrics'][0]['dimensions'])
# for i in xrange(4):
# for j in xrange(4):
# if i != j:
# self.assertNotEqual(dimensions[i], dimensions[j])
@test.attr(type="gate")
def test_create_alarms_with_match_by_list(self):
# Create an alarm definition with match_by as a list
name = data_utils.rand_name('alarm_definition')
expression = "max(cpu.idle_perc{service=monitoring}) < 10"
match_by = ['hostname', 'device']
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression,
match_by=match_by)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
# create some metrics
self._create_metrics_for_match_by_sub_expressions_list(
num=4, alarm_definition_id=alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=4)
elements = response_body['elements']
dimensions = []
for i in xrange(4):
self.assertEqual(len(elements[i]['metrics']), 1)
dimensions.append(elements[i]['metrics'][0]['dimensions'])
for i in xrange(4):
for j in xrange(4):
if i != j:
self.assertNotEqual(dimensions[i], dimensions[j])
def _verify_list_alarms_elements(self, resp, response_body,
expect_num_elements):

View File

@ -48,225 +48,225 @@ class TestAlarmsStateHistory(base.BaseMonascaTest):
break
time.sleep(1)
# @test.attr(type="gate")
# def test_list_alarms_state_history(self):
# resp, response_body = self.monasca_client.list_alarms_state_history()
# self.assertEqual(200, resp.status)
# # Test response body
# self.assertTrue(set(['links', 'elements']) == set(response_body))
# elements = response_body['elements']
# number_of_alarms = len(elements)
# if number_of_alarms < 1:
# error_msg = "Failed test_list_alarms_state_history: need " \
# "at least one alarms state history to test."
# self.fail(error_msg)
# else:
# element = elements[0]
# self.assertTrue(set(['id', 'alarm_id', 'metrics', 'old_state',
# 'new_state', 'reason', 'reason_data',
# 'timestamp', 'sub_alarms'])
# == set(element))
@test.attr(type="gate")
def test_list_alarms_state_history(self):
resp, response_body = self.monasca_client.list_alarms_state_history()
self.assertEqual(200, resp.status)
# Test response body
self.assertTrue(set(['links', 'elements']) == set(response_body))
elements = response_body['elements']
number_of_alarms = len(elements)
if number_of_alarms < 1:
error_msg = "Failed test_list_alarms_state_history: need " \
"at least one alarms state history to test."
self.fail(error_msg)
else:
element = elements[0]
self.assertTrue(set(['id', 'alarm_id', 'metrics', 'old_state',
'new_state', 'reason', 'reason_data',
'timestamp', 'sub_alarms'])
== set(element))
# @test.attr(type="gate")
# def test_list_alarms_state_history_with_dimensions(self):
# resp, response_body = self.monasca_client.list_alarms_state_history()
# elements = response_body['elements']
# if elements:
# element = elements[0]
# dimension = element['metrics'][0]['dimensions']
# dimension_items = dimension.items()
# dimension_item = dimension_items[0]
# dimension_item_0 = dimension_item[0]
# dimension_item_1 = dimension_item[1]
# name = element['metrics'][0]['name']
@test.attr(type="gate")
def test_list_alarms_state_history_with_dimensions(self):
resp, response_body = self.monasca_client.list_alarms_state_history()
elements = response_body['elements']
if elements:
element = elements[0]
dimension = element['metrics'][0]['dimensions']
dimension_items = dimension.items()
dimension_item = dimension_items[0]
dimension_item_0 = dimension_item[0]
dimension_item_1 = dimension_item[1]
name = element['metrics'][0]['name']
# query_parms = '?dimensions=' + str(dimension_item_0) + ':' + str(
# dimension_item_1)
# resp, response_body = self.monasca_client.\
# list_alarms_state_history(query_parms)
# name_new = response_body['elements'][0]['metrics'][0]['name']
# self.assertEqual(200, resp.status)
# self.assertEqual(name, name_new)
# else:
# error_msg = "Failed test_list_alarms_state_history_with_" \
# "dimensions: need at least one alarms state history " \
# "to test."
# self.fail(error_msg)
query_parms = '?dimensions=' + str(dimension_item_0) + ':' + str(
dimension_item_1)
resp, response_body = self.monasca_client.\
list_alarms_state_history(query_parms)
name_new = response_body['elements'][0]['metrics'][0]['name']
self.assertEqual(200, resp.status)
self.assertEqual(name, name_new)
else:
error_msg = "Failed test_list_alarms_state_history_with_" \
"dimensions: need at least one alarms state history " \
"to test."
self.fail(error_msg)
# @test.attr(type="gate")
# def test_list_alarms_state_history_with_start_time(self):
# # 1, get all histories
# resp, all_response_body = self.monasca_client.\
# list_alarms_state_history()
# all_elements = all_response_body['elements']
@test.attr(type="gate")
def test_list_alarms_state_history_with_start_time(self):
# 1, get all histories
resp, all_response_body = self.monasca_client.\
list_alarms_state_history()
all_elements = all_response_body['elements']
# if len(all_elements) < 2:
# error_msg = "Failed test_list_alarms_state_history_with_" \
# "start_time: need 2 or more alarms state history " \
# "to test."
# self.fail(error_msg)
if len(all_elements) < 2:
error_msg = "Failed test_list_alarms_state_history_with_" \
"start_time: need 2 or more alarms state history " \
"to test."
self.fail(error_msg)
# # 2, query min(timestamp) < x
# min_element, max_element = self._get_elements_with_min_max_timestamp(
# all_elements)
# start_time = min_element['timestamp']
# query_params = '?start_time=' + str(start_time)
# resp, selected_response_body = self.monasca_client.\
# list_alarms_state_history(query_params)
# selected_elements = selected_response_body['elements']
# 2, query min(timestamp) < x
min_element, max_element = self._get_elements_with_min_max_timestamp(
all_elements)
start_time = min_element['timestamp']
query_params = '?start_time=' + str(start_time)
resp, selected_response_body = self.monasca_client.\
list_alarms_state_history(query_params)
selected_elements = selected_response_body['elements']
# # 3. compare #1 and #2
# expected_elements = all_elements
# expected_elements.remove(min_element)
# self.assertEqual(expected_elements, selected_elements)
# 3. compare #1 and #2
expected_elements = all_elements
expected_elements.remove(min_element)
self.assertEqual(expected_elements, selected_elements)
# @test.attr(type="gate")
# def test_list_alarms_state_history_with_end_time(self):
# # 1, get all histories
# resp, all_response_body = self.monasca_client.\
# list_alarms_state_history()
# all_elements = all_response_body['elements']
@test.attr(type="gate")
def test_list_alarms_state_history_with_end_time(self):
# 1, get all histories
resp, all_response_body = self.monasca_client.\
list_alarms_state_history()
all_elements = all_response_body['elements']
# if len(all_elements) < 2:
# error_msg = "Failed test_list_alarms_state_history_with_" \
# "end_time: need 2 or more alarms state history " \
# "to test."
# self.fail(error_msg)
if len(all_elements) < 2:
error_msg = "Failed test_list_alarms_state_history_with_" \
"end_time: need 2 or more alarms state history " \
"to test."
self.fail(error_msg)
# # 2, query x < max(timestamp)
# min_element, max_element = self._get_elements_with_min_max_timestamp(
# all_elements)
# end_time = max_element['timestamp']
# query_params = '?end_time=' + str(end_time)
# resp, selected_response_body = self.monasca_client.\
# list_alarms_state_history(query_params)
# selected_elements = selected_response_body['elements']
# 2, query x < max(timestamp)
min_element, max_element = self._get_elements_with_min_max_timestamp(
all_elements)
end_time = max_element['timestamp']
query_params = '?end_time=' + str(end_time)
resp, selected_response_body = self.monasca_client.\
list_alarms_state_history(query_params)
selected_elements = selected_response_body['elements']
# # 3. compare #1 and #2
# expected_elements = all_elements
# expected_elements.remove(max_element)
# self.assertEqual(expected_elements, selected_elements)
# 3. compare #1 and #2
expected_elements = all_elements
expected_elements.remove(max_element)
self.assertEqual(expected_elements, selected_elements)
# @test.attr(type="gate")
# def test_list_alarms_state_history_with_start_end_time(self):
# # 1, get all histories
# resp, all_response_body = self.monasca_client.\
# list_alarms_state_history()
# all_elements = all_response_body['elements']
@test.attr(type="gate")
def test_list_alarms_state_history_with_start_end_time(self):
# 1, get all histories
resp, all_response_body = self.monasca_client.\
list_alarms_state_history()
all_elements = all_response_body['elements']
# if len(all_elements) < 3:
# error_msg = "Failed test_list_alarms_state_history_with_" \
# "start_end_time: need 3 or more alarms state history " \
# "to test."
# self.fail(error_msg)
if len(all_elements) < 3:
error_msg = "Failed test_list_alarms_state_history_with_" \
"start_end_time: need 3 or more alarms state history " \
"to test."
self.fail(error_msg)
# # 2, query min(timestamp) < x < max(timestamp)
# min_element, max_element = self._get_elements_with_min_max_timestamp(
# all_elements)
# start_time = min_element['timestamp']
# end_time = max_element['timestamp']
# query_params = '?start_time=' + str(start_time) + '&end_time=' + \
# str(end_time)
# resp, selected_response_body = self.monasca_client.\
# list_alarms_state_history(query_params)
# selected_elements = selected_response_body['elements']
# 2, query min(timestamp) < x < max(timestamp)
min_element, max_element = self._get_elements_with_min_max_timestamp(
all_elements)
start_time = min_element['timestamp']
end_time = max_element['timestamp']
query_params = '?start_time=' + str(start_time) + '&end_time=' + \
str(end_time)
resp, selected_response_body = self.monasca_client.\
list_alarms_state_history(query_params)
selected_elements = selected_response_body['elements']
# # 3. compare #1 and #2
# expected_elements = all_elements
# expected_elements.remove(min_element)
# expected_elements.remove(max_element)
# self.assertEqual(expected_elements, selected_elements)
# 3. compare #1 and #2
expected_elements = all_elements
expected_elements.remove(min_element)
expected_elements.remove(max_element)
self.assertEqual(expected_elements, selected_elements)
# @test.attr(type="gate")
# def test_list_alarms_state_history_with_offset_limit(self):
# resp, response_body = self.monasca_client.list_alarms_state_history()
# elements = response_body['elements']
# number_of_alarms = len(elements)
# if number_of_alarms >= MIN_HISTORY:
# first_element = elements[0]
# first_element_id = first_element['id']
@test.attr(type="gate")
def test_list_alarms_state_history_with_offset_limit(self):
resp, response_body = self.monasca_client.list_alarms_state_history()
elements = response_body['elements']
number_of_alarms = len(elements)
if number_of_alarms >= MIN_HISTORY:
first_element = elements[0]
first_element_id = first_element['id']
# for limit in xrange(1, MIN_HISTORY):
# query_parms = '?limit=' + str(limit) + \
# '&offset=' + str(first_element_id)
# resp, response_body = self.monasca_client.\
# list_alarms_state_history(query_parms)
# elements = response_body['elements']
# element_new = elements[0]
# self.assertEqual(200, resp.status)
# self.assertEqual(element_new, first_element)
# self.assertEqual(limit, len(elements))
# id_new = element_new['id']
# self.assertEqual(id_new, first_element_id)
# else:
# error_msg = ("Failed "
# "test_list_alarms_state_history_with_offset "
# "limit: need three alarms state history to "
# "test. Current number of alarms = {}").\
# format(number_of_alarms)
# self.fail(error_msg)
for limit in xrange(1, MIN_HISTORY):
query_parms = '?limit=' + str(limit) + \
'&offset=' + str(first_element_id)
resp, response_body = self.monasca_client.\
list_alarms_state_history(query_parms)
elements = response_body['elements']
element_new = elements[0]
self.assertEqual(200, resp.status)
self.assertEqual(element_new, first_element)
self.assertEqual(limit, len(elements))
id_new = element_new['id']
self.assertEqual(id_new, first_element_id)
else:
error_msg = ("Failed "
"test_list_alarms_state_history_with_offset "
"limit: need three alarms state history to "
"test. Current number of alarms = {}").\
format(number_of_alarms)
self.fail(error_msg)
# @test.attr(type="gate")
# def test_list_alarm_state_history(self):
# # Get the alarm state history for a specific alarm by ID
# resp, response_body = self.monasca_client.list_alarms_state_history()
# self.assertEqual(200, resp.status)
# elements = response_body['elements']
# if elements:
# element = elements[0]
# alarm_id = element['alarm_id']
# resp, response_body = self.monasca_client.list_alarm_state_history(
# alarm_id)
# self.assertEqual(200, resp.status)
@test.attr(type="gate")
def test_list_alarm_state_history(self):
# Get the alarm state history for a specific alarm by ID
resp, response_body = self.monasca_client.list_alarms_state_history()
self.assertEqual(200, resp.status)
elements = response_body['elements']
if elements:
element = elements[0]
alarm_id = element['alarm_id']
resp, response_body = self.monasca_client.list_alarm_state_history(
alarm_id)
self.assertEqual(200, resp.status)
# # Test Response Body
# self.assertTrue(set(['links', 'elements']) ==
# set(response_body))
# elements = response_body['elements']
# links = response_body['links']
# self.assertTrue(isinstance(links, list))
# link = links[0]
# self.assertTrue(set(['rel', 'href']) ==
# set(link))
# self.assertEqual(link['rel'], u'self')
# definition = elements[0]
# self.assertTrue(set(['id', 'alarm_id', 'metrics', 'new_state',
# 'old_state', 'reason', 'reason_data',
# 'sub_alarms', 'timestamp']) ==
# set(definition))
# else:
# error_msg = "Failed test_list_alarm_state_history: at least one " \
# "alarm state history is needed."
# self.fail(error_msg)
# Test Response Body
self.assertTrue(set(['links', 'elements']) ==
set(response_body))
elements = response_body['elements']
links = response_body['links']
self.assertTrue(isinstance(links, list))
link = links[0]
self.assertTrue(set(['rel', 'href']) ==
set(link))
self.assertEqual(link['rel'], u'self')
definition = elements[0]
self.assertTrue(set(['id', 'alarm_id', 'metrics', 'new_state',
'old_state', 'reason', 'reason_data',
'sub_alarms', 'timestamp']) ==
set(definition))
else:
error_msg = "Failed test_list_alarm_state_history: at least one " \
"alarm state history is needed."
self.fail(error_msg)
# @test.attr(type="gate")
# def test_list_alarm_state_history_with_offset_limit(self):
# # Get the alarm state history for a specific alarm by ID
# resp, response_body = self.monasca_client.list_alarms_state_history()
# self.assertEqual(200, resp.status)
# elements = response_body['elements']
# if elements:
# element = elements[0]
# alarm_id = element['alarm_id']
# query_parms = '?limit=1'
# resp, response_body = self.monasca_client.\
# list_alarm_state_history(alarm_id, query_parms)
# elements = response_body['elements']
# self.assertEqual(200, resp.status)
# self.assertEqual(1, len(elements))
@test.attr(type="gate")
def test_list_alarm_state_history_with_offset_limit(self):
# Get the alarm state history for a specific alarm by ID
resp, response_body = self.monasca_client.list_alarms_state_history()
self.assertEqual(200, resp.status)
elements = response_body['elements']
if elements:
element = elements[0]
alarm_id = element['alarm_id']
query_parms = '?limit=1'
resp, response_body = self.monasca_client.\
list_alarm_state_history(alarm_id, query_parms)
elements = response_body['elements']
self.assertEqual(200, resp.status)
self.assertEqual(1, len(elements))
# id = element['id']
# query_parms = '?limit=1&offset=' + str(id)
# resp, response_body = self.monasca_client.\
# list_alarm_state_history(alarm_id, query_parms)
# elements_new = response_body['elements']
# self.assertEqual(200, resp.status)
# self.assertEqual(1, len(elements_new))
# self.assertEqual(element, elements_new[0])
# else:
# error_msg = "Failed test_list_alarm_state_history_with_offset" \
# "_limit: at least one alarms state history is needed."
# self.fail(error_msg)
id = element['id']
query_parms = '?limit=1&offset=' + str(id)
resp, response_body = self.monasca_client.\
list_alarm_state_history(alarm_id, query_parms)
elements_new = response_body['elements']
self.assertEqual(200, resp.status)
self.assertEqual(1, len(elements_new))
self.assertEqual(element, elements_new[0])
else:
error_msg = "Failed test_list_alarm_state_history_with_offset" \
"_limit: at least one alarms state history is needed."
self.fail(error_msg)
def _get_elements_with_min_max_timestamp(self, elements):
sorted_elements = sorted(elements, key=lambda element: timeutils.