Improve tests coverage

* Add unit tests
* Fix python3 and python2 compatibility

Story: 2003881
Task: 29254

Change-Id: Ib54b94737dace3104976321e38ad1a9076a46877
This commit is contained in:
Adrian Czarnecki 2019-01-23 14:20:03 +01:00
parent 55a172c072
commit e18ed3f02d
12 changed files with 1997 additions and 85 deletions

View File

@ -40,7 +40,7 @@ class AlarmsCountV2API(object):
super(AlarmsCountV2API, self).__init__()
def on_get(self, req, res):
res.status = "501 Not Implemented"
res.status = '501 Not Implemented'
class AlarmsStateHistoryV2API(object):

View File

@ -115,4 +115,4 @@ class Request(falcon.Request):
return self.context.can(action, target)
def __repr__(self):
return '%s, context=%s' % (self.path, self.context)
return '%s, context=%s' % (self.path, self.context.to_dict())

View File

@ -294,7 +294,7 @@ class AlarmsRepository(sql_repository.SQLRepository,
query = query.where(or_(ad.c.severity == bindparam(
'b_severity' + str(i)) for i in range(len(severities))))
for i, s in enumerate(severities):
parms['b_severity' + str(i)] = s.encode('utf8')
parms['b_severity' + str(i)] = s if six.PY3 else s.encode('utf-8')
if 'state' in query_parms:
query = query.where(a.c.state == bindparam('b_state'))
@ -505,7 +505,8 @@ class AlarmsRepository(sql_repository.SQLRepository,
query = query.where(ad.c.id == bindparam('b_alarm_definition_id'))
if 'state' in query_parms:
parms['b_state'] = query_parms['state'].encode('utf8')
parms['b_state'] = query_parms['state'] if six.PY3 else \
query_parms['state'].encode('utf8')
query = query.where(a.c.state == bindparam('b_state'))
if 'severity' in query_parms:
@ -513,23 +514,29 @@ class AlarmsRepository(sql_repository.SQLRepository,
query = query.where(or_(ad.c.severity == bindparam(
'b_severity' + str(i)) for i in range(len(severities))))
for i, s in enumerate(severities):
parms['b_severity' + str(i)] = s.encode('utf8')
parms['b_severity' + str(i)] = s if six.PY3 else s.encode('utf8')
if 'lifecycle_state' in query_parms:
parms['b_lifecycle_state'] = query_parms['lifecycle_state'].encode('utf8')
parms['b_lifecycle_state'] = query_parms['lifecycle_state'] if six.PY3 else \
query_parms['lifecycle_state'].encode('utf8')
query = query.where(a.c.lifecycle_state == bindparam('b_lifecycle_state'))
if 'link' in query_parms:
parms['b_link'] = query_parms['link'].encode('utf8')
parms['b_link'] = query_parms['link'] if six.PY3 else \
query_parms['link'].encode('utf8')
query = query.where(a.c.link == bindparam('b_link'))
if 'state_updated_start_time' in query_parms:
parms['b_state_updated_at'] = query_parms['state_updated_start_time'].encode("utf8")
date_str = query_parms['state_updated_start_time'] if six.PY3 \
else query_parms['state_updated_start_time'].encode('utf8')
date_param = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ')
parms['b_state_updated_at'] = date_param
query = query.where(a.c.state_updated_at >= bindparam('b_state_updated_at'))
if 'metric_name' in query_parms:
query = query.where(a.c.id.in_(self.get_a_am_query))
parms['b_md_name'] = query_parms['metric_name'].encode('utf8')
parms['b_md_name'] = query_parms['metric_name'] if six.PY3 else \
query_parms['metric_name'].encode('utf8')
if 'metric_dimensions' in query_parms:
sub_query = select([a.c.id])
@ -555,8 +562,10 @@ class AlarmsRepository(sql_repository.SQLRepository,
sub_query_md.c.dimension_set_id ==
mdd.c.metric_dimension_set_id))
parms[md_name] = metric_dimension[0].encode('utf8')
parms[md_value] = metric_dimension[1].encode('utf8')
parms[md_name] = metric_dimension[0] if six.PY3 else \
metric_dimension[0].encode('utf8')
parms[md_value] = metric_dimension[1] if six.PY3 else \
metric_dimension[1].encode('utf8')
sub_query = (sub_query
.select_from(sub_query_from)
@ -577,5 +586,4 @@ class AlarmsRepository(sql_repository.SQLRepository,
parms['b_offset'] = offset
query = query.distinct()
return [dict(row) for row in conn.execute(query, parms).fetchall()]

View File

@ -16,6 +16,7 @@
import datetime
from oslo_utils import uuidutils
import six
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import notifications_repository as nr
@ -178,10 +179,11 @@ class NotificationsRepository(sql_repository.SQLRepository,
@sql_repository.sql_try_catch_block
def find_notification_by_name(self, tenant_id, name):
name = name if six.PY3 else name.encode('utf8')
with self._db_engine.connect() as conn:
return conn.execute(self._select_nm_name_query,
b_tenant_id=tenant_id,
b_name=name.encode('utf8')).fetchone()
b_name=name).fetchone()
@sql_repository.sql_try_catch_block
def update_notification(

View File

@ -20,10 +20,10 @@ import time
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy.engines import create_engine
from monasca_api.tests import base
from sqlalchemy import delete, MetaData, insert, bindparam
from monasca_api.common.repositories.sqla import models
from monasca_api.tests import base
CONF = cfg.CONF
@ -230,7 +230,7 @@ class TestAlarmRepoDB(base.BaseTestCase):
{'id': '234',
'tenant_id': 'bob',
'name': '50% CPU',
'severity': 'LOW',
'severity': 'CRITICAL',
'expression': 'AVG(cpu.sys_mem'
'{service=monitoring})'
' > 20 and AVG(cpu.idle_perc'
@ -412,6 +412,9 @@ class TestAlarmRepoDB(base.BaseTestCase):
{'dimension_set_id': b'2',
'name': 'flavor_id',
'value': '222'},
{'dimension_set_id': b'22',
'name': 'flavor_id',
'value': '333'},
{'dimension_set_id': b'21',
'name': 'service',
'value': 'monitoring'},
@ -464,7 +467,7 @@ class TestAlarmRepoDB(base.BaseTestCase):
'updated_timestamp': '2015-03-14T09:26:54Z'}
self.alarm_compound = {'alarm_definition': {'id': '234',
'name': '50% CPU',
'severity': 'LOW'},
'severity': 'CRITICAL'},
'created_timestamp': '2015-03-15T09:26:53Z',
'id': '234111',
'lifecycle_state': None,
@ -475,6 +478,7 @@ class TestAlarmRepoDB(base.BaseTestCase):
'service': 'monitoring'},
'name': 'cpu.sys_mem'},
{'dimensions': {'extra': 'vivi',
'flavor_id': '333',
'hostname': 'roland',
'region': 'colorado',
'service': 'monitoring'},
@ -666,6 +670,37 @@ class TestAlarmRepoDB(base.BaseTestCase):
self.assertEqual(res, expected)
query_parms = {'metric_dimensions': {'flavor_id': '333'}}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm_compound]
self.assertEqual(res, expected)
query_parms = {'metric_dimensions': {'flavor_id': '222|333'}}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm_compound,
self.alarm3]
self.assertEqual(res, expected)
query_parms = {'metric_dimensions': {'flavor_id': ''}}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm_compound,
self.alarm3]
self.assertEqual(res, expected)
query_parms = {'metric_name': 'cpu.idle_perc',
'metric_dimensions': {'service': 'monitoring',
'hostname': 'roland'}}
@ -794,6 +829,237 @@ class TestAlarmRepoDB(base.BaseTestCase):
self.assertEqual(res, expected)
query_parms = {'severity': 'LOW'}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm2,
self.alarm3]
self.assertEqual(expected, res)
query_parms = {'severity': 'CRITICAL'}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm_compound]
self.assertEqual(expected, res)
query_parms = {'severity': 'LOW|CRITICAL'}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm2,
self.alarm_compound,
self.alarm3]
self.assertEqual(expected, res)
def test_should_count(self):
tenant_id = 'bob'
res = self.repo.get_alarms_count(tenant_id=tenant_id)
self.assertEqual([{'count': 4}], res)
res = self.repo.get_alarms_count(tenant_id=tenant_id,
limit=1000)
self.assertEqual([{'count': 4}], res)
res = self.repo.get_alarms_count(tenant_id=tenant_id,
limit=1000,
offset=10)
self.assertEqual([], res)
alarm_def_id = self.alarm_compound['alarm_definition']['id']
query_parms = {'alarm_definition_id': alarm_def_id}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 1}], res)
query_parms = {'metric_name': 'cpu.sys_mem'}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 1}], res)
query_parms = {'state': 'UNDETERMINED'}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 2}], res)
time_now = '2015-03-15T00:00:00.0Z'
query_parms = {'state_updated_start_time': time_now}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 1}], res)
query_parms = {'severity': 'LOW'}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 3}], res)
query_parms = {'lifecycle_state': 'OPEN'}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 2}], res)
query_parms = {'link': 'http://somesite.com/this-alarm-info'}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 3}], res)
query_parms = {'metric_dimensions': {'flavor_id': '222'}}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 2}], res)
query_parms = {'group_by': ['metric_name']}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
expected = [{'count': 4, 'metric_name': 'cpu.idle_perc'},
{'count': 1, 'metric_name': 'cpu.sys_mem'}]
self.assertEqual(expected, res)
query_parms = {'group_by': ['dimension_name']}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
expected = [{'count': 1, 'dimension_name': 'extra'},
{'count': 3, 'dimension_name': 'flavor_id'},
{'count': 1, 'dimension_name': 'hostname'},
{'count': 2, 'dimension_name': 'instance_id'},
{'count': 1, 'dimension_name': 'region'},
{'count': 3, 'dimension_name': 'service'}]
self.assertEqual(expected, res)
query_parms = {'group_by': ['dimension_value']}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
expected = [{'count': 2, 'dimension_value': '123'},
{'count': 2, 'dimension_value': '222'},
{'count': 1, 'dimension_value': '333'},
{'count': 1, 'dimension_value': 'colorado'},
{'count': 3, 'dimension_value': 'monitoring'},
{'count': 1, 'dimension_value': 'roland'},
{'count': 1, 'dimension_value': 'vivi'}]
self.assertEqual(expected, res)
query_parms = {'group_by': []}
res = self.repo.get_alarms_count(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
self.assertEqual([{'count': 4}], res)
def test_should_sort_and_find(self):
tenant_id = 'bob'
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['alarm_id']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm2,
self.alarm_compound,
self.alarm3]
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['alarm_definition_id']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm2,
self.alarm3,
self.alarm_compound]
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['alarm_definition_name']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
expected = [self.alarm_compound,
self.alarm1,
self.alarm2,
self.alarm3]
res = self.helper_builder_result(res)
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['severity']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
expected = [self.alarm1,
self.alarm2,
self.alarm3,
self.alarm_compound]
res = self.helper_builder_result(res)
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['state']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
expected = [self.alarm1,
self.alarm2,
self.alarm_compound,
self.alarm3]
res = self.helper_builder_result(res)
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['alarm_id asc']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm2,
self.alarm_compound,
self.alarm3]
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['alarm_id desc']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm3,
self.alarm_compound,
self.alarm2,
self.alarm1]
self.assertEqual(expected, res)
query_parms = {'metric_name': 'cpu.idle_perc',
'sort_by': ['alarm_id nfl']}
res = self.repo.get_alarms(tenant_id=tenant_id,
query_parms=query_parms,
limit=1000)
res = self.helper_builder_result(res)
expected = [self.alarm1,
self.alarm2,
self.alarm_compound,
self.alarm3]
self.assertEqual(expected, res)
def test_should_update(self):
tenant_id = 'bob'
alarm_id = '2'

View File

@ -286,6 +286,26 @@ class TestAlarmDefinitionRepoDB(base.BaseTestCase):
count_sadd = conn.execute(query_sadd, id=count_sad[0][0]).fetchone()
self.assertEqual(count_sadd[0], 3)
def test_should_try_to_create_with_wrong_alarm_action(self):
expression = ('AVG(hpcs.compute{flavor_id=777, image_id=888,'
' metric_name=cpu}) > 10')
description = ''
match_by = ['flavor_id', 'image_id']
sub_expr_list = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
alarm_actions = ['66666666']
args = ('555',
'90% CPU',
expression,
sub_expr_list,
description,
'LOW',
match_by,
alarm_actions,
None,
None)
self.assertRaises(exceptions.InvalidUpdateException,
self.repo.create_alarm_definition, *args)
def test_should_update(self):
expression = ''.join(['AVG(hpcs.compute{flavor_id=777, image_id=888,',
' metric_name=mem}) > 20 and',
@ -556,6 +576,17 @@ class TestAlarmDefinitionRepoDB(base.BaseTestCase):
sub_alarms = self.repo.get_sub_alarm_definitions('asdfasdf')
self.assertEqual(sub_alarms, [])
def test_try_update_alarm_that_does_not_exist(self):
args = ('koala', '999',
None, None,
None, True,
None, None,
None, None,
None, None,
True)
self.assertRaises(exceptions.DoesNotExistException,
self.repo.update_or_patch_alarm_definition, *args)
def test_exists(self):
alarmDef1 = self.repo.get_alarm_definitions(tenant_id='bob',
name='90% CPU')
@ -696,6 +727,38 @@ class TestAlarmDefinitionRepoDB(base.BaseTestCase):
limit=1)
self.assertEqual(alarmDef1, [])
def test_should_find_and_sort(self):
expected = [{'actions_enabled': False,
'alarm_actions': '29387234,77778687',
'description': None,
'expression': 'AVG(hpcs.compute{flavor_id=777, '
'image_id=888, metric_name=cpu, device=1}) > 10',
'id': '123',
'match_by': 'flavor_id,image_id',
'name': '90% CPU',
'ok_actions': None,
'severity': 'LOW',
'undetermined_actions': None},
{'actions_enabled': False,
'alarm_actions': '29387234,77778687',
'description': None,
'expression': 'AVG(hpcs.compute{flavor_id=777, '
'image_id=888, metric_name=mem}) > 20 and '
'AVG(hpcs.compute) < 100',
'id': '234',
'match_by': 'flavor_id,image_id',
'name': '50% CPU',
'ok_actions': None,
'severity': 'LOW',
'undetermined_actions': None}]
alarmDef1 = self.repo.get_alarm_definitions(tenant_id='bob',
sort_by=['id'])
self.assertEqual(expected, alarmDef1)
alarmDef2 = self.repo.get_alarm_definitions(tenant_id='bob',
sort_by=['name'])
self.assertEqual(expected[::-1], alarmDef2)
def test_should_delete_by_id(self):
self.repo.delete_alarm_definition('bob', '123')
from monasca_api.common.repositories import exceptions
@ -721,6 +784,10 @@ class TestAlarmDefinitionRepoDB(base.BaseTestCase):
limit=1)
self.assertEqual(alarmDef1, expected)
def test_try_delete_alarm_that_does_not_exist(self):
response = self.repo.delete_alarm_definition('goku', '123')
self.assertEqual(False, response)
def test_should_patch_name(self):
self.run_patch_test(name=u'90% CPU New')

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,55 @@
# Copyright 2018 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from monasca_api.common.messaging.message_formats.metrics import transform
from monasca_api.tests import base
class TestMessageFormats(base.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow_ts')
def test_single_metrics(self, time):
time.return_value = 514862580
tenant_id = 222
region = 'default'
metrics = 'example.test'
expected_metrics = {'metric': 'example.test',
'creation_time': 514862580,
'meta':
{'region': 'default',
'tenantId': 222}}
transformed_metric = transform(metrics, tenant_id, region)
self.assertIsInstance(transformed_metric, list)
self.assertEqual(len(transformed_metric), 1)
self.assertEqual(expected_metrics, json.loads(transformed_metric[0]))
@mock.patch('oslo_utils.timeutils.utcnow_ts')
def test_multiple_metrics(self, time):
time.return_value = 514862580
tenant_id = 222
region = 'default'
metrics = ['example.test1', 'example.test2']
expected_metrics = []
for metric in metrics:
expected_metrics.append({'metric': metric,
'creation_time': 514862580,
'meta':
{'region': 'default',
'tenantId': 222}})
transformed_metrics = transform(metrics, tenant_id, region)
self.assertIsInstance(transformed_metrics, list)
self.assertEqual(len(transformed_metrics), len(metrics))
for transformed_metric in transformed_metrics:
self.assertIn(json.loads(transformed_metric), expected_metrics)

View File

@ -78,22 +78,33 @@ class TestNotificationMethodRepoDB(base.BaseTestCase):
from monasca_api.common.repositories.sqla import notifications_repository as nr
self.repo = nr.NotificationsRepository()
self.created_at = datetime.datetime.now()
self.updated_at = datetime.datetime.now()
self.default_nms = [{'id': '123',
'tenant_id': '444',
'name': 'MyEmail',
'type': 'EMAIL',
'address': 'a@b',
'period': 0,
'created_at': datetime.datetime.now(),
'updated_at': datetime.datetime.now()},
'created_at': self.created_at,
'updated_at': self.updated_at},
{'id': '124',
'tenant_id': '444',
'name': 'OtherEmail',
'type': 'EMAIL',
'address': 'a@b',
'period': 0,
'created_at': datetime.datetime.now(),
'updated_at': datetime.datetime.now()}]
'created_at': self.created_at,
'updated_at': self.updated_at},
{'id': '125',
'tenant_id': '444',
'name': 'AEmail',
'type': 'EMAIL',
'address': 'a@b',
'period': 0,
'created_at': self.created_at,
'updated_at': self.updated_at}
]
with self.engine.connect() as conn:
conn.execute(self._delete_nm_query)
@ -157,12 +168,27 @@ class TestNotificationMethodRepoDB(base.BaseTestCase):
self.assertEqual(nm['type'], 'EMAIL')
self.assertEqual(nm['address'], 'a@b')
def test_should_find_by_name(self):
nms = self.repo.find_notification_by_name('444', 'MyEmail')
expected = ('123', '444', 'MyEmail', 'EMAIL', 'a@b', 0,
self.created_at,
self.updated_at)
self.assertEqual(expected, nms)
def test_should_find(self):
nms = self.repo.list_notifications('444', None, None, 1)
nms = self.repo.list_notifications('444', None, None, 2)
self.assertEqual(nms, self.default_nms)
nms = self.repo.list_notifications('444', None, 2, 1)
nms = self.repo.list_notifications('444', None, 3, 1)
self.assertEqual(nms, [])
def test_should_find_and_sort(self):
nms = self.repo.list_notifications('444', ['id'], None, 3)
self.assertEqual(self.default_nms, nms)
nms = self.repo.list_notifications('444', ['name'], None, 3)
expected = sorted(self.default_nms, key=lambda k: k['name'])
self.assertEqual(expected, nms)
def test_update(self):
import copy
self.repo.update_notification('123', '444', 'Foo', 'EMAIL', 'abc', 0)

View File

@ -12,10 +12,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import mock
from monasca_common.policy import policy_engine as policy
from oslo_policy import policy as os_policy
from monasca_api.api.core import request
import monasca_api.common.repositories.constants as const
from monasca_api.tests import base
from monasca_api.v2.common import exceptions
@ -23,6 +24,12 @@ from monasca_api.v2.common import exceptions
class TestRequest(base.BaseApiTestCase):
def setUp(self):
super(TestRequest, self).setUp()
rules = [
os_policy.RuleDefault("example:allowed", "user_id:222"),
]
policy.reset()
policy.init()
policy._ENFORCER.register_defaults(rules)
def test_use_context_from_request(self):
req = request.Request(
@ -32,8 +39,10 @@ class TestRequest(base.BaseApiTestCase):
'X_AUTH_TOKEN': '111',
'X_USER_ID': '222',
'X_PROJECT_ID': '333',
'X_ROLES': 'terminator,predator'
}
'X_ROLES': 'terminator,predator',
},
query_string='tenant_id=444'
)
)
@ -41,6 +50,35 @@ class TestRequest(base.BaseApiTestCase):
self.assertEqual('222', req.user_id)
self.assertEqual('333', req.project_id)
self.assertEqual(['terminator', 'predator'], req.roles)
self.assertEqual('444', req.cross_project_id)
def test_policy_validation_with_target(self):
req = request.Request(
self.create_environ(
path='/',
headers={
'X_AUTH_TOKEN': '111',
'X_USER_ID': '222',
'X_PROJECT_ID': '333',
}
)
)
target = {'project_id': req.project_id,
'user_id': req.user_id}
self.assertEqual(True, req.can('example:allowed', target))
def test_policy_validation_without_target(self):
req = request.Request(
self.create_environ(
path='/',
headers={
'X_AUTH_TOKEN': '111',
'X_USER_ID': '222',
'X_PROJECT_ID': '333',
}
)
)
self.assertEqual(True, req.can('example:allowed'))
class TestRequestLimit(base.BaseApiTestCase):
@ -85,8 +123,7 @@ class TestRequestLimit(base.BaseApiTestCase):
property_wrapper
)
@mock.patch('monasca_api.common.repositories.constants.PAGE_LIMIT')
def test_default_limit(self, page_limit):
def test_default_limit(self):
req = request.Request(
self.create_environ(
path='/',
@ -98,4 +135,19 @@ class TestRequestLimit(base.BaseApiTestCase):
}
)
)
self.assertEqual(page_limit, req.limit)
self.assertEqual(const.PAGE_LIMIT, req.limit)
def test_to_big_limit(self):
req = request.Request(
self.create_environ(
path='/',
headers={
'X_AUTH_TOKEN': '111',
'X_USER_ID': '222',
'X_PROJECT_ID': '333',
'X_ROLES': 'terminator,predator'
},
query_string='limit={}'.format(const.PAGE_LIMIT + 1),
)
)
self.assertEqual(const.PAGE_LIMIT, req.limit)

View File

@ -0,0 +1,124 @@
# Copyright 2019 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_api.common.repositories.model import sub_alarm_definition
from monasca_api.expression_parser import alarm_expr_parser
from monasca_api.tests import base
class TestSubAlarmDefinition(base.BaseTestCase):
def test_init_from_row(self):
sub_alarm_d_dict = {'id': '111',
'alarm_definition_id': '123',
'function': 'AVG',
'metric_name': 'darth.vader',
'operator': 'GT',
'threshold': 10,
'period': 60,
'periods': 1,
'is_deterministic': 1,
'dimensions': 'device=1,image_id=888'}
dimension_dict = {'device': '1',
'image_id': '888'}
sub_alarm_d = sub_alarm_definition.SubAlarmDefinition(row=sub_alarm_d_dict)
self.assertEqual(sub_alarm_d_dict['id'], sub_alarm_d.id)
self.assertEqual(sub_alarm_d_dict['alarm_definition_id'], sub_alarm_d.alarm_definition_id)
self.assertEqual(sub_alarm_d_dict['metric_name'], sub_alarm_d.metric_name)
self.assertEqual(sub_alarm_d_dict['dimensions'], sub_alarm_d.dimensions_str)
self.assertEqual(dimension_dict, sub_alarm_d.dimensions)
self.assertEqual(sub_alarm_d_dict['function'], sub_alarm_d.function)
self.assertEqual(sub_alarm_d_dict['operator'], sub_alarm_d.operator)
self.assertEqual(sub_alarm_d_dict['period'], sub_alarm_d.period)
self.assertEqual(sub_alarm_d_dict['periods'], sub_alarm_d.periods)
self.assertEqual(sub_alarm_d_dict['threshold'], sub_alarm_d.threshold)
self.assertEqual(True, sub_alarm_d.deterministic)
def test_init_from_sub_expr(self):
sub_alarm_d_dict = {'function': 'AVG',
'metric_name': 'darth.vader',
'operator': 'GT',
'threshold': 10.0,
'period': 60,
'periods': 1,
'is_deterministic': 0,
'dimensions': 'device=1,image_id=888'}
dimension_dict = {'device': '1',
'image_id': '888'}
expression = 'avg(darth.vader{device=1,image_id=888}, 60) GT 10.0 times 1'
sub_expr_list = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
sub_alarm_d = sub_alarm_definition.SubAlarmDefinition(sub_expr=sub_expr_list[0])
self.assertEqual(sub_alarm_d_dict['metric_name'], sub_alarm_d.metric_name)
self.assertEqual(sub_alarm_d_dict['dimensions'], sub_alarm_d.dimensions_str)
self.assertEqual(dimension_dict, sub_alarm_d.dimensions)
self.assertEqual(sub_alarm_d_dict['function'], sub_alarm_d.function)
self.assertEqual(sub_alarm_d_dict['operator'], sub_alarm_d.operator)
self.assertEqual(sub_alarm_d_dict['period'], sub_alarm_d.period)
self.assertEqual(sub_alarm_d_dict['periods'], sub_alarm_d.periods)
self.assertEqual(sub_alarm_d_dict['threshold'], sub_alarm_d.threshold)
self.assertEqual(False, sub_alarm_d.deterministic)
def test_init_from_both_row_and_sub_expr(self):
sub_alarm_d_dict = {'id': '111',
'alarm_definition_id': '123',
'function': 'AVG',
'metric_name': 'darth.vader',
'operator': 'GT',
'threshold': 10,
'period': 60,
'periods': 1,
'is_deterministic': 0,
'dimensions': 'device=1,image_id=888'}
expression = 'avg(darth.vader.compute{device=1,image_id=888}, 60) GT 10.0 times 1'
sub_expr_list = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
self.assertRaises(Exception, sub_alarm_definition.SubAlarmDefinition,
sub_alarm_d_dict, sub_expr_list) # noqa: E202
def test_build_expression_all_parameters(self):
expression = 'avg(darth.vader{over=9000}, deterministic, 60) GT 10.0 times 1'
sub_expr_list = alarm_expr_parser.AlarmExprParser(expression).sub_expr_list
sub_alarm_d = sub_alarm_definition.SubAlarmDefinition(sub_expr=sub_expr_list[0])
self.assertEqual(expression, sub_alarm_d.expression)
sub_alarm_d.dimensions_str = None
sub_alarm_d.dimensions = None
sub_alarm_d.deterministic = False
sub_alarm_d.period = None
sub_alarm_d.periods = None
self.assertEqual('avg(darth.vader) GT 10.0', sub_alarm_d.expression)
def test_equality_method(self):
sub_alarm_d_dict = {'id': '111',
'alarm_definition_id': '123',
'function': 'AVG',
'metric_name': 'darth.vader',
'operator': 'GT',
'threshold': 10,
'period': 60,
'periods': 1,
'is_deterministic': 0,
'dimensions': 'device=1,image_id=888'}
sub_alarm_d = sub_alarm_definition.SubAlarmDefinition(row=sub_alarm_d_dict)
sub_alarm_d2 = sub_alarm_definition.SubAlarmDefinition(row=sub_alarm_d_dict)
# same object
self.assertEqual(True, sub_alarm_d == sub_alarm_d)
# different type
self.assertEqual(False, sub_alarm_d == list())
# Equal
self.assertEqual(True, sub_alarm_d == sub_alarm_d2)
# equal but different id
sub_alarm_d2.id = '222'
self.assertEqual(True, sub_alarm_d == sub_alarm_d2)
# Not Equal
sub_alarm_d2.metric_name = 'luck.iamyourfather'
self.assertEqual(False, sub_alarm_d == sub_alarm_d2)

View File

@ -392,7 +392,7 @@ class AlarmDefinitions(alarm_definitions_api_v2.AlarmDefinitionsV2API,
except (pyparsing.ParseException,
pyparsing.ParseFatalException) as ex:
LOG.exception(ex)
title = "Invalid alarm expression".encode('utf8')
title = "Invalid alarm expression"
msg = "parser failed on expression '{}' at column {}: {}".format(
expression.encode('utf8'), str(ex.column).encode('utf8'),
ex.msg.encode('utf8'))