Fix insert notification type of ORM
This patch fixes insert_notification_method_types of ORM. Change-Id: I944f7cea583dd56cd1c777b1633498dde2abd31b Story: 2001069 Task: 4685
This commit is contained in:
parent
9eb280853f
commit
46b9afd03c
|
@ -13,7 +13,7 @@
|
|||
|
||||
import logging
|
||||
from sqlalchemy import engine_from_config, MetaData
|
||||
from sqlalchemy.sql import select, bindparam, and_
|
||||
from sqlalchemy.sql import select, bindparam, and_, insert
|
||||
from sqlalchemy.exc import DatabaseError
|
||||
|
||||
from monasca_notification.common.repositories import exceptions as exc
|
||||
|
@ -30,7 +30,8 @@ class OrmRepo(object):
|
|||
|
||||
aa = models.create_alarm_action_model(metadata).alias('aa')
|
||||
nm = models.create_notification_method_model(metadata).alias('nm')
|
||||
nmt = models.create_notification_method_type_model(metadata).alias('nmt')
|
||||
nmt_insert = models.create_notification_method_type_model(metadata)
|
||||
nmt = nmt_insert.alias('nmt')
|
||||
a = models.create_alarm_model(metadata).alias('a')
|
||||
|
||||
self._orm_query = select([nm.c.id, nm.c.type, nm.c.name, nm.c.address, nm.c.period])\
|
||||
|
@ -46,6 +47,8 @@ class OrmRepo(object):
|
|||
self._orm_get_notification = select([nm.c.name, nm.c.type, nm.c.address, nm.c.period])\
|
||||
.where(nm.c.id == bindparam('notification_id'))
|
||||
|
||||
self._orm_add_notification_type = insert(nmt_insert).values(name=bindparam('b_name'))
|
||||
|
||||
self._orm = None
|
||||
|
||||
def fetch_notifications(self, alarm):
|
||||
|
@ -86,13 +89,16 @@ class OrmRepo(object):
|
|||
raise exc.DatabaseException(e)
|
||||
|
||||
def insert_notification_method_types(self, notification_types):
|
||||
# This function can be called by multiple processes at same time.
|
||||
# In that case, only the first one will succeed and the others will fail.
|
||||
# We can ignore this error when the types have already been registered.
|
||||
try:
|
||||
with self._orm_engine.connect() as conn:
|
||||
for notification_type in notification_types:
|
||||
conn.execute(self.nmt.insert(), notification_type)
|
||||
conn.execute(self._orm_add_notification_type, b_name=notification_type)
|
||||
|
||||
except DatabaseError as e:
|
||||
log.exception("Couldn't insert notification types %s", e)
|
||||
log.debug("Failed to insert notification types %s", notification_types)
|
||||
raise exc.DatabaseException(e)
|
||||
|
||||
def get_notification(self, notification_id):
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import logging
|
||||
|
||||
from monasca_notification.common.repositories import exceptions as exc
|
||||
from monasca_notification.common.utils import get_db_repo
|
||||
from monasca_notification.common.utils import get_statsd_client
|
||||
from monasca_notification.types import notifiers
|
||||
|
@ -32,19 +33,34 @@ class NotificationProcessor(object):
|
|||
self._db_repo = get_db_repo(config)
|
||||
self.insert_configured_plugins()
|
||||
|
||||
def _remaining_plugin_types(self):
|
||||
configured_plugin_types = notifiers.enabled_notifications()
|
||||
persisted_plugin_types = self._db_repo.fetch_notification_method_types()
|
||||
|
||||
return set(configured_plugin_types) - set(persisted_plugin_types)
|
||||
|
||||
def insert_configured_plugins(self):
|
||||
"""Persists configured plugin types in DB
|
||||
For each notification type configured add it in db, if it is not there
|
||||
"""
|
||||
configured_plugin_types = notifiers.enabled_notifications()
|
||||
remaining_plugin_types = self._remaining_plugin_types()
|
||||
|
||||
persisted_plugin_types = self._db_repo.fetch_notification_method_types()
|
||||
remaining_plugin_types = set(configured_plugin_types) - set(persisted_plugin_types)
|
||||
|
||||
if remaining_plugin_types:
|
||||
log.info("New plugins detected: Adding new notification types {} to database"
|
||||
.format(remaining_plugin_types))
|
||||
self._db_repo.insert_notification_method_types(remaining_plugin_types)
|
||||
max_retry = len(remaining_plugin_types)
|
||||
retry_count = 0
|
||||
while remaining_plugin_types:
|
||||
retry_count = retry_count + 1
|
||||
try:
|
||||
log.info("New plugins detected: Adding new notification types %s to database",
|
||||
remaining_plugin_types)
|
||||
self._db_repo.insert_notification_method_types(remaining_plugin_types)
|
||||
except exc.DatabaseException as e:
|
||||
# There is a possibility the other process has already registered the type.
|
||||
remaining_plugin_types = self._remaining_plugin_types()
|
||||
if remaining_plugin_types and (retry_count >= max_retry):
|
||||
log.exception("Couldn't insert notification types %s", e)
|
||||
raise e
|
||||
else:
|
||||
log.debug("Plugin already exists. Ignore exception")
|
||||
|
||||
def send(self, notifications):
|
||||
"""Send the notifications
|
||||
|
|
|
@ -8,6 +8,7 @@ coverage!=4.4,>=4.0 # Apache-2.0
|
|||
mock>=2.0 # BSD
|
||||
funcsigs>=0.4;python_version=='2.7' or python_version=='2.6' # Apache-2.0
|
||||
os-testr>=0.8.0 # Apache-2.0
|
||||
oslotest>=1.10.0 # Apache-2.0
|
||||
testrepository>=0.0.18 # Apache-2.0/BSD
|
||||
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
|
||||
PyMySQL>=0.7.6 # MIT License
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
# Copyright 2017 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
# in compliance with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
# or implied. See the License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
|
||||
import mock
|
||||
import sqlalchemy
|
||||
|
||||
from monasca_notification.common.repositories.orm import orm_repo
|
||||
from oslotest import base
|
||||
|
||||
class TestOrmRepo(base.BaseTestCase):
|
||||
@mock.patch('monasca_notification.common.repositories.orm.orm_repo.engine_from_config')
|
||||
def setUp(self, mock_sql_engine_from_config):
|
||||
super(TestOrmRepo, self).setUp()
|
||||
config = {'database':
|
||||
{'orm':
|
||||
{'url': 'mysql+pymysql://user:password@hostname:3306/mon'}}}
|
||||
self._rep = orm_repo.OrmRepo(config)
|
||||
self.mock_conn = \
|
||||
self._rep._orm_engine.connect.return_value.__enter__.return_value
|
||||
|
||||
def test_fetch_notifications_success(self):
|
||||
alarmdef_id = 'alarmdef-123'
|
||||
new_state = 'alarm'
|
||||
|
||||
alarm = {'alarmDefinitionId': alarmdef_id,
|
||||
'newState': new_state}
|
||||
self.mock_conn.execute.return_value = [('notification-123',
|
||||
'EMAIL',
|
||||
'notification-name',
|
||||
'testaddress',
|
||||
0)]
|
||||
|
||||
self.assertEqual([('notification-123',
|
||||
'email',
|
||||
'notification-name',
|
||||
'testaddress',
|
||||
0)],
|
||||
self._rep.fetch_notifications(alarm))
|
||||
|
||||
self.mock_conn.execute.assert_called_once()
|
||||
self.assertEqual(self._rep._orm_query,
|
||||
self.mock_conn.execute.call_args_list[0][0][0])
|
||||
self.assertEqual({'alarm_definition_id': alarmdef_id,
|
||||
'alarm_state': new_state},
|
||||
self.mock_conn.execute.call_args_list[0][1])
|
||||
|
||||
def test_get_alarm_current_state_success(self):
|
||||
alarm_id = 'alarm-123'
|
||||
alarm_state = 'alarm'
|
||||
self.mock_conn.execute.return_value.fetchone.return_value = [alarm_state]
|
||||
|
||||
self.assertEqual(self._rep.get_alarm_current_state(alarm_id), alarm_state)
|
||||
|
||||
self.mock_conn.execute.assert_called_once()
|
||||
self.assertEqual(self._rep._orm_get_alarm_state,
|
||||
self.mock_conn.execute.call_args_list[0][0][0])
|
||||
self.assertEqual({'alarm_id': alarm_id},
|
||||
self.mock_conn.execute.call_args_list[0][1])
|
||||
|
||||
def test_fetch_notification_method_types_success(self):
|
||||
notification_methods = [('EMAIL',), ('WEBHOOK',)]
|
||||
self.mock_conn.execute.return_value.fetchall.return_value = [('EMAIL',), ('WEBHOOK',)]
|
||||
|
||||
self.assertEqual(self._rep.fetch_notification_method_types(), ['EMAIL', 'WEBHOOK'])
|
||||
|
||||
self.mock_conn.execute.assert_called_once()
|
||||
self.assertEqual(self._rep._orm_nmt_query,
|
||||
self.mock_conn.execute.call_args_list[0][0][0])
|
||||
|
||||
def test_insert_notification_method_types_success(self):
|
||||
notification_types = ['SLACK', 'HIPCHAT', 'JIRA']
|
||||
self.mock_conn.execute.return_value = 1
|
||||
|
||||
self._rep.insert_notification_method_types(notification_types)
|
||||
|
||||
self.assertEqual(self._rep._orm_add_notification_type,
|
||||
self.mock_conn.execute.call_args_list[0][0][0])
|
||||
self.assertEqual({'b_name': 'SLACK'},
|
||||
self.mock_conn.execute.call_args_list[0][1])
|
||||
|
||||
def test_get_notification_success(self):
|
||||
notification_id = 'notification-123'
|
||||
self.mock_conn.execute.return_value.fetchone.return_value = [
|
||||
'notification-123',
|
||||
'email',
|
||||
'notification-name',
|
||||
'testaddress',
|
||||
0]
|
||||
|
||||
self.assertEqual(['notification-123',
|
||||
'email',
|
||||
'notification-name',
|
||||
'testaddress'],
|
||||
self._rep.get_notification(notification_id))
|
||||
|
||||
self.mock_conn.execute.assert_called_once()
|
||||
self.assertEqual(self._rep._orm_get_notification,
|
||||
self.mock_conn.execute.call_args_list[0][0][0])
|
||||
self.assertEqual({'notification_id': notification_id},
|
||||
self.mock_conn.execute.call_args_list[0][1])
|
Loading…
Reference in New Issue