Fix insert notification type of ORM

This patch fixes insert_notification_method_types of ORM.

Change-Id: I944f7cea583dd56cd1c777b1633498dde2abd31b
Story: 2001069
Task: 4685
This commit is contained in:
Koji Nakazono 2017-06-20 12:23:36 +09:00
parent 9eb280853f
commit 46b9afd03c
4 changed files with 144 additions and 12 deletions

View File

@ -13,7 +13,7 @@
import logging
from sqlalchemy import engine_from_config, MetaData
from sqlalchemy.sql import select, bindparam, and_
from sqlalchemy.sql import select, bindparam, and_, insert
from sqlalchemy.exc import DatabaseError
from monasca_notification.common.repositories import exceptions as exc
@ -30,7 +30,8 @@ class OrmRepo(object):
aa = models.create_alarm_action_model(metadata).alias('aa')
nm = models.create_notification_method_model(metadata).alias('nm')
nmt = models.create_notification_method_type_model(metadata).alias('nmt')
nmt_insert = models.create_notification_method_type_model(metadata)
nmt = nmt_insert.alias('nmt')
a = models.create_alarm_model(metadata).alias('a')
self._orm_query = select([nm.c.id, nm.c.type, nm.c.name, nm.c.address, nm.c.period])\
@ -46,6 +47,8 @@ class OrmRepo(object):
self._orm_get_notification = select([nm.c.name, nm.c.type, nm.c.address, nm.c.period])\
.where(nm.c.id == bindparam('notification_id'))
self._orm_add_notification_type = insert(nmt_insert).values(name=bindparam('b_name'))
self._orm = None
def fetch_notifications(self, alarm):
@ -86,13 +89,16 @@ class OrmRepo(object):
raise exc.DatabaseException(e)
def insert_notification_method_types(self, notification_types):
# This function can be called by multiple processes at same time.
# In that case, only the first one will succeed and the others will fail.
# We can ignore this error when the types have already been registered.
try:
with self._orm_engine.connect() as conn:
for notification_type in notification_types:
conn.execute(self.nmt.insert(), notification_type)
conn.execute(self._orm_add_notification_type, b_name=notification_type)
except DatabaseError as e:
log.exception("Couldn't insert notification types %s", e)
log.debug("Failed to insert notification types %s", notification_types)
raise exc.DatabaseException(e)
def get_notification(self, notification_id):

View File

@ -15,6 +15,7 @@
import logging
from monasca_notification.common.repositories import exceptions as exc
from monasca_notification.common.utils import get_db_repo
from monasca_notification.common.utils import get_statsd_client
from monasca_notification.types import notifiers
@ -32,19 +33,34 @@ class NotificationProcessor(object):
self._db_repo = get_db_repo(config)
self.insert_configured_plugins()
def _remaining_plugin_types(self):
configured_plugin_types = notifiers.enabled_notifications()
persisted_plugin_types = self._db_repo.fetch_notification_method_types()
return set(configured_plugin_types) - set(persisted_plugin_types)
def insert_configured_plugins(self):
"""Persists configured plugin types in DB
For each notification type configured add it in db, if it is not there
"""
configured_plugin_types = notifiers.enabled_notifications()
remaining_plugin_types = self._remaining_plugin_types()
persisted_plugin_types = self._db_repo.fetch_notification_method_types()
remaining_plugin_types = set(configured_plugin_types) - set(persisted_plugin_types)
if remaining_plugin_types:
log.info("New plugins detected: Adding new notification types {} to database"
.format(remaining_plugin_types))
self._db_repo.insert_notification_method_types(remaining_plugin_types)
max_retry = len(remaining_plugin_types)
retry_count = 0
while remaining_plugin_types:
retry_count = retry_count + 1
try:
log.info("New plugins detected: Adding new notification types %s to database",
remaining_plugin_types)
self._db_repo.insert_notification_method_types(remaining_plugin_types)
except exc.DatabaseException as e:
# There is a possibility the other process has already registered the type.
remaining_plugin_types = self._remaining_plugin_types()
if remaining_plugin_types and (retry_count >= max_retry):
log.exception("Couldn't insert notification types %s", e)
raise e
else:
log.debug("Plugin already exists. Ignore exception")
def send(self, notifications):
"""Send the notifications

View File

@ -8,6 +8,7 @@ coverage!=4.4,>=4.0 # Apache-2.0
mock>=2.0 # BSD
funcsigs>=0.4;python_version=='2.7' or python_version=='2.6' # Apache-2.0
os-testr>=0.8.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
PyMySQL>=0.7.6 # MIT License

109
tests/test_orm_repo.py Normal file
View File

@ -0,0 +1,109 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import mock
import sqlalchemy
from monasca_notification.common.repositories.orm import orm_repo
from oslotest import base
class TestOrmRepo(base.BaseTestCase):
@mock.patch('monasca_notification.common.repositories.orm.orm_repo.engine_from_config')
def setUp(self, mock_sql_engine_from_config):
super(TestOrmRepo, self).setUp()
config = {'database':
{'orm':
{'url': 'mysql+pymysql://user:password@hostname:3306/mon'}}}
self._rep = orm_repo.OrmRepo(config)
self.mock_conn = \
self._rep._orm_engine.connect.return_value.__enter__.return_value
def test_fetch_notifications_success(self):
alarmdef_id = 'alarmdef-123'
new_state = 'alarm'
alarm = {'alarmDefinitionId': alarmdef_id,
'newState': new_state}
self.mock_conn.execute.return_value = [('notification-123',
'EMAIL',
'notification-name',
'testaddress',
0)]
self.assertEqual([('notification-123',
'email',
'notification-name',
'testaddress',
0)],
self._rep.fetch_notifications(alarm))
self.mock_conn.execute.assert_called_once()
self.assertEqual(self._rep._orm_query,
self.mock_conn.execute.call_args_list[0][0][0])
self.assertEqual({'alarm_definition_id': alarmdef_id,
'alarm_state': new_state},
self.mock_conn.execute.call_args_list[0][1])
def test_get_alarm_current_state_success(self):
alarm_id = 'alarm-123'
alarm_state = 'alarm'
self.mock_conn.execute.return_value.fetchone.return_value = [alarm_state]
self.assertEqual(self._rep.get_alarm_current_state(alarm_id), alarm_state)
self.mock_conn.execute.assert_called_once()
self.assertEqual(self._rep._orm_get_alarm_state,
self.mock_conn.execute.call_args_list[0][0][0])
self.assertEqual({'alarm_id': alarm_id},
self.mock_conn.execute.call_args_list[0][1])
def test_fetch_notification_method_types_success(self):
notification_methods = [('EMAIL',), ('WEBHOOK',)]
self.mock_conn.execute.return_value.fetchall.return_value = [('EMAIL',), ('WEBHOOK',)]
self.assertEqual(self._rep.fetch_notification_method_types(), ['EMAIL', 'WEBHOOK'])
self.mock_conn.execute.assert_called_once()
self.assertEqual(self._rep._orm_nmt_query,
self.mock_conn.execute.call_args_list[0][0][0])
def test_insert_notification_method_types_success(self):
notification_types = ['SLACK', 'HIPCHAT', 'JIRA']
self.mock_conn.execute.return_value = 1
self._rep.insert_notification_method_types(notification_types)
self.assertEqual(self._rep._orm_add_notification_type,
self.mock_conn.execute.call_args_list[0][0][0])
self.assertEqual({'b_name': 'SLACK'},
self.mock_conn.execute.call_args_list[0][1])
def test_get_notification_success(self):
notification_id = 'notification-123'
self.mock_conn.execute.return_value.fetchone.return_value = [
'notification-123',
'email',
'notification-name',
'testaddress',
0]
self.assertEqual(['notification-123',
'email',
'notification-name',
'testaddress'],
self._rep.get_notification(notification_id))
self.mock_conn.execute.assert_called_once()
self.assertEqual(self._rep._orm_get_notification,
self.mock_conn.execute.call_args_list[0][0][0])
self.assertEqual({'notification_id': notification_id},
self.mock_conn.execute.call_args_list[0][1])