diff --git a/monasca_notification/common/repositories/orm/orm_repo.py b/monasca_notification/common/repositories/orm/orm_repo.py index 4e5c099..508e7f1 100644 --- a/monasca_notification/common/repositories/orm/orm_repo.py +++ b/monasca_notification/common/repositories/orm/orm_repo.py @@ -13,7 +13,7 @@ import logging from sqlalchemy import engine_from_config, MetaData -from sqlalchemy.sql import select, bindparam, and_ +from sqlalchemy.sql import select, bindparam, and_, insert from sqlalchemy.exc import DatabaseError from monasca_notification.common.repositories import exceptions as exc @@ -30,7 +30,8 @@ class OrmRepo(object): aa = models.create_alarm_action_model(metadata).alias('aa') nm = models.create_notification_method_model(metadata).alias('nm') - nmt = models.create_notification_method_type_model(metadata).alias('nmt') + nmt_insert = models.create_notification_method_type_model(metadata) + nmt = nmt_insert.alias('nmt') a = models.create_alarm_model(metadata).alias('a') self._orm_query = select([nm.c.id, nm.c.type, nm.c.name, nm.c.address, nm.c.period])\ @@ -46,6 +47,8 @@ class OrmRepo(object): self._orm_get_notification = select([nm.c.name, nm.c.type, nm.c.address, nm.c.period])\ .where(nm.c.id == bindparam('notification_id')) + self._orm_add_notification_type = insert(nmt_insert).values(name=bindparam('b_name')) + self._orm = None def fetch_notifications(self, alarm): @@ -86,13 +89,16 @@ class OrmRepo(object): raise exc.DatabaseException(e) def insert_notification_method_types(self, notification_types): + # This function can be called by multiple processes at same time. + # In that case, only the first one will succeed and the others will fail. + # We can ignore this error when the types have already been registered. try: with self._orm_engine.connect() as conn: for notification_type in notification_types: - conn.execute(self.nmt.insert(), notification_type) + conn.execute(self._orm_add_notification_type, b_name=notification_type) except DatabaseError as e: - log.exception("Couldn't insert notification types %s", e) + log.debug("Failed to insert notification types %s", notification_types) raise exc.DatabaseException(e) def get_notification(self, notification_id): diff --git a/monasca_notification/processors/notification_processor.py b/monasca_notification/processors/notification_processor.py index 53d51b1..e757e4f 100644 --- a/monasca_notification/processors/notification_processor.py +++ b/monasca_notification/processors/notification_processor.py @@ -15,6 +15,7 @@ import logging +from monasca_notification.common.repositories import exceptions as exc from monasca_notification.common.utils import get_db_repo from monasca_notification.common.utils import get_statsd_client from monasca_notification.types import notifiers @@ -32,19 +33,34 @@ class NotificationProcessor(object): self._db_repo = get_db_repo(config) self.insert_configured_plugins() + def _remaining_plugin_types(self): + configured_plugin_types = notifiers.enabled_notifications() + persisted_plugin_types = self._db_repo.fetch_notification_method_types() + + return set(configured_plugin_types) - set(persisted_plugin_types) + def insert_configured_plugins(self): """Persists configured plugin types in DB For each notification type configured add it in db, if it is not there """ - configured_plugin_types = notifiers.enabled_notifications() + remaining_plugin_types = self._remaining_plugin_types() - persisted_plugin_types = self._db_repo.fetch_notification_method_types() - remaining_plugin_types = set(configured_plugin_types) - set(persisted_plugin_types) - - if remaining_plugin_types: - log.info("New plugins detected: Adding new notification types {} to database" - .format(remaining_plugin_types)) - self._db_repo.insert_notification_method_types(remaining_plugin_types) + max_retry = len(remaining_plugin_types) + retry_count = 0 + while remaining_plugin_types: + retry_count = retry_count + 1 + try: + log.info("New plugins detected: Adding new notification types %s to database", + remaining_plugin_types) + self._db_repo.insert_notification_method_types(remaining_plugin_types) + except exc.DatabaseException as e: + # There is a possibility the other process has already registered the type. + remaining_plugin_types = self._remaining_plugin_types() + if remaining_plugin_types and (retry_count >= max_retry): + log.exception("Couldn't insert notification types %s", e) + raise e + else: + log.debug("Plugin already exists. Ignore exception") def send(self, notifications): """Send the notifications diff --git a/test-requirements.txt b/test-requirements.txt index 45d44b8..c8b2828 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,6 +8,7 @@ coverage!=4.4,>=4.0 # Apache-2.0 mock>=2.0 # BSD funcsigs>=0.4;python_version=='2.7' or python_version=='2.6' # Apache-2.0 os-testr>=0.8.0 # Apache-2.0 +oslotest>=1.10.0 # Apache-2.0 testrepository>=0.0.18 # Apache-2.0/BSD SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT PyMySQL>=0.7.6 # MIT License diff --git a/tests/test_orm_repo.py b/tests/test_orm_repo.py new file mode 100644 index 0000000..beff6bd --- /dev/null +++ b/tests/test_orm_repo.py @@ -0,0 +1,109 @@ +# Copyright 2017 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. + +import mock +import sqlalchemy + +from monasca_notification.common.repositories.orm import orm_repo +from oslotest import base + +class TestOrmRepo(base.BaseTestCase): + @mock.patch('monasca_notification.common.repositories.orm.orm_repo.engine_from_config') + def setUp(self, mock_sql_engine_from_config): + super(TestOrmRepo, self).setUp() + config = {'database': + {'orm': + {'url': 'mysql+pymysql://user:password@hostname:3306/mon'}}} + self._rep = orm_repo.OrmRepo(config) + self.mock_conn = \ + self._rep._orm_engine.connect.return_value.__enter__.return_value + + def test_fetch_notifications_success(self): + alarmdef_id = 'alarmdef-123' + new_state = 'alarm' + + alarm = {'alarmDefinitionId': alarmdef_id, + 'newState': new_state} + self.mock_conn.execute.return_value = [('notification-123', + 'EMAIL', + 'notification-name', + 'testaddress', + 0)] + + self.assertEqual([('notification-123', + 'email', + 'notification-name', + 'testaddress', + 0)], + self._rep.fetch_notifications(alarm)) + + self.mock_conn.execute.assert_called_once() + self.assertEqual(self._rep._orm_query, + self.mock_conn.execute.call_args_list[0][0][0]) + self.assertEqual({'alarm_definition_id': alarmdef_id, + 'alarm_state': new_state}, + self.mock_conn.execute.call_args_list[0][1]) + + def test_get_alarm_current_state_success(self): + alarm_id = 'alarm-123' + alarm_state = 'alarm' + self.mock_conn.execute.return_value.fetchone.return_value = [alarm_state] + + self.assertEqual(self._rep.get_alarm_current_state(alarm_id), alarm_state) + + self.mock_conn.execute.assert_called_once() + self.assertEqual(self._rep._orm_get_alarm_state, + self.mock_conn.execute.call_args_list[0][0][0]) + self.assertEqual({'alarm_id': alarm_id}, + self.mock_conn.execute.call_args_list[0][1]) + + def test_fetch_notification_method_types_success(self): + notification_methods = [('EMAIL',), ('WEBHOOK',)] + self.mock_conn.execute.return_value.fetchall.return_value = [('EMAIL',), ('WEBHOOK',)] + + self.assertEqual(self._rep.fetch_notification_method_types(), ['EMAIL', 'WEBHOOK']) + + self.mock_conn.execute.assert_called_once() + self.assertEqual(self._rep._orm_nmt_query, + self.mock_conn.execute.call_args_list[0][0][0]) + + def test_insert_notification_method_types_success(self): + notification_types = ['SLACK', 'HIPCHAT', 'JIRA'] + self.mock_conn.execute.return_value = 1 + + self._rep.insert_notification_method_types(notification_types) + + self.assertEqual(self._rep._orm_add_notification_type, + self.mock_conn.execute.call_args_list[0][0][0]) + self.assertEqual({'b_name': 'SLACK'}, + self.mock_conn.execute.call_args_list[0][1]) + + def test_get_notification_success(self): + notification_id = 'notification-123' + self.mock_conn.execute.return_value.fetchone.return_value = [ + 'notification-123', + 'email', + 'notification-name', + 'testaddress', + 0] + + self.assertEqual(['notification-123', + 'email', + 'notification-name', + 'testaddress'], + self._rep.get_notification(notification_id)) + + self.mock_conn.execute.assert_called_once() + self.assertEqual(self._rep._orm_get_notification, + self.mock_conn.execute.call_args_list[0][0][0]) + self.assertEqual({'notification_id': notification_id}, + self.mock_conn.execute.call_args_list[0][1])