Add configurable database setting (mysql, postgres, orm)

We used sqlalchemy.core for execute query for orm.
The configuration of db is based on monasca-api.
The default mode is mysql connection so we can use old configuration.

Change-Id: Iebb4d6dfca6d43298ced407178e7f9673a83a7ca
This commit is contained in:
Michal Zielonka 2015-08-04 22:34:57 +02:00
parent e100f433f4
commit a82a49a1e8
20 changed files with 321 additions and 76 deletions

3
.gitignore vendored
View File

@ -4,6 +4,9 @@
AUTHORS
ChangeLog
.*project
*~
*#
#*
build
dist
monasca_notification.egg-info

View File

View File

@ -0,0 +1,19 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
class BaseRepo(object):
def __init__(self, config):
self._find_alarm_action_sql = """SELECT name, type, address
FROM alarm_action as aa
JOIN notification_method as nm ON aa.action_id = nm.id
WHERE aa.alarm_definition_id = %s and aa.alarm_state = %s"""

View File

@ -0,0 +1,19 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
class RepositoryException(Exception):
pass
class DatabaseException(RepositoryException):
pass

View File

@ -0,0 +1,62 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import logging
import MySQLdb
from monasca_notification.common.repositories.base.base_repo import BaseRepo
from monasca_notification.common.repositories import exceptions as exc
log = logging.getLogger(__name__)
class MysqlRepo(BaseRepo):
def __init__(self, config):
super(MysqlRepo, self).__init__(config)
if 'ssl' in config['mysql']:
self._mysql_ssl = config['mysql']['ssl']
else:
self._mysql_ssl = None
self._mysql_host = config['mysql']['host']
self._mysql_user = config['mysql']['user']
self._mysql_passwd = config['mysql']['passwd']
self._mysql_dbname = config['mysql']['db']
self._mysql = None
def _connect_to_mysql(self):
self._mysql = None
try:
self._mysql = MySQLdb.connect(host=self._mysql_host,
user=self._mysql_user,
passwd=unicode(self._mysql_passwd).encode('utf-8'),
db=self._mysql_dbname,
ssl=self._mysql_ssl,
use_unicode=True,
charset="utf8")
self._mysql.autocommit(True)
except MySQLdb.Error as e:
log.exception('MySQL connect failed %s', e)
raise
def fetch_notification(self, alarm):
try:
if self._mysql is None:
self._connect_to_mysql()
cur = self._mysql.cursor()
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
for row in cur:
yield (row[1].lower(), row[0], row[2])
except MySQLdb.Error as e:
log.exception("Couldn't fetch alarms actions %s", e)
raise exc.DatabaseException(e)

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Fujitsu Technology Solutions
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from datetime import datetime
from sqlalchemy import Column, String, Enum, DateTime, ForeignKey, Table
def create_alarm_action_model(metadata=None):
ALARM_STATES = ('UNDETERMINED', 'OK', 'ALARM')
return Table('alarm_action', metadata,
Column('action_id',
String(36), ForeignKey('notification_method.id'),
nullable=False, primary_key=True),
Column('alarm_definition_id', String(36), primary_key=True),
Column('alarm_state', Enum(*ALARM_STATES), primary_key=True))
def create_notification_method_model(metadata=None):
return Table('notification_method', metadata,
Column('id', String(36), primary_key=True),
Column('address', String(100)),
Column('name', String(250)),
Column('tenant_id', String(36)),
Column('type', String(255)),
Column('created_at', DateTime, default=lambda: datetime.utcnow()),
Column('updated_at', DateTime, onupdate=lambda: datetime.utcnow()))

View File

@ -0,0 +1,62 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import logging
from sqlalchemy import engine_from_config, MetaData
from sqlalchemy.sql import select, bindparam, and_
from sqlalchemy.exc import DatabaseError
from monasca_notification.common.repositories import exceptions as exc
from monasca_notification.common.repositories.orm import models
log = logging.getLogger(__name__)
class OrmRepo(object):
def __init__(self, config):
self._orm_engine = engine_from_config(config['database']['orm'], prefix='')
metadata = MetaData()
aa = models.create_alarm_action_model(metadata).alias('aa')
nm = models.create_notification_method_model(metadata).alias('nm')
self._orm_query = select([nm.c.name, nm.c.type, nm.c.address])\
.select_from(aa.join(nm, aa.c.action_id == nm.c.id))\
.where(
and_(aa.c.alarm_definition_id == bindparam('alarm_definition_id'),
aa.c.alarm_state == bindparam('alarm_state')))
self._orm = None
def _connect_to_orm(self):
self._orm = None
try:
self._orm = self._orm_engine.connect()
except DatabaseError as e:
log.exception('Orm connect failed %s', e)
raise
def fetch_notification(self, alarm):
try:
if self._orm is None:
self._connect_to_orm()
log.debug('Orm query {%s}', str(self._orm_query))
notifcations = self._orm.execute(self._orm_query,
alarm_definition_id=alarm['alarmDefinitionId'],
alarm_state=alarm['newState'])
for row in notifcations:
yield (row[1].lower(), row[0], row[2])
except DatabaseError as e:
log.exception("Couldn't fetch alarms actions %s", e)
raise exc.DatabaseException(e)

View File

@ -0,0 +1,47 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import logging
import psycopg2
from monasca_notification.common.repositories.base.base_repo import BaseRepo
from monasca_notification.common.repositories import exceptions as exc
log = logging.getLogger(__name__)
class PostgresqlRepo(BaseRepo):
def __init__(self, config):
super(PostgresqlRepo, self).__init__(config)
self._pgsql_params = config['postgresql']
self._pgsql = None
def _connect_to_pgsql(self):
self._pgsql = None
try:
self._pgsql = psycopg2.connect(**self._pgsql_params)
self._pgsql.autocommit = True
except psycopg2.Error as e:
log.exception('Pgsql connect failed %s', e)
raise
def fetch_notification(self, alarm):
try:
if self._pgsql is None:
self._connect_to_pgsql()
cur = self._pgsql.cursor()
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
for row in cur:
yield (row[1].lower(), row[0], row[2])
except psycopg2.Error as e:
log.exception("Couldn't fetch alarms actions %s", e)
raise exc.DatabaseException(e)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import monascastatsd
from processors.alarm_processor import AlarmProcessor
@ -21,50 +22,34 @@ from processors.kafka_consumer import KafkaConsumer
from processors.kafka_producer import KafkaProducer
from processors.notification_processor import NotificationProcessor
log = logging.getLogger(__name__)
class NotificationEngine(object):
def __init__(self, config):
self._topics = {}
self._topics['notification_topic'] = config['kafka']['notification_topic']
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
self._statsd = monascastatsd.Client(name='monasca',
dimensions=BaseProcessor.dimensions)
self._consumer = KafkaConsumer(config['kafka']['url'],
config['zookeeper']['url'],
config['zookeeper']['notification_path'],
config['kafka']['group'],
config['kafka']['alarm_topic'])
self._producer = KafkaProducer(config['kafka']['url'])
if 'ssl' in config['mysql']:
ssl_config = config['mysql']['ssl']
else:
ssl_config = None
self._alarms = AlarmProcessor(config['processors']['alarm']['ttl'],
config['mysql']['host'],
config['mysql']['user'],
config['mysql']['passwd'],
config['mysql']['db'],
ssl_config)
self._alarm_ttl = config['processors']['alarm']['ttl']
self._alarms = AlarmProcessor(self._alarm_ttl, config)
self._notifier = NotificationProcessor(config['notification_types'])
def run(self):
finished_count = self._statsd.get_counter(name='alarms_finished_count')
for alarm in self._consumer:
log.debug('Received alarm >|%s|<', str(alarm))
notifications, partition, offset = self._alarms.to_notification(alarm)
if notifications:
sent, failed = self._notifier.send(notifications)
self._producer.publish(self._topics['notification_topic'], sent)
self._producer.publish(self._topics['retry_topic'], failed)
self._consumer.commit([partition])
finished_count.increment()

View File

@ -16,9 +16,10 @@
import json
import logging
import monascastatsd
import MySQLdb
import simport
import time
from monasca_notification.common.repositories import exceptions as exc
from monasca_notification.notification import Notification
from monasca_notification.notification_exceptions import AlarmFormatError
from monasca_notification.processors.base import BaseProcessor
@ -28,36 +29,14 @@ log = logging.getLogger(__name__)
class AlarmProcessor(BaseProcessor):
def __init__(
self, alarm_ttl, mysql_host, mysql_user,
mysql_passwd, dbname, mysql_ssl=None):
self._mysql_host = mysql_host
self._mysql_user = mysql_user
self._mysql_passwd = mysql_passwd
self._mysql_dbname = dbname
self._mysql_ssl = mysql_ssl
def __init__(self, alarm_ttl, config):
self._alarm_ttl = alarm_ttl
self._statsd = monascastatsd.Client(name='monasca',
dimensions=BaseProcessor.dimensions)
self._mysql = None
self._connect_to_mysql()
def _connect_to_mysql(self):
try:
self._mysql = MySQLdb.connect(host=self._mysql_host,
user=self._mysql_user,
passwd=unicode(self._mysql_passwd).encode('utf-8'),
db=self._mysql_dbname,
ssl=self._mysql_ssl,
use_unicode=True,
charset="utf8")
self._mysql.autocommit(True)
except:
log.exception('MySQL connect failed')
raise
if 'database' in config and 'repo_driver' in config['database']:
self._db_repo = simport.load(config['database']['repo_driver'])(config)
else:
self._db_repo = simport.load('monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo')(config)
@staticmethod
def _parse_alarm(alarm_data):
@ -74,7 +53,6 @@ class AlarmProcessor(BaseProcessor):
'tenantId',
'timestamp'
]
json_alarm = json.loads(alarm_data)
alarm = json_alarm['alarm-transitioned']
for field in expected_fields:
@ -101,26 +79,21 @@ class AlarmProcessor(BaseProcessor):
return True
def _build_notification(self, partition, offset, alarm):
cur = self._mysql.cursor()
db_time = self._statsd.get_timer()
with db_time.time('config_db_time'):
cur.execute("""SELECT name, type, address
FROM alarm_action as aa
JOIN notification_method as nm ON aa.action_id = nm.id
WHERE aa.alarm_definition_id = %s and aa.alarm_state = %s""",
[alarm['alarmDefinitionId'], alarm['newState']])
alarms_actions = self._db_repo.fetch_notification(alarm)
return [Notification(row[1].lower(),
return [Notification(alarms_action[0],
partition,
offset,
row[0],
row[2],
alarms_action[1],
alarms_action[2],
0,
alarm) for row in cur]
alarm) for alarms_action in alarms_actions]
def to_notification(self, raw_alarm):
"""Check the notification setting for this project in mysql then create the appropriate notification
"""Check the notification setting for this project then create the appropriate notification
"""
failed_parse_count = self._statsd.get_counter(name='alarms_failed_parse_count')
no_notification_count = self._statsd.get_counter(name='alarms_no_notification_count')
@ -144,9 +117,8 @@ class AlarmProcessor(BaseProcessor):
try:
notifications = self._build_notification(partition, offset, alarm)
except MySQLdb.Error:
log.debug('MySQL Error. Attempting reconnect')
self._connect_to_mysql()
except exc.DatabaseException:
log.debug('Database Error. Attempting reconnect')
notifications = self._build_notification(partition, offset, alarm)
if len(notifications) == 0:
@ -155,5 +127,6 @@ class AlarmProcessor(BaseProcessor):
% (partition, offset, alarm))
return [], partition, offset
else:
log.debug('Found %d notifications: [%s]', len(notifications), notifications)
notification_count += len(notifications)
return notifications, partition, offset

View File

@ -6,13 +6,27 @@ kafka:
notification_retry_topic: retry-notifications
max_offset_lag: 600 # In seconds, undefined for none
database:
# repo_driver: monasca_notification.common.repositories.postgres.pgsql_repo:PostgresqlRepo
# repo_driver: monasca_notification.common.repositories.orm.orm_repo:OrmRepo
repo_driver: monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo
orm:
url: 'postgres://notification:password@127.0.0.1:5432/mon'
mysql:
host: 192.168.10.4
user: notification
passwd: password
db: mon
# A dictionary set according to the params defined in, http://dev.mysql.com/doc/refman/5.0/en/mysql-ssl-set.html
# ssl: {'ca': '/path/to/ca'}
host: 192.168.10.4
user: notification
passwd: password
db: mon
# A dictionary set according to the params defined in, http://dev.mysql.com/doc/refman/5.0/en/mysql-ssl-set.html
# ssl: {'ca': '/path/to/ca'}
postgresql:
user: notification
password: password
database: mon
port: 5432
host: 127.0.0.1
notification_types:
email:

View File

@ -1,8 +1,8 @@
kafka-python>=0.9.1,<0.9.3
kazoo>=1.3
MySQL-python
pbr>=0.6,<2.0
monasca-statsd
requests
PyYAML
six
simport

View File

@ -4,3 +4,4 @@ nose
nosexcover
mock>=1.0.1
funcsigs
sqlalchemy

View File

@ -55,8 +55,13 @@ class TestAlarmProcessor(unittest.TestCase):
mock_mysql.cursor.return_value = mock_mysql
mock_mysql.__iter__.return_value = sql_response
processor = alarm_processor.AlarmProcessor(600, 'mysql_host', 'mysql_user',
'mysql_passwd', 'dbname')
mysql_config = {'mysql': {'ssl': None,
'host': 'mysql_host',
'user': 'mysql_user',
'db': 'dbname',
'passwd': 'mysql_passwd'}}
processor = alarm_processor.AlarmProcessor(600, mysql_config)
return processor.to_notification(alarm)

17
tox.ini
View File

@ -1,15 +1,26 @@
[tox]
envlist = py27,pypy,pep8
envlist = {py27,pypy}-{mysql,postgres},pep8
skipsdist = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
usedevelop = True
install_command = pip install -U {opts} {packages}
install_command = pip install -U {opts} {packages} --pre
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
MySQL-python
commands = nosetests
[testenv:py27-postgres]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
postgres: psycopg2
[testenv:pypy-postgres]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
postgres: psycopg2
[testenv:pep8]
commands = flake8
@ -21,7 +32,7 @@ downloadcache = ~/cache/pip
[flake8]
max-line-length = 120
# TODO: ignored checks should be enabled in the future
# TODO: ignored checks should be enabled in the future
# H201 no 'except:' at least use 'except Exception:'
# H302 import only modules
# H305 imports not grouped correctly