# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import uuid

import MySQLdb
from oslo_log import log
from oslo_utils import timeutils

from monasca_events_api.common.repositories import constants
from monasca_events_api.common.repositories import exceptions
from monasca_events_api.common.repositories.mysql import mysql_repository
from monasca_events_api.common.repositories import streams_repository as sdr

LOG = log.getLogger(__name__)

# MySQL server error number reported for a duplicate key on insert.
_MYSQL_DUPLICATE_ENTRY = 1062


class StreamsRepository(mysql_repository.MySQLRepository,
                        sdr.StreamsRepository):
    """MySQL-backed repository for stream definitions and their actions."""

    # Selects every stream_definition column plus two derived columns:
    # the comma-joined FIRE action ids and the comma-joined EXPIRE action
    # ids from stream_actions. Callers append their own WHERE / ORDER BY /
    # LIMIT clauses to this text.
    base_query = """
        select sd.id, sd.tenant_id, sd.name, sd.description,
               sd.select_by, sd.group_by, sd.fire_criteria, sd.expiration,
               sd.actions_enabled, sd.created_at, sd.updated_at,
               sd.deleted_at, saf.fire_actions, sae.expire_actions
        from stream_definition as sd
        left join (select stream_definition_id,
                          group_concat(action_id) as fire_actions
                   from stream_actions
                   where action_type = 'FIRE'
                   group by stream_definition_id) as saf
            on saf.stream_definition_id = sd.id
        left join (select stream_definition_id,
                          group_concat(action_id) as expire_actions
                   from stream_actions
                   where action_type = 'EXPIRE'
                   group by stream_definition_id) as sae
            on sae.stream_definition_id = sd.id
        """

    def __init__(self):
        super(StreamsRepository, self).__init__()

    @mysql_repository.mysql_try_catch_block
    def get_stream_definition(self, tenant_id, stream_definition_id):
        """Return one non-deleted stream definition of a tenant.

        :param tenant_id: tenant owning the stream definition
        :param stream_definition_id: id of the stream definition
        :returns: the first row produced by the query
        :raises DoesNotExistException: if the query yields no rows
        """
        parms = [tenant_id, stream_definition_id]

        where_clause = """ where sd.tenant_id = %s
                            and sd.id = %s
                            and deleted_at is NULL """

        query = StreamsRepository.base_query + where_clause

        rows = self._execute_query(query, parms)

        if not rows:
            raise exceptions.DoesNotExistException
        return rows[0]

    @mysql_repository.mysql_try_catch_block
    def get_stream_definitions(self, tenant_id, name, offset=None,
                               limit=None):
        """Return the non-deleted stream definitions of a tenant.

        :param tenant_id: tenant owning the stream definitions
        :param name: if truthy, only definitions with this name are returned
        :param offset: if not None, only ids greater than this value are
                       returned and the result is ordered by id
        :param limit: if truthy, maximum number of rows; when an offset is
                      given without a limit, constants.PAGE_LIMIT is used
        :returns: whatever self._execute_query returns for the query
        """
        parms = [tenant_id]

        select_clause = StreamsRepository.base_query

        where_clause = " where sd.tenant_id = %s and deleted_at is NULL "

        if name:
            where_clause += " and sd.name = %s "
            parms.append(name.encode('utf8'))

        if offset is not None:
            order_by_clause = " order by sd.id, sd.created_at "
            where_clause += " and sd.id > %s "
            parms.append(offset.encode('utf8'))
            # Exactly one value must be bound for the single "limit %s"
            # placeholder: the caller's limit wins over the page default.
            limit_clause = " limit %s "
            parms.append(int(limit) if limit else constants.PAGE_LIMIT)
        else:
            order_by_clause = " order by sd.created_at "
            if limit:
                limit_clause = " limit %s"
                parms.append(int(limit))
            else:
                limit_clause = ""

        query = select_clause + where_clause + order_by_clause + limit_clause

        return self._execute_query(query, parms)

    @mysql_repository.mysql_try_catch_block
    def get_all_stream_definitions(self, offset=None, limit=None):
        """Return the non-deleted stream definitions of all tenants.

        :param offset: if not None, only ids greater than this value are
                       returned and the result is ordered by id
        :param limit: maximum number of rows when an offset is given;
                      constants.PAGE_LIMIT is used when it is None
        :returns: whatever self._execute_query returns for the query
        """
        # NOTE(review): limit is ignored when offset is None, unlike
        # get_stream_definitions -- confirm whether that is intended.
        parms = []

        select_clause = StreamsRepository.base_query

        where_clause = " where deleted_at is NULL "

        if offset is not None:
            order_by_clause = " order by sd.id, sd.created_at "
            where_clause += " and sd.id > %s "
            parms.append(offset.encode('utf8'))
            limit_clause = " limit %s "
            if limit is not None:
                # Bind an integer so the driver does not quote the value.
                parms.append(int(limit))
            else:
                parms.append(constants.PAGE_LIMIT)
        else:
            order_by_clause = " order by sd.created_at "
            limit_clause = ""

        query = select_clause + where_clause + order_by_clause + limit_clause

        return self._execute_query(query, parms)

    @mysql_repository.mysql_try_catch_block
    def delete_stream_definition(self, tenant_id, stream_definition_id):
        """Delete the stream definition.

        :param tenant_id:
        :param stream_definition_id:
        :returns True: -- if stream definition exists and was deleted.
        :returns False: -- if the stream definition does not exist.
        :raises RepositoryException:
        """
        cnxn, cursor = self._get_cnxn_cursor_tuple()

        with cnxn:
            cursor.execute("""delete from stream_definition
                           where tenant_id = %s and id = %s""",
                           [tenant_id, stream_definition_id])

        return cursor.rowcount >= 1

    @mysql_repository.mysql_try_catch_block
    def create_stream_definition(self, tenant_id, name, description,
                                 select, group_by, fire_criteria,
                                 expiration, fire_actions, expire_actions):
        """Insert a new stream definition and its FIRE / EXPIRE actions.

        :param tenant_id: tenant owning the new stream definition
        :param name: name of the stream definition
        :param description: free-text description
        :param select: stored in the select_by column
        :param group_by: stored in the group_by column
        :param fire_criteria: stored in the fire_criteria column
        :param expiration: stored in the expiration column
        :param fire_actions: iterable of notification ids, or None
        :param expire_actions: iterable of notification ids, or None
        :returns: the generated id (a uuid1 string) of the new definition
        :raises AlreadyExistsException: if MySQL reports a duplicate entry
        :raises InvalidUpdateException: if an action id is unknown or refers
                                        to a PAGERDUTY notification
        """
        cnxn, cursor = self._get_cnxn_cursor_tuple()

        with cnxn:

            now = timeutils.utcnow()
            stream_definition_id = str(uuid.uuid1())
            try:
                cursor.execute("""insert into stream_definition(
                                   id,
                                   tenant_id,
                                   name,
                                   description,
                                   select_by,
                                   group_by,
                                   fire_criteria,
                                   expiration,
                                   created_at,
                                   updated_at)
                                   values (%s, %s, %s, %s, %s, %s, %s, %s, %s,
                                   %s)""", (
                    stream_definition_id, tenant_id, name.encode('utf8'),
                    description.encode('utf8'), select.encode('utf8'),
                    group_by.encode('utf8'), fire_criteria.encode('utf8'),
                    expiration, now, now))
            except MySQLdb.IntegrityError as e:
                # The driver stores (errno, message) in e.args; exception
                # objects themselves cannot be unpacked on Python 3.
                code = e.args[0] if e.args else None
                if code == _MYSQL_DUPLICATE_ENTRY:
                    raise exceptions.AlreadyExistsException(
                        'Stream Definition already '
                        'exists for tenant_id: {0} name: {1}'.format(
                            tenant_id, name.encode('utf8')))
                # Bare raise keeps the original traceback.
                raise

            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             fire_actions, u"FIRE")
            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             expire_actions, u"EXPIRE")

            return stream_definition_id

    @mysql_repository.mysql_try_catch_block
    def patch_stream_definition(self, tenant_id, stream_definition_id, name,
                                description, select, group_by,
                                fire_criteria, expiration, fire_actions,
                                expire_actions):
        """Partially update a stream definition and return the new row.

        Any argument passed as None keeps the value currently stored.
        select and group_by may be supplied but must equal the stored
        values. When fire_actions / expire_actions is not None, the
        existing actions of that type are replaced by the given ones.

        :returns: the stream definition row re-read after the update
        :raises DoesNotExistException: if no matching definition is found
        :raises InvalidUpdateException: if select or group_by would change,
                                        or an action id is not acceptable
        """
        cnxn, cursor = self._get_cnxn_cursor_tuple()

        with cnxn:
            # Get the original stream definition from the DB
            # NOTE(review): unlike the get_* methods this lookup does not
            # filter on "deleted_at is NULL" -- confirm this is intended.
            parms = [tenant_id, stream_definition_id]

            where_clause = """ where sd.tenant_id = %s
                                and sd.id = %s"""
            query = StreamsRepository.base_query + where_clause

            cursor.execute(query, parms)

            if cursor.rowcount < 1:
                raise exceptions.DoesNotExistException

            original_definition = cursor.fetchall()[0]

            # Update that stream definition in the database
            patch_query = """
                update stream_definition
                set name = %s,
                    description = %s,
                    select_by = %s,
                    group_by = %s,
                    fire_criteria = %s,
                    expiration = %s,
                    updated_at = %s
                where tenant_id = %s and id = %s"""

            if name is None:
                name = original_definition['name']

            if description is None:
                description = original_definition['description']

            if select is None:
                select = original_definition['select_by']

            if select != original_definition['select_by']:
                msg = "select_by must not change".encode('utf8')
                raise exceptions.InvalidUpdateException(msg)

            if group_by is None:
                group_by = original_definition['group_by']

            if group_by != original_definition['group_by']:
                msg = "group_by must not change".encode('utf8')
                raise exceptions.InvalidUpdateException(msg)

            if fire_criteria is None:
                fire_criteria = original_definition['fire_criteria']

            if expiration is None:
                expiration = original_definition['expiration']

            now = timeutils.utcnow()

            update_parms = [
                name,
                description,
                select,
                group_by,
                fire_criteria,
                expiration,
                now,
                tenant_id,
                stream_definition_id]

            cursor.execute(patch_query, update_parms)

            # Update the fire and expire actions in the database if defined
            if fire_actions is not None:
                self._delete_stream_actions(cursor, stream_definition_id,
                                            u'FIRE')
            if expire_actions is not None:
                self._delete_stream_actions(cursor, stream_definition_id,
                                            u'EXPIRE')

            # The insert helper is a no-op for a None action list, so the
            # stored actions of an unspecified type are left untouched.
            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             fire_actions,
                                             u"FIRE")
            self._insert_into_stream_actions(cursor, stream_definition_id,
                                             expire_actions,
                                             u"EXPIRE")

            # Get updated entry from mysql
            cursor.execute(query, parms)
            return cursor.fetchall()[0]

    def _delete_stream_actions(self, cursor, stream_definition_id,
                               action_type):
        """Delete all actions of one type for a stream definition.

        :param cursor: open cursor used to run the statement
        :param stream_definition_id: id of the owning stream definition
        :param action_type: action type text, e.g. u'FIRE' or u'EXPIRE'
        """
        query = """
            delete
            from stream_actions
            where stream_definition_id = %s and action_type = %s
            """
        parms = [stream_definition_id, action_type.encode('utf8')]
        cursor.execute(query, parms)

    def _insert_into_stream_actions(self, cursor, stream_definition_id,
                                    actions, action_type):
        """Validate and insert actions of one type for a stream definition.

        Each action id is looked up in notification_method before it is
        inserted into stream_actions.

        :param cursor: open cursor used to run the statements
        :param stream_definition_id: id of the owning stream definition
        :param actions: iterable of notification ids; None does nothing
        :param action_type: action type text, e.g. u"FIRE" or u"EXPIRE"
        :raises InvalidUpdateException: if an id has no notification_method
                                        row or that row's type is PAGERDUTY
        """
        if actions is None:
            return

        for action in actions:
            cursor.execute(
                "select id,type from notification_method where id = %s",
                (action.encode('utf8'),))
            row = cursor.fetchone()
            if not row:
                raise exceptions.InvalidUpdateException(
                    "Non-existent notification id {} submitted for {} "
                    "notification action".format(action.encode('utf8'),
                                                 action_type.encode('utf8')))

            if row['type'] == 'PAGERDUTY':
                raise exceptions.InvalidUpdateException(
                    "PAGERDUTY action not supported for "
                    "notification id {} submitted for {} "
                    "notification action".format(
                        action.encode('utf8'),
                        action_type.encode('utf8')))

            cursor.execute("""insert into stream_actions(
                               stream_definition_id, action_type, action_id)
                               values(%s,%s,%s)""", (
                stream_definition_id, action_type.encode('utf8'),
                action.encode('utf8')))