Integrate with oslo.conf and oslo.log

This change upgrades monasca-notification to leverage
the capabilities of both oslo.log and oslo.config:

- configuration of logging separated from application settings
- ability to enforce data types for application settings
- ability to use oslo-config-generator capabilities
- automatic configuration parsing done by oslo.config

This brings monasca-notification closer to the rest of the
monasca components, where this transition has already taken
place. Unlike those components, which already use oslo.config
partially or fully, monasca-notification has so far relied on
a YAML based configuration file.

Therefore, backward compatibility with that format will
be kept for now.
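
To illustrate the effect (a hedged sketch; the group and option names below are taken from the new conf modules added in this change), settings that used to be read from the parsed YAML dict become typed options on oslo.config's CONF object:

from oslo_config import cfg

CONF = cfg.CONF
# before: config = yaml.safe_load(open(path)); config['kafka']['url']
# after (once monasca_notification.config.parse_args() has run):
kafka_url = CONF.kafka.url                      # ListOpt, validated by oslo.config
max_attempts = CONF.retry_engine.max_attempts   # IntOpt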

Story: 2000959
Task: 4093
Task: 4092

Change-Id: Ia75c3b60d0fada854178f21ca5ccb9e6a880f37f
Tomasz Trębski 2017-05-16 00:14:56 +02:00 committed by Tomasz Trębski
parent 2092e4185c
commit e1a9b9a96a
53 changed files with 2004 additions and 605 deletions

.gitignore (2 changes, vendored)
View File

@ -16,3 +16,5 @@ dist
.stestr/
.coverage.*
cover/
*.sample

View File

@ -0,0 +1,5 @@
[DEFAULT]
output_file = etc/monasca/notification.conf.sample
wrap_width = 80
namespace = monasca_notification
namespace = oslo.log
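
A hedged sketch of how a generator file like the one above is typically consumed (the file path used here is an assumption); it is equivalent to running the oslo-config-generator console script:

from oslo_config import generator

# same as: oslo-config-generator --config-file config-generator/notification.conf
generator.main(args=['--config-file', 'config-generator/notification.conf'])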

View File

@ -0,0 +1,46 @@
[loggers]
keys = root, kafka, zookeeper, statsd
[handlers]
keys = console, file
[formatters]
keys = context
[logger_root]
level = DEBUG
handlers = console, file
[logger_kafka]
qualname = kafka
level = DEBUG
handlers = console, file
propagate = 0
[logger_zookeeper]
qualname = zookeeper
level = DEBUG
handlers = console, file
propagate = 0
[logger_statsd]
qualname = statsd
level = DEBUG
handlers = console, file
propagate = 0
[handler_console]
class = logging.StreamHandler
args = (sys.stderr,)
level = DEBUG
formatter = context
[handler_file]
class = logging.handlers.RotatingFileHandler
level = DEBUG
formatter = context
# store up to 5*100MB of logs
args = ('monasca-notification.log', 'a', 104857600, 5)
[formatter_context]
class = oslo_log.formatters.ContextFormatter
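
A hedged sketch, assuming the file above is installed as /etc/monasca/notification-logging.conf: oslo.log can pull these sections in through its standard log_config_append option, replacing the old 'logging' block of notification.yaml:

from oslo_config import cfg
from oslo_log import log

CONF = cfg.CONF
log.register_options(CONF)
CONF(['--log-config-append', '/etc/monasca/notification-logging.conf'],
     project='monasca', prog='notification')
log.setup(CONF, 'monasca-notification')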

View File

@ -11,7 +11,7 @@
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import logging
from oslo_log import log as logging
import pymysql
from monasca_notification.common.repositories.base import base_repo
@ -24,21 +24,9 @@ log = logging.getLogger(__name__)
class MysqlRepo(base_repo.BaseRepo):
def __init__(self, config):
super(MysqlRepo, self).__init__(config)
if 'ssl' in config['mysql']:
self._mysql_ssl = config['mysql']['ssl']
else:
self._mysql_ssl = None
if 'port' in config['mysql']:
self._mysql_port = config['mysql']['port']
else:
#
# If port isn't specified in the config file,
# set it to the default value.
#
self._mysql_port = 3306
self._mysql_ssl = config['mysql']['ssl'] or None
self._mysql_host = config['mysql']['host']
self._mysql_port = config['mysql']['port']
self._mysql_user = config['mysql']['user']
self._mysql_passwd = config['mysql']['passwd']
self._mysql_dbname = config['mysql']['db']
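
Hedged note on the simplification above: oslo.config's CONF object supports mapping-style access, so get_db_repo() (added later in this commit) can hand CONF to the repository where the parsed YAML dict used to go:

from oslo_config import cfg

CONF = cfg.CONF
# assumes monasca_notification.config.parse_args() has already run
assert CONF['mysql']['host'] == CONF.mysql.host  # mapping and attribute access agree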

View File

@ -1,4 +1,4 @@
# Copyright 2015 FUJITSU LIMITED
# Copyright 2015-2017 FUJITSU LIMITED
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,8 @@
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import logging
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy import engine_from_config, MetaData
from sqlalchemy.sql import select, bindparam, and_, insert
from sqlalchemy.exc import DatabaseError
@ -19,12 +20,15 @@ from sqlalchemy.exc import DatabaseError
from monasca_notification.common.repositories import exceptions as exc
from monasca_notification.common.repositories.orm import models
log = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class OrmRepo(object):
def __init__(self, config):
self._orm_engine = engine_from_config(config['database']['orm'], prefix='')
def __init__(self):
self._orm_engine = engine_from_config({
'url': CONF.orm.url
}, prefix='')
metadata = MetaData()
@ -54,38 +58,38 @@ class OrmRepo(object):
def fetch_notifications(self, alarm):
try:
with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_query))
LOG.debug('Orm query {%s}', str(self._orm_query))
notifications = conn.execute(self._orm_query,
alarm_definition_id=alarm['alarmDefinitionId'],
alarm_state=alarm['newState'])
return [(row[0], row[1].lower(), row[2], row[3], row[4]) for row in notifications]
except DatabaseError as e:
log.exception("Couldn't fetch alarms actions %s", e)
LOG.exception("Couldn't fetch alarms actions %s", e)
raise exc.DatabaseException(e)
def get_alarm_current_state(self, alarm_id):
try:
with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_get_alarm_state))
LOG.debug('Orm query {%s}', str(self._orm_get_alarm_state))
result = conn.execute(self._orm_get_alarm_state,
alarm_id=alarm_id)
row = result.fetchone()
state = row[0] if row is not None else None
return state
except DatabaseError as e:
log.exception("Couldn't fetch the current alarm state %s", e)
LOG.exception("Couldn't fetch the current alarm state %s", e)
raise exc.DatabaseException(e)
def fetch_notification_method_types(self):
try:
with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_nmt_query))
LOG.debug('Orm query {%s}', str(self._orm_nmt_query))
notification_method_types = conn.execute(self._orm_nmt_query).fetchall()
return [row[0] for row in notification_method_types]
except DatabaseError as e:
log.exception("Couldn't fetch notification method types %s", e)
LOG.exception("Couldn't fetch notification method types %s", e)
raise exc.DatabaseException(e)
def insert_notification_method_types(self, notification_types):
@ -98,13 +102,13 @@ class OrmRepo(object):
conn.execute(self._orm_add_notification_type, b_name=notification_type)
except DatabaseError as e:
log.debug("Failed to insert notification types %s", notification_types)
LOG.debug("Failed to insert notification types %s", notification_types)
raise exc.DatabaseException(e)
def get_notification(self, notification_id):
try:
with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_get_notification))
LOG.debug('Orm query {%s}', str(self._orm_get_notification))
result = conn.execute(self._orm_get_notification,
notification_id=notification_id)
notification = result.fetchone()
@ -113,5 +117,5 @@ class OrmRepo(object):
else:
return [notification[0], notification[1].lower(), notification[2], notification[3]]
except DatabaseError as e:
log.exception("Couldn't fetch the notification method %s", e)
LOG.exception("Couldn't fetch the notification method %s", e)
raise exc.DatabaseException(e)

View File

@ -1,4 +1,4 @@
# Copyright 2015 FUJITSU LIMITED
# Copyright 2015-2017 FUJITSU LIMITED
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
import logging
from oslo_log import log as logging
import psycopg2
from monasca_notification.common.repositories.base import base_repo

View File

@ -13,25 +13,25 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import monascastatsd
from monasca_common.simport import simport
from oslo_config import cfg
from oslo_log import log
from monasca_notification.common.repositories import exceptions
from monasca_notification.notification import Notification
log = logging.getLogger(__name__)
LOG = log.getLogger(__name__)
CONF = cfg.CONF
NOTIFICATION_DIMENSIONS = {'service': 'monitoring',
'component': 'monasca-notification'}
def get_db_repo(config):
if 'database' in config and 'repo_driver' in config['database']:
return simport.load(config['database']['repo_driver'])(config)
else:
return simport.load('monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo')(config)
def get_db_repo():
repo_driver = CONF.database.repo_driver
LOG.debug('Enabling the %s RDB repository', repo_driver)
return repo_driver(CONF)
def construct_notification_object(db_repo, notification_json):
@ -47,7 +47,7 @@ def construct_notification_object(db_repo, notification_json):
stored_notification = grab_stored_notification_method(db_repo, notification.id)
# Notification method was deleted
if stored_notification is None:
log.debug("Notification method {0} was deleted from database. "
LOG.debug("Notification method {0} was deleted from database. "
"Will stop sending.".format(notification.id))
return None
# Update notification method with most up to date values
@ -58,11 +58,11 @@ def construct_notification_object(db_repo, notification_json):
notification.period = stored_notification[3]
return notification
except exceptions.DatabaseException:
log.warn("Error querying mysql for notification method. "
LOG.warn("Error querying mysql for notification method. "
"Using currently cached method.")
return notification
except Exception as e:
log.warn("Error when attempting to construct notification {0}".format(e))
LOG.warn("Error when attempting to construct notification {0}".format(e))
return None
@ -70,21 +70,18 @@ def grab_stored_notification_method(db_repo, notification_id):
try:
stored_notification = db_repo.get_notification(notification_id)
except exceptions.DatabaseException:
log.debug('Database Error. Attempting reconnect')
LOG.debug('Database Error. Attempting reconnect')
stored_notification = db_repo.get_notification(notification_id)
return stored_notification
def get_statsd_client(config, dimensions=None):
def get_statsd_client(dimensions=None):
local_dims = dimensions.copy() if dimensions else {}
local_dims.update(NOTIFICATION_DIMENSIONS)
if 'statsd' in config:
client = monascastatsd.Client(name='monasca',
host=config['statsd'].get('host', 'localhost'),
port=config['statsd'].get('port', 8125),
dimensions=local_dims)
else:
client = monascastatsd.Client(name='monasca',
dimensions=local_dims)
client = monascastatsd.Client(name='monasca',
host=CONF.statsd.host,
port=CONF.statsd.port,
dimensions=local_dims)
return client
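
A hedged usage sketch of the helpers above: they now read cfg.CONF directly instead of receiving the parsed YAML dict (module paths are taken from imports elsewhere in this commit):

from monasca_notification import config
from monasca_notification.common import utils

config.parse_args(argv=[])            # registers options and parses CONF
statsd = utils.get_statsd_client()    # host/port now come from CONF.statsd
repo = utils.get_db_repo()            # driver class from CONF.database.repo_driver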

View File

@ -0,0 +1,162 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from monasca_notification.conf import cli
from monasca_notification.conf import database
from monasca_notification.conf import kafka
from monasca_notification.conf import notifiers
from monasca_notification.conf import processors
from monasca_notification.conf import queues
from monasca_notification.conf import retry
from monasca_notification.conf import statsd
from monasca_notification.conf import zookeeper
LOG = log.getLogger(__name__)
CONF = cfg.CONF
CONF_OPTS = [
cli,
database,
kafka,
notifiers,
processors,
queues,
retry,
statsd,
zookeeper
]
def register_opts(conf=None):
if conf is None:
conf = CONF
for m in CONF_OPTS:
m.register_opts(conf)
def list_opts():
opts = collections.defaultdict(list)
for m in CONF_OPTS:
configs = copy.deepcopy(m.list_opts())
for key, val in configs.items():
opts[key].extend(val)
return _tupleize(opts)
def load_from_yaml(yaml_config, conf=None):
# build named BACKWARD_MAP to modules set_defaults
if conf is None:
conf = CONF
def _noop(*arg, **kwargs):
pass
def _plain_override(g=None, **opts):
for k, v in opts.items():
conf.set_override(group=g, name=k, override=v)
def _load_plugin_settings(**notifiers_cfg):
notifiers_cfg = {t.lower(): v for t, v in notifiers_cfg.items()}
enabled_plugins = notifiers_cfg.pop('plugins', [])
# We can still have these 3 (email, pagerduty, webhook),
# which are considered builtin and hence should always be enabled
conf_to_plugin = {
'email': 'monasca_notification.plugins.'
'email_notifier:EmailNotifier',
'pagerduty': 'monasca_notification.plugins.'
'pagerduty_notifier:PagerdutyNotifier',
'webhook': 'monasca_notification.plugins.'
'webhook_notifier:WebhookNotifier'
}
for ctp_key, ctp_clazz in conf_to_plugin.items():
if ctp_key in notifiers_cfg and ctp_key not in enabled_plugins:
LOG.debug('%s enabled as builtin plugin', ctp_key)
enabled_plugins.append(ctp_clazz)
_plain_override(g='notification_types', enabled=enabled_plugins)
if not enabled_plugins:
return
for ep in enabled_plugins:
ep_module = importutils.import_module(ep.split(':')[0])
ep_clazz = importutils.import_class(ep.replace(':', '.'))
if not hasattr(ep_module, 'register_opts'):
LOG.debug('%s does not have \'register_opts\' method', ep)
continue
if not hasattr(ep_clazz, 'type'):
LOG.debug('%s does not have \'type\' class variable', ep)
continue
ep_r_opt = getattr(ep_module, 'register_opts')
ep_type = getattr(ep_clazz, 'type')
ep_r_opt(conf) # register options
_plain_override(g='%s_notifier' % ep_type,
**notifiers_cfg.get(ep_type))
LOG.debug('Registered options and values of the %s notifier',
ep_type)
def _configure_and_warn_the_logging(logging_config):
LOG.warning('Configuration of the logging system from '
'\'notification.yml\' has been deprecated. '
'Please check how to configure logging with '
'the oslo.log library.')
import logging.config
logging.config.dictConfig(logging_config)
mapper = {
'statsd': [lambda d: _plain_override(g='statsd', **d)],
'retry': [lambda d: _plain_override(g='retry_engine', **d)],
'database': [
lambda d: _plain_override(g='database', repo_driver=d['repo_driver']),
lambda d: _plain_override(g='orm', url=d['orm']['url'])
],
'postgresql': [lambda d: _plain_override(g='postgresql', **d)],
'mysql': [lambda d: _plain_override(g='mysql', **d)],
'processors': [
lambda d: _plain_override(g='alarm_processor',
number=d['alarm']['number'],
ttl=d['alarm']['ttl']),
lambda d: _plain_override(g='notification_processor',
number=d['notification']['number'])
],
'queues': [lambda d: _plain_override(g='queues', **d)],
'kafka': [lambda d: _plain_override(g='kafka', **d)],
'zookeeper': [lambda d: _plain_override(g='zookeeper', **d)],
'notification_types': [lambda d: _load_plugin_settings(**d)],
'logging': [_configure_and_warn_the_logging]
}
for key, opts in yaml_config.items():
LOG.debug('Loading group %s from deprecated yaml configuration', key)
handlers = mapper.get(key, [_noop])
if len(handlers) == 1 and handlers[0] == _noop:
LOG.warning('Unmapped configuration group %s from YAML file', key)
[handler(opts) for handler in handlers]
def _tupleize(d):
"""Convert a dict of options to the 2-tuple format."""
return [(key, value) for key, value in d.items()]
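
A hedged sketch of the backward-compatibility path above: a deprecated YAML configuration is mapped onto the oslo.config groups through load_from_yaml (the YAML fragment is illustrative):

from monasca_notification import conf

conf.register_opts()
conf.CONF([])                       # parse with defaults, no config file

legacy = {                          # minimal, hypothetical notification.yaml content
    'kafka': {'url': ['192.168.10.6:9092'], 'group': 'monasca-notification'},
    'retry': {'interval': 30, 'max_attempts': 5},
}
conf.load_from_yaml(legacy)
print(conf.CONF.kafka.url, conf.CONF.retry_engine.max_attempts)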

View File

@ -0,0 +1,40 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
cli_opts = [
cfg.StrOpt(name='yaml_config', default=None,
positional=True,
help='Backward compatible option that allows passing a path '
'to the YAML file containing the configuration '
'of monasca-notification.',
deprecated_for_removal=True,
deprecated_since='1.9.0',
deprecated_reason='monasca-notification has moved to '
'oslo.config, hence using YAML based '
'configuration will be removed '
'after PIKE release.')
]
def register_opts(conf):
for opt in cli_opts:
conf.register_cli_opt(opt=opt)
def list_opts():
return {
'default': cli_opts
}

View File

@ -0,0 +1,129 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from monasca_notification.conf import types
_POSTGRESQL = 'postgresql'
_MYSQL = 'mysql'
_ORM = 'orm'
_REPO_DRIVERS_MAP = {
_POSTGRESQL: 'monasca_notification.common.repositories.'
'postgres.pgsql_repo:PostgresqlRepo',
_MYSQL: 'monasca_notification.common.repositories.'
'mysql.mysql_repo:MysqlRepo',
_ORM: 'monasca_notification.common.repositories.'
'orm.orm_repo:OrmRepo'
}
_ACCEPTABLE_DRIVER_KEYS = set(list(_REPO_DRIVERS_MAP.keys()) +
list(_REPO_DRIVERS_MAP.values()))
_DEFAULT_DB_HOST = '127.0.0.1'
_DEFAULT_DB_USER = 'notification'
_DEFAULT_DB_PASSWORD = 'password'
_DEFAULT_DB_NAME = 'mon'
_DEFAULT_POSTGRESQL_PORT = 5432
_DEFAULT_MYSQL_PORT = 3306
db_group = cfg.OptGroup('database',
title='Database Options',
help='Driver configuration for database connectivity.')
db_opts = [
types.PluginOpt(name='repo_driver', choices=_ACCEPTABLE_DRIVER_KEYS,
default=_MYSQL, plugin_map=_REPO_DRIVERS_MAP,
required=True, advanced=True,
help='Driver name (or full class path) that should be '
'used to handle RDB connections. Accepts either '
'short labels {0} or full class names {1}. '
'Configuring either of them will require presence of '
'one of following sections {0} inside configuration '
'file.'.format(_REPO_DRIVERS_MAP.keys(),
_REPO_DRIVERS_MAP.values())
)
]
orm_group = cfg.OptGroup('orm',
title='ORM Options',
help='Configuration options to configure '
'ORM RDB driver.')
orm_opts = [
cfg.StrOpt(name='url', default=None,
help='Connection string for sqlalchemy.')
]
mysql_group = cfg.OptGroup('mysql',
title='MySQL Options',
help='Configuration options to configure '
'plain MySQL RDB driver.')
mysql_opts = [
cfg.HostAddressOpt(name='host', default=_DEFAULT_DB_HOST,
help='IP address of MySQL instance.'),
cfg.PortOpt(name='port', default=_DEFAULT_MYSQL_PORT,
help='Port number of MySQL instance.'),
cfg.StrOpt(name='user', default=_DEFAULT_DB_USER,
help='Username to connect to MySQL '
'instance and given database.'),
cfg.StrOpt(name='passwd', default=_DEFAULT_DB_PASSWORD,
ignore_case=True, secret=True,
help='Password to connect to MySQL instance '
'and given database.'),
cfg.DictOpt(name='ssl', default={},
help='A dict of arguments similar '
'to mysql_ssl_set parameters.'),
cfg.StrOpt(name='db', default=_DEFAULT_DB_NAME,
help='Database name available in given MySQL instance.')
]
postgresql_group = cfg.OptGroup('postgresql',
title='PostgreSQL Options',
help='Configuration options to configure '
'plain PostgreSQL RDB driver.')
postgresql_opts = [
cfg.HostAddressOpt(name='host', default=_DEFAULT_DB_HOST,
help='IP address of PostgreSQL instance.'),
cfg.PortOpt(name='port', default=_DEFAULT_POSTGRESQL_PORT,
help='Port number of PostgreSQL instance.'),
cfg.StrOpt(name='user', default=_DEFAULT_DB_USER,
help='Username to connect to PostgreSQL '
'instance and given database.'),
cfg.StrOpt(name='password', default=_DEFAULT_DB_PASSWORD,
secret=True, help='Password to connect to PostgreSQL '
'instance and given database'),
cfg.StrOpt(name='database', default=_DEFAULT_DB_NAME,
help='Database name available in '
'given PostgreSQL instance.')
]
def register_opts(conf):
conf.register_group(db_group)
conf.register_group(orm_group)
conf.register_group(mysql_group)
conf.register_group(postgresql_group)
conf.register_opts(db_opts, group=db_group)
conf.register_opts(orm_opts, group=orm_group)
conf.register_opts(mysql_opts, group=mysql_group)
conf.register_opts(postgresql_opts, group=postgresql_group)
def list_opts():
return {
db_group: db_opts,
orm_group: orm_opts,
mysql_group: mysql_opts,
postgresql_group: postgresql_opts,
}

View File

@ -0,0 +1,77 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from monasca_notification.conf import types
_DEFAULT_URL = '127.0.0.1:9092'
_DEFAULT_GROUP = 'monasca-notification'
_DEFAULT_ALARM_TOPIC = 'alarm-state-transitions'
_DEFAULT_NOTIFICATION_TOPIC = 'alarm-notifications'
_DEFAULT_RETRY_TOPIC = 'retry-notifications'
_DEFAULT_PERIODIC_TOPICS = {
60: '60-seconds-notifications'
}
_DEFAULT_MAX_OFFSET_LAG = 600
kafka_group = cfg.OptGroup('kafka',
title='Kafka Options',
help='Options under this group allow to configure '
'valid connection to Kafka queue.')
kafka_opts = [
cfg.ListOpt(name='url', item_type=types.HostAddressPortType(),
bounds=False,
default=_DEFAULT_URL, required=True,
help='List of addresses (with ports) pointing '
'at kafka cluster.'),
cfg.StrOpt(name='group', default=_DEFAULT_GROUP,
required=True, advanced=True,
help='Consumer\'s group for monasca-notification client.'),
cfg.StrOpt(name='alarm_topic', default=_DEFAULT_ALARM_TOPIC,
required=True, advanced=True,
help='Topic name in kafka where alarm '
'transitions are stored.'),
cfg.StrOpt(name='notification_topic', default=_DEFAULT_NOTIFICATION_TOPIC,
required=True, advanced=True,
help='Topic name in kafka where alarm '
'notifications are stored.'),
cfg.StrOpt(name='notification_retry_topic', default=_DEFAULT_RETRY_TOPIC,
required=True, advanced=True,
help='Topic name in kafka where notifications, that have '
'failed to be sent and are waiting for retry operations, '
'are stored.'),
cfg.DictOpt(name='periodic', default=_DEFAULT_PERIODIC_TOPICS,
required=True, advanced=True,
help='Dict of periodic topics. Keys are the period and '
'values the actual topic names in kafka where '
'notifications are stored.'),
cfg.IntOpt(name='max_offset_lag', default=_DEFAULT_MAX_OFFSET_LAG,
required=True, advanced=True,
help='Maximum lag for topic that is acceptable by '
'the monasca-notification. Notifications that are older '
'than this offset are skipped.')
]
def register_opts(conf):
conf.register_group(kafka_group)
conf.register_opts(kafka_opts, group=kafka_group)
def list_opts():
return {
kafka_group: kafka_opts
}

View File

@ -0,0 +1,53 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
# NOTE(kornicameister) Until https://review.openstack.org/#/c/435136/
# is merged we only treat these below as plugins.
# WEBHOOK, EMAIL, PAGERDUTY are now treated as built-in & hardcoded
# user has no possibility of enabling/disabling them
_KEY_MAP = {
'hipchat': 'monasca_notification.plugins.hipchat_notifier.HipChatNotifier',
'slack': 'monasca_notification.plugins.slack_notifier.SlackNotifier',
'jira': 'monasca_notification.plugins.jira_notifier.JiraNotifier'
}
notifier_group = cfg.OptGroup('notification_types',
title='Notification types',
help='Group allows to configure available '
'notifiers inside notification engine.')
notifier_opts = [
cfg.ListOpt(name='enabled', default=[],
item_type=lambda x: _KEY_MAP.get(x, x), bounds=False,
advanced=True, sample_default=','.join(_KEY_MAP.keys()),
help='List of enabled notification types. You may specify '
'full class name {} '
'or shorter label {}.'.format(_KEY_MAP.get('hipchat'),
'hipchat')
)
]
def register_opts(conf):
conf.register_group(notifier_group)
conf.register_opts(notifier_opts, group=notifier_group)
def list_opts():
return {
notifier_group: notifier_opts,
}

View File

@ -0,0 +1,55 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
ap_group = cfg.OptGroup('alarm_processor',
title='Alarm processor group',
help='Options to configure alarm processor.')
ap_opts = [
cfg.IntOpt(name='number', min=1, default=2,
help='Number of alarm processors to spawn',
deprecated_for_removal=True,
deprecated_since='1.8.0',
deprecated_reason='Options is not used in the current code '
'and will be removed in future releases.'),
cfg.IntOpt(name='ttl', default=14400,
advanced=True,
help='Alarms older than TTL are not processed '
'by notification engine.')
]
np_group = cfg.OptGroup('notification_processor',
title='Notification processor group',
help='Options to configure notification processor.')
np_opts = [
cfg.IntOpt(name='number', min=1,
default=4, help='Number of notification processors to spawn.')
]
def register_opts(conf):
conf.register_group(ap_group)
conf.register_group(np_group)
conf.register_opts(ap_opts, group=ap_group)
conf.register_opts(np_opts, group=np_group)
def list_opts():
return {
ap_group: ap_opts,
np_group: np_opts
}

View File

@ -0,0 +1,45 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
queues_group = cfg.OptGroup('queues',
title='Queues Options',
help=('Options under this group allow to '
'configure the sizes of '
'all queues.'))
queues_opts = [
cfg.IntOpt(name='alarms_size', min=1, default=256,
help='Size of the alarms queue.'),
cfg.IntOpt(name='finished_size', min=1, default=256,
help='Size of the finished alarms queue.'),
cfg.IntOpt(name='notifications_size', min=1, default=256,
help='Size of notifications queue.'),
cfg.IntOpt(name='sent_notifications_size', min=1, default=50,
help='Size of sent notifications queue. '
'Limiting this size reduces the potential for '
're-sent notifications after a failure.')
]
def register_opts(conf):
conf.register_group(queues_group)
conf.register_opts(queues_opts, group=queues_group)
def list_opts():
return {
queues_group: queues_opts
}

View File

@ -0,0 +1,41 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
retry_engine_group = cfg.OptGroup('retry_engine',
title='Retry Engine Options',
help='Options under this group allow to '
'configure valid connection '
'for retry engine.')
retry_opts = [
cfg.IntOpt(name='interval', min=30, default=30,
advanced=True,
help='How often retries should happen.'),
cfg.IntOpt(name='max_attempts', default=5,
advanced=True,
help='How many times a retry should be attempted.')
]
def register_opts(conf):
conf.register_group(retry_engine_group)
conf.register_opts(retry_opts, group=retry_engine_group)
def list_opts():
return {
retry_engine_group: retry_opts
}

View File

@ -0,0 +1,41 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
_DEFAULT_HOST = '127.0.0.1'
_DEFAULT_PORT = 8125
statsd_group = cfg.OptGroup('statsd',
title='statsd Options',
help='Options under this group allow '
'to configure valid connection '
'to statsd server launched by monasca-agent.')
statsd_opts = [
cfg.HostAddressOpt('host', default=_DEFAULT_HOST,
help='IP address of statsd server.'),
cfg.PortOpt('port', default=_DEFAULT_PORT, help='Port of statsd server.'),
]
def register_opts(conf):
conf.register_group(statsd_group)
conf.register_opts(statsd_opts, group=statsd_group)
def list_opts():
return {
statsd_group: statsd_opts
}

View File

@ -0,0 +1,102 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_config import types
from oslo_log import log
from oslo_utils import importutils
LOG = log.getLogger(__name__)
class Plugin(types.String):
def __init__(self, ignore_missing=False, choices=None, plugin_map=None):
if not plugin_map:
# since simport is used, we cannot tell where plugin is located
# thus plugin map wouldn't contain it
plugin_map = {}
super(Plugin, self).__init__(choices, quotes=False, ignore_case=True,
type_name='plugin value')
self._plugin_map = plugin_map
self._ignore_missing = ignore_missing
def __call__(self, value):
value = super(Plugin, self).__call__(value)
value = self._get_actual_target(value)
cls = None
try:
value = value.replace(':', '.')
cls = importutils.import_class(value)
except ImportError:
if not self._ignore_missing:
raise ValueError('%s cannot be imported' % value)
else:
LOG.exception('%s cannot be imported', value)
return cls
def _get_actual_target(self, value):
# NOTE(trebskit) missing values will be handled
# by choices from StringType
if value in self._plugin_map.keys():
return self._plugin_map[value]
return value
class PluginOpt(cfg.Opt):
def __init__(self, name, choices=None, plugin_map=None, **kwargs):
plugin = Plugin(choices=choices, plugin_map=plugin_map)
super(PluginOpt, self).__init__(name,
type=plugin,
**kwargs)
class HostAddressPortType(types.HostAddress):
"""HostAddress with additional port"""
def __init__(self, version=None):
type_name = 'host address port value'
super(HostAddressPortType, self).__init__(version,
type_name=type_name)
def __call__(self, value):
addr, port = value.split(':')
addr = self._validate_addr(addr)
port = self._validate_port(port)
if addr and port:
return '%s:%d' % (addr, port)
raise ValueError('%s is not a valid host address with optional port' % value)
@staticmethod
def _validate_port(port=80):
port = types.Port()(port)
return port
def _validate_addr(self, addr):
try:
addr = self.ip_address(addr)
except ValueError:
try:
addr = self.hostname(addr)
except ValueError:
raise ValueError("%s is not a valid host address", addr)
return addr
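
A hedged sketch of the two custom option types above (input values are illustrative):

from monasca_notification.conf import types

addr = types.HostAddressPortType()
print(addr('127.0.0.1:2181'))        # validated and normalised to 'host:port'

plugin = types.Plugin(plugin_map={
    'mysql': 'monasca_notification.common.repositories.'
             'mysql.mysql_repo:MysqlRepo'})
print(plugin('mysql'))               # short label resolved to the MysqlRepo class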

View File

@ -0,0 +1,61 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from monasca_notification.conf import types
_DEFAULT_URL = '127.0.0.1:2181'
_DEFAULT_NOTIFICATION_PATH = '/notification/alarms'
_DEFAULT_RETRY_PATH = '/notification/retry'
_DEFAULT_PERIODIC_PATH = {
60: '/notification/60_seconds'
}
zookeeper_group = cfg.OptGroup('zookeeper',
title='Zookeeper Options',
help='Options under this group allow to '
'configure settings for zookeeper '
'handling.')
zookeeper_opts = [
cfg.ListOpt(name='url', item_type=types.HostAddressPortType(),
default=_DEFAULT_URL, required=True,
help='List of addresses (with ports) pointing '
'at zookeeper cluster.'),
cfg.StrOpt(name='notification_path', default=_DEFAULT_NOTIFICATION_PATH,
required=True, advanced=True,
help='Path in zookeeper tree to track notification offsets.'),
cfg.StrOpt(name='notification_retry_path', default=_DEFAULT_RETRY_PATH,
required=True, advanced=True,
help='Path in zookeeper tree to track notification '
'retries offsets.'),
cfg.DictOpt(name='periodic_path', default=_DEFAULT_PERIODIC_PATH,
required=True, advanced=True,
help='Paths in zookeeper tree to track periodic offsets. '
'Keys must be integers describing the interval '
'of periodic notification. Values are actual '
'paths inside zookeeper tree.')
]
def register_opts(conf):
conf.register_group(zookeeper_group)
conf.register_opts(zookeeper_opts, group=zookeeper_group)
def list_opts():
return {
zookeeper_group: zookeeper_opts
}

View File

@ -0,0 +1,64 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import yaml
from monasca_notification import conf
from monasca_notification import version
LOG = log.getLogger(__name__)
CONF = conf.CONF
_CONF_LOADED = False
def parse_args(argv, no_yaml=False):
"""Sets up configuration of monasca-notification."""
global _CONF_LOADED
if _CONF_LOADED:
LOG.debug('Configuration has been already loaded')
return
conf.register_opts(CONF)
log.register_options(CONF)
default_log_levels = (log.get_default_log_levels())
log.set_defaults(default_log_levels=default_log_levels)
CONF(args=argv,
project='monasca',
prog='notification',
version=version.version_string,
description='''
monasca-notification is an engine responsible for
transforming alarm transitions into proper notifications
''')
log.setup(CONF,
product_name='monasca-notification',
version=version.version_string)
if not no_yaml:
# note(trebskit) used only in test cases as the notification.yml
# will be dropped eventually
set_from_yaml()
_CONF_LOADED = True
def set_from_yaml():
if CONF.yaml_config:
LOG.info('Detected usage of deprecated YAML configuration')
yaml_cfg = yaml.safe_load(open(CONF.yaml_config, 'rb'))
conf.load_from_yaml(yaml_cfg)
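
A hedged sketch of the two startup styles parse_args() supports (paths are illustrative; only one of them is used per process because of the _CONF_LOADED guard):

from monasca_notification import config

# new style: regular oslo.config file(s)
config.parse_args(argv=['--config-file', '/etc/monasca/notification.conf'])

# deprecated style: positional YAML file, mapped onto CONF by set_from_yaml()
# config.parse_args(argv=['/etc/monasca/notification.yaml'])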

View File

@ -18,20 +18,23 @@
This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
"""
import logging
import logging.config
import multiprocessing
import os
import signal
import sys
import time
import yaml
import warnings
from monasca_notification.notification_engine import NotificationEngine
from monasca_notification.periodic_engine import PeriodicEngine
from monasca_notification.retry_engine import RetryEngine
from oslo_log import log
from monasca_notification import config
from monasca_notification import notification_engine
from monasca_notification import periodic_engine
from monasca_notification import retry_engine
LOG = log.getLogger(__name__)
CONF = config.CONF
log = logging.getLogger(__name__)
processors = [] # global list to facilitate clean signal handling
exiting = False
@ -45,10 +48,10 @@ def clean_exit(signum, frame=None):
# Since this is set up as a handler for SIGCHLD when this kills one
# child it gets another signal, the global exiting avoids this running
# multiple times.
log.debug('Exit in progress clean_exit received additional signal %s' % signum)
LOG.debug('Exit in progress clean_exit received additional signal %s' % signum)
return
log.info('Received signal %s, beginning graceful shutdown.' % signum)
LOG.info('Received signal %s, beginning graceful shutdown.' % signum)
exiting = True
wait_for_exit = False
@ -68,7 +71,7 @@ def clean_exit(signum, frame=None):
# Kill everything, that didn't already die
for child in multiprocessing.active_children():
log.debug('Killing pid %s' % child.pid)
LOG.debug('Killing pid %s' % child.pid)
try:
os.kill(child.pid, signal.SIGKILL)
except Exception: # nosec
@ -82,50 +85,35 @@ def clean_exit(signum, frame=None):
sys.exit(signum)
def start_process(process_type, config, *args):
log.info("start process: {}".format(process_type))
p = process_type(config, *args)
def start_process(process_type, *args):
LOG.info("start process: {}".format(process_type))
p = process_type(*args)
p.run()
def main(argv=None):
if argv is None:
argv = sys.argv
if len(argv) == 2:
config_file = argv[1]
elif len(argv) > 2:
print("Usage: " + argv[0] + " <config_file>")
print("Config file defaults to /etc/monasca/notification.yaml")
return 1
else:
config_file = '/etc/monasca/notification.yaml'
warnings.simplefilter('always')
config.parse_args(argv=argv)
config = yaml.safe_load(open(config_file, 'rb'))
# Setup logging
try:
if config['logging']['raise_exceptions'] is True:
logging.raiseExceptions = True
else:
logging.raiseExceptions = False
except KeyError:
logging.raiseExceptions = False
pass
logging.config.dictConfig(config['logging'])
for proc in range(0, config['processors']['notification']['number']):
for proc in range(0, CONF.notification_processor.number):
processors.append(multiprocessing.Process(
target=start_process, args=(NotificationEngine, config)))
target=start_process,
args=(notification_engine.NotificationEngine,))
)
processors.append(multiprocessing.Process(
target=start_process, args=(RetryEngine, config)))
target=start_process,
args=(retry_engine.RetryEngine,))
)
if 60 in config['kafka']['periodic']:
if 60 in CONF.kafka.periodic:
processors.append(multiprocessing.Process(
target=start_process, args=(PeriodicEngine, config, 60)))
target=start_process,
args=(periodic_engine.PeriodicEngine, 60))
)
try:
log.info('Starting processes')
LOG.info('Starting processes')
for process in processors:
process.start()
@ -139,8 +127,9 @@ def main(argv=None):
time.sleep(10)
except Exception:
log.exception('Error! Exiting.')
LOG.exception('Error! Exiting.')
clean_exit(signal.SIGKILL)
if __name__ == "__main__":
sys.exit(main())
sys.exit(main(sys.argv[1:]))

View File

@ -1,4 +1,5 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,44 +14,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from oslo_config import cfg
from oslo_log import log as logging
from monasca_common.kafka import consumer
from monasca_common.kafka import producer
from monasca_notification.common.utils import get_statsd_client
from monasca_notification.processors.alarm_processor import AlarmProcessor
from monasca_notification.processors.notification_processor import NotificationProcessor
from monasca_notification.processors import alarm_processor as ap
from monasca_notification.processors import notification_processor as np
log = logging.getLogger(__name__)
CONF = cfg.CONF
class NotificationEngine(object):
def __init__(self, config):
self._topics = {}
self._topics['notification_topic'] = config['kafka']['notification_topic']
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
self._statsd = get_statsd_client(config)
def __init__(self):
self._statsd = get_statsd_client()
self._consumer = consumer.KafkaConsumer(
config['kafka']['url'],
config['zookeeper']['url'],
config['zookeeper']['notification_path'],
config['kafka']['group'],
config['kafka']['alarm_topic'])
self._producer = producer.KafkaProducer(config['kafka']['url'])
self._alarm_ttl = config['processors']['alarm']['ttl']
self._alarms = AlarmProcessor(self._alarm_ttl, config)
self._notifier = NotificationProcessor(config)
self._config = config
CONF.kafka.url,
','.join(CONF.zookeeper.url),
CONF.zookeeper.notification_path,
CONF.kafka.group,
CONF.kafka.alarm_topic)
self._producer = producer.KafkaProducer(CONF.kafka.url)
self._alarms = ap.AlarmProcessor()
self._notifier = np.NotificationProcessor()
def _add_periodic_notifications(self, notifications):
for notification in notifications:
topic = notification.periodic_topic
if topic in self._config['kafka']['periodic'] and notification.type == "webhook":
if topic in CONF.kafka.periodic and notification.type == "webhook":
notification.notification_timestamp = time.time()
self._producer.publish(self._config['kafka']['periodic'][topic],
self._producer.publish(CONF.kafka.periodic[topic],
[notification.to_json()])
def run(self):
@ -62,9 +59,9 @@ class NotificationEngine(object):
self._add_periodic_notifications(notifications)
sent, failed = self._notifier.send(notifications)
self._producer.publish(self._topics['notification_topic'],
self._producer.publish(CONF.kafka.notification_topic,
[i.to_json() for i in sent])
self._producer.publish(self._topics['retry_topic'],
self._producer.publish(CONF.kafka.notification_retry_topic,
[i.to_json() for i in failed])
self._consumer.commit()

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,9 +15,11 @@
# limitations under the License.
import json
import logging
import time
from oslo_config import cfg
from oslo_log import log as logging
from monasca_common.kafka import consumer
from monasca_common.kafka import producer
from monasca_notification.common.repositories import exceptions
@ -26,25 +29,26 @@ from monasca_notification.common.utils import get_statsd_client
from monasca_notification.processors import notification_processor
log = logging.getLogger(__name__)
CONF = cfg.CONF
class PeriodicEngine(object):
def __init__(self, config, period):
self._topic_name = config['kafka']['periodic'][period]
def __init__(self, period):
self._topic_name = CONF.kafka.periodic[period]
self._statsd = get_statsd_client(config)
self._statsd = get_statsd_client()
zookeeper_path = config['zookeeper']['periodic_path'][period]
self._consumer = consumer.KafkaConsumer(config['kafka']['url'],
config['zookeeper']['url'],
zookeeper_path = CONF.zookeeper.periodic_path[period]
self._consumer = consumer.KafkaConsumer(CONF.kafka.url,
','.join(CONF.zookeeper.url),
zookeeper_path,
config['kafka']['group'],
CONF.kafka.group,
self._topic_name)
self._producer = producer.KafkaProducer(config['kafka']['url'])
self._producer = producer.KafkaProducer(CONF.kafka.url)
self._notifier = notification_processor.NotificationProcessor(config)
self._db_repo = get_db_repo(config)
self._notifier = notification_processor.NotificationProcessor()
self._db_repo = get_db_repo()
self._period = period
def _keep_sending(self, alarm_id, original_state, type, period):

View File

@ -1,4 +1,5 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -23,10 +24,6 @@ class AbstractNotifier(object):
def __init__(self):
pass
@abc.abstractproperty
def type(self):
pass
@abc.abstractproperty
def statsd_name(self):
pass

View File

@ -1,4 +1,5 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,9 +21,13 @@ import six
import smtplib
import time
from debtcollector import removals
from oslo_config import cfg
from monasca_notification.plugins import abstract_notifier
CONF = cfg.CONF
EMAIL_SINGLE_HOST_BASE = u'''On host "{hostname}" for target "{target_host}" {message}
Alarm "{alarm_name}" transitioned to the {state} state at {timestamp} UTC
@ -60,22 +65,39 @@ With dimensions
{metric_dimensions}'''
def register_opts(conf):
gr = cfg.OptGroup(name='%s_notifier' % EmailNotifier.type)
opts = [
cfg.StrOpt(name='from_addr'),
cfg.HostAddressOpt(name='server'),
cfg.PortOpt(name='port', default=25),
cfg.IntOpt(name='timeout', default=5, min=1),
cfg.StrOpt(name='user', default=None),
cfg.StrOpt(name='password', default=None, secret=True),
cfg.StrOpt(name='grafana_url', default=None)
]
conf.register_group(gr)
conf.register_opts(opts, group=gr)
class EmailNotifier(abstract_notifier.AbstractNotifier):
type = 'email'
def __init__(self, log):
super(EmailNotifier, self).__init__()
self._log = log
self._smtp = None
self._config = None
def config(self, config):
self._config = config
@removals.remove(
message='Configuration of notifier is available through oslo.cfg',
version='1.9.0',
removal_version='3.0.0'
)
def config(self, config=None):
self._smtp_connect()
@property
def type(self):
return "email"
@property
def statsd_name(self):
return "sent_smtp_count"
@ -122,7 +144,7 @@ class EmailNotifier(abstract_notifier.AbstractNotifier):
return False
def _sendmail(self, notification, msg):
self._smtp.sendmail(self._config['from_addr'],
self._smtp.sendmail(CONF.email_notifier.from_addr,
notification.address,
msg.as_string())
self._log.debug("Sent email to {}, notification {}".format(notification.address,
@ -135,15 +157,19 @@ class EmailNotifier(abstract_notifier.AbstractNotifier):
def _smtp_connect(self):
"""Connect to the smtp server
"""
self._log.info("Connecting to Email Server {}".format(self._config['server']))
self._log.info("Connecting to Email Server {}".format(
CONF.email_notifier.server))
try:
smtp = smtplib.SMTP(self._config['server'],
self._config['port'],
timeout=self._config['timeout'])
smtp = smtplib.SMTP(CONF.email_notifier.server,
CONF.email_notifier.port,
timeout=CONF.email_notifier.timeout)
if ('user', 'password') in self._config.keys():
smtp.login(self._config['user'], self._config['password'])
email_notifier_user = CONF.email_notifier.user
email_notifier_password = CONF.email_notifier.password
if email_notifier_user and email_notifier_password:
smtp.login(email_notifier_user,
email_notifier_password)
self._smtp = smtp
return True
@ -222,7 +248,7 @@ class EmailNotifier(abstract_notifier.AbstractNotifier):
msg = email.mime.text.MIMEText(text, 'plain', 'utf-8')
msg['Subject'] = email.header.Header(subject, 'utf-8')
msg['From'] = self._config['from_addr']
msg['From'] = CONF.email_notifier.from_addr
msg['To'] = notification.address
msg['Date'] = email.utils.formatdate(localtime=True, usegmt=True)
@ -237,7 +263,7 @@ class EmailNotifier(abstract_notifier.AbstractNotifier):
has been defined.
"""
grafana_url = self._config.get('grafana_url', None)
grafana_url = CONF.email_notifier.grafana_url
if grafana_url is None:
return None

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,10 +17,13 @@
import requests
import ujson as json
from debtcollector import removals
from oslo_config import cfg
from six.moves import urllib
from monasca_notification.plugins import abstract_notifier
CONF = cfg.CONF
"""
notification.address = https://hipchat.hpcloud.net/v2/room/<room_id>/notification?auth_token=432432
@ -44,17 +48,34 @@ SEVERITY_COLORS = {"low": 'green',
'critical': 'red'}
def register_opts(conf):
gr = cfg.OptGroup(name='%s_notifier' % HipChatNotifier.type)
opts = [
cfg.IntOpt(name='timeout', default=5, min=1),
cfg.BoolOpt(name='insecure', default=True),
cfg.StrOpt(name='ca_certs', default=None),
cfg.StrOpt(name='proxy', default=None)
]
conf.register_group(gr)
conf.register_opts(opts, group=gr)
class HipChatNotifier(abstract_notifier.AbstractNotifier):
type = 'hipchat'
def __init__(self, log):
super(HipChatNotifier, self).__init__()
self._log = log
def config(self, config_dict):
self._config = {'timeout': 5}
self._config.update(config_dict)
@property
def type(self):
return "hipchat"
@removals.remove(
message='Configuration of notifier is available through oslo.cfg',
version='1.9.0',
removal_version='3.0.0'
)
def config(self, config_dict=None):
pass
@property
def statsd_name(self):
@ -97,14 +118,17 @@ class HipChatNotifier(abstract_notifier.AbstractNotifier):
url = urllib.parse.urljoin(notification.address, urllib.parse.urlparse(notification.address).path)
# Default option is to do cert verification
verify = not self._config.get('insecure', True)
verify = not CONF.hipchat_notifier.insecure
ca_certs = CONF.hipchat_notifier.ca_certs
proxy = CONF.hipchat_notifier.proxy
# If ca_certs is specified, do cert validation and ignore insecure flag
if (self._config.get("ca_certs")):
verify = self._config.get("ca_certs")
if ca_certs is not None:
verify = ca_certs
proxyDict = None
if (self._config.get("proxy")):
proxyDict = {"https": self._config.get("proxy")}
if proxy is not None:
proxyDict = {'https': proxy}
try:
# Posting on the given URL
@ -113,7 +137,7 @@ class HipChatNotifier(abstract_notifier.AbstractNotifier):
verify=verify,
params=query_params,
proxies=proxyDict,
timeout=self._config['timeout'])
timeout=CONF.hipchat_notifier.timeout)
if result.status_code in range(200, 300):
self._log.info("Notification successfully posted.")

View File

@ -1,4 +1,5 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -19,6 +20,9 @@ from six.moves import urllib
import ujson as json
import yaml
from debtcollector import removals
from oslo_config import cfg
from monasca_notification.plugins.abstract_notifier import AbstractNotifier
"""
@ -51,27 +55,40 @@ from monasca_notification.plugins.abstract_notifier import AbstractNotifier
"""
CONF = cfg.CONF
def register_opts(conf):
gr = cfg.OptGroup(name='%s_notifier' % JiraNotifier.type)
opts = [
cfg.IntOpt(name='timeout', default=5, min=1),
cfg.StrOpt(name='user', required=False),
cfg.StrOpt(name='password', required=False, secret=True),
cfg.StrOpt(name='custom_formatter', default=None),
cfg.StrOpt(name='proxy', default=None)
]
conf.register_group(gr)
conf.register_opts(opts, group=gr)
class JiraNotifier(AbstractNotifier):
type = 'jira'
_search_query = search_query = "project={} and reporter='{}' and summary ~ '{}'"
def __init__(self, log):
super(JiraNotifier, self).__init__()
self._log = log
self.jira_fields_format = None
@removals.remove(
message='Configuration of notifier is available through oslo.cfg',
version='1.9.0',
removal_version='3.0.0'
)
def config(self, config_dict):
self._config = {'timeout': 5}
if not config_dict.get("user") and not config_dict.get("password"):
message = "Missing user and password settings in JIRA plugin configuration"
self._log.exception(message)
raise Exception(message)
self._config.update(config_dict)
self.jira_fields_format = self._get_jira_custom_format_fields()
@property
def type(self):
return "jira"
pass
@property
def statsd_name(self):
@ -80,9 +97,10 @@ class JiraNotifier(AbstractNotifier):
def _get_jira_custom_format_fields(self):
jira_fields_format = None
if (not self.jira_fields_format and self._config.get("custom_formatter")):
formatter = CONF.jira_notifier.custom_formatter
if not self.jira_fields_format and formatter:
try:
with open(self._config.get("custom_formatter")) as f:
with open(formatter, 'r') as f:
jira_fields_format = yaml.safe_load(f)
except Exception:
self._log.exception("Unable to read custom_formatter file. Check file location")
@ -139,8 +157,10 @@ class JiraNotifier(AbstractNotifier):
return jira_fields
def _build_jira_message(self, notification):
if self._config.get("custom_formatter"):
return self._build_custom_jira_message(notification, self.jira_fields_format)
formatter = CONF.jira_notifier.custom_formatter
if formatter:
return self._build_custom_jira_message(notification,
self._get_jira_custom_format_fields())
return self._build_default_jira_message(notification)
@ -159,13 +179,17 @@ class JiraNotifier(AbstractNotifier):
if query_params.get("component"):
jira_fields["component"] = query_params["component"][0]
auth = (self._config["user"], self._config["password"])
proxyDict = None
if (self._config.get("proxy")):
proxyDict = {"https": self._config.get("proxy")}
auth = (
CONF.jira_notifier.user,
CONF.jira_notifier.password
)
proxy = CONF.jira_notifier.proxy
proxy_dict = None
if proxy is not None:
proxy_dict = {"https": proxy}
try:
jira_obj = jira.JIRA(url, basic_auth=auth, proxies=proxyDict)
jira_obj = jira.JIRA(url, basic_auth=auth, proxies=proxy_dict)
self.jira_workflow(jira_fields, jira_obj, notification)
except Exception:
@ -192,7 +216,8 @@ class JiraNotifier(AbstractNotifier):
issue_dict["components"] = [{"name": jira_fields.get("component")}]
search_term = self._search_query.format(issue_dict["project"]["key"],
self._config["user"], notification.alarm_id)
CONF.jira_notifier.user,
notification.alarm_id)
issue_list = jira_obj.search_issues(search_term)
if not issue_list:
self._log.debug("Creating an issue with the data {}".format(issue_dict))

View File

@ -1,4 +1,5 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,25 +17,44 @@
import requests
import ujson as json
from debtcollector import removals
from oslo_config import cfg
from monasca_notification.plugins import abstract_notifier
CONF = cfg.CONF
VALID_HTTP_CODES = [200, 201, 204]
def register_opts(conf):
gr = cfg.OptGroup(name='%s_notifier' % PagerdutyNotifier.type)
opts = [
cfg.IntOpt(name='timeout', default=5, min=1),
cfg.StrOpt(name='url',
default='https://events.pagerduty.com/'
'generic/2010-04-15/create_event.json')
]
conf.register_group(gr)
conf.register_opts(opts, group=gr)
class PagerdutyNotifier(abstract_notifier.AbstractNotifier):
type = 'pagerduty'
def __init__(self, log):
super(PagerdutyNotifier, self).__init__()
self._log = log
@removals.remove(
message='Configuration of notifier is available through oslo.cfg',
version='1.9.0',
removal_version='3.0.0'
)
def config(self, config):
self._config = {
'timeout': 5,
'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'}
self._config.update(config)
@property
def type(self):
return "pagerduty"
pass
@property
def statsd_name(self):
@ -44,7 +64,7 @@ class PagerdutyNotifier(abstract_notifier.AbstractNotifier):
"""Send pagerduty notification
"""
url = self._config['url']
url = CONF.pagerduty_notifier.url
headers = {"content-type": "application/json"}
body = {"service_key": notification.address,
"event_type": "trigger",
@ -60,7 +80,7 @@ class PagerdutyNotifier(abstract_notifier.AbstractNotifier):
result = requests.post(url=url,
data=json.dumps(body),
headers=headers,
timeout=self._config['timeout'])
timeout=CONF.pagerduty_notifier.timeout)
if result.status_code in VALID_HTTP_CODES:
return True
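For context, a minimal self-contained sketch (not part of this change) of how a group registered through register_opts() is then consumed via oslo.config; the group and option definitions mirror the pagerduty_notifier options added above, while the parsing call and printed value are illustrative only:

from oslo_config import cfg

CONF = cfg.CONF

def register_opts(conf):
    # same shape as the register_opts() introduced above
    gr = cfg.OptGroup(name='pagerduty_notifier')
    opts = [
        cfg.IntOpt(name='timeout', default=5, min=1),
        cfg.StrOpt(name='url',
                   default='https://events.pagerduty.com/'
                           'generic/2010-04-15/create_event.json'),
    ]
    conf.register_group(gr)
    conf.register_opts(opts, group=gr)

register_opts(CONF)
CONF(args=[], default_config_files=[])    # no CLI args, no config file: defaults apply
print(CONF.pagerduty_notifier.timeout)    # -> 5

An operator can then override either value in the [pagerduty_notifier] section of the service's configuration file instead of the legacy YAML block.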


@ -1,4 +1,5 @@
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,8 +18,26 @@ import requests
from six.moves import urllib
import ujson as json
from debtcollector import removals
from oslo_config import cfg
from monasca_notification.plugins import abstract_notifier
CONF = cfg.CONF
def register_opts(conf):
gr = cfg.OptGroup(name='%s_notifier' % SlackNotifier.type)
opts = [
cfg.IntOpt(name='timeout', default=5, min=1),
cfg.BoolOpt(name='insecure', default=True),
cfg.StrOpt(name='ca_certs', default=None),
cfg.StrOpt(name='proxy', default=None)
]
conf.register_group(gr)
conf.register_opts(opts, group=gr)
class SlackNotifier(abstract_notifier.AbstractNotifier):
"""This module is a notification plugin to integrate with Slack.
@ -42,25 +61,24 @@ class SlackNotifier(abstract_notifier.AbstractNotifier):
https://api.slack.com/incoming-webhooks
"""
CONFIG_CA_CERTS = 'ca_certs'
CONFIG_INSECURE = 'insecure'
CONFIG_PROXY = 'proxy'
CONFIG_TIMEOUT = 'timeout'
type = 'slack'
MAX_CACHE_SIZE = 100
RESPONSE_OK = 'ok'
_raw_data_url_caches = []
def __init__(self, log):
super(SlackNotifier, self).__init__()
self._log = log
@removals.remove(
message='Configuration of notifier is available through oslo.cfg',
version='1.9.0',
removal_version='3.0.0'
)
def config(self, config_dict):
self._config = {'timeout': 5}
self._config.update(config_dict)
@property
def type(self):
return "slack"
pass
@property
def statsd_name(self):
@ -141,12 +159,12 @@ class SlackNotifier(abstract_notifier.AbstractNotifier):
# Default option is to do cert verification
# If ca_certs is specified, do cert validation and ignore insecure flag
verify = self._config.get(self.CONFIG_CA_CERTS,
(not self._config.get(self.CONFIG_INSECURE, True)))
verify = CONF.slack_notifier.ca_certs or not CONF.slack_notifier.insecure
proxyDict = None
if (self.CONFIG_PROXY in self._config):
proxyDict = {'https': self._config.get(self.CONFIG_PROXY)}
proxy = CONF.slack_notifier.proxy
proxy_dict = None
if proxy is not None:
proxy_dict = {'https': proxy}
data_format_list = ['json', 'data']
if url in SlackNotifier._raw_data_url_caches:
@ -159,8 +177,8 @@ class SlackNotifier(abstract_notifier.AbstractNotifier):
'url': url,
'verify': verify,
'params': query_params,
'proxies': proxyDict,
'timeout': self._config[self.CONFIG_TIMEOUT],
'proxies': proxy_dict,
'timeout': CONF.slack_notifier.timeout,
data_format: slack_message
}
if self._send_message(request_options):
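A small sketch of what the cert-verification comment above reduces to, verify = ca_certs or not insecure, evaluated for the three option combinations exercised by the tests further down (the bundle path is an example value):

def resolve_verify(ca_certs, insecure):
    # mirrors: verify = CONF.slack_notifier.ca_certs or not CONF.slack_notifier.insecure
    return ca_certs or not insecure

print(resolve_verify('/etc/ssl/certs/ca-bundle.crt', True))   # bundle path: validate against it
print(resolve_verify(None, True))                             # False: skip verification
print(resolve_verify(None, False))                            # True: default verification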


@ -1,4 +1,5 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,20 +17,39 @@
import requests
import ujson as json
from debtcollector import removals
from oslo_config import cfg
from monasca_notification.plugins import abstract_notifier
CONF = cfg.CONF
def register_opts(conf):
gr = cfg.OptGroup(name='%s_notifier' % WebhookNotifier.type)
opts = [
cfg.IntOpt(name='timeout', default=5, min=1)
]
conf.register_group(gr)
conf.register_opts(opts, group=gr)
class WebhookNotifier(abstract_notifier.AbstractNotifier):
type = 'webhook'
def __init__(self, log):
super(WebhookNotifier, self).__init__()
self._log = log
@removals.remove(
message='Configuration of notifier is available through oslo.cfg',
version='1.9.0',
removal_version='3.0.0'
)
def config(self, config_dict):
self._config = {'timeout': 5}
self._config.update(config_dict)
@property
def type(self):
return "webhook"
pass
@property
def statsd_name(self):
@ -60,7 +80,7 @@ class WebhookNotifier(abstract_notifier.AbstractNotifier):
result = requests.post(url=url,
data=json.dumps(body),
headers=headers,
timeout=self._config['timeout'])
timeout=CONF.webhook_notifier.timeout)
if result.status_code in range(200, 300):
self._log.info("Notification successfully posted.")


@ -13,10 +13,11 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import six
import time
from oslo_config import cfg
from oslo_log import log as logging
import six
import ujson as json
from monasca_notification.common.repositories import exceptions as exc
@ -27,13 +28,14 @@ from monasca_notification import notification_exceptions
log = logging.getLogger(__name__)
CONF = cfg.CONF
class AlarmProcessor(object):
def __init__(self, alarm_ttl, config):
self._alarm_ttl = alarm_ttl
self._statsd = get_statsd_client(config)
self._db_repo = get_db_repo(config)
def __init__(self):
self._alarm_ttl = CONF.alarm_processor.ttl
self._statsd = get_statsd_client()
self._db_repo = get_db_repo()
@staticmethod
def _parse_alarm(alarm_data):


@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from oslo_log import log as logging
from monasca_notification.common.repositories import exceptions as exc
from monasca_notification.common.utils import get_db_repo
@ -25,12 +25,14 @@ log = logging.getLogger(__name__)
class NotificationProcessor(object):
def __init__(self, config):
self.statsd = get_statsd_client(config)
def __init__(self):
self.statsd = get_statsd_client()
notifiers.init(self.statsd)
notifiers.load_plugins(config['notification_types'])
notifiers.config(config['notification_types'])
self._db_repo = get_db_repo(config)
notifiers.load_plugins()
notifiers.config()
self._db_repo = get_db_repo()
self.insert_configured_plugins()
def _remaining_plugin_types(self):


@ -1,4 +1,5 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,9 +15,11 @@
# limitations under the License.
import json
import logging
import time
from oslo_config import cfg
from oslo_log import log as logging
from monasca_common.kafka import consumer
from monasca_common.kafka import producer
from monasca_notification.common.utils import construct_notification_object
@ -25,30 +28,24 @@ from monasca_notification.common.utils import get_statsd_client
from monasca_notification.processors import notification_processor
log = logging.getLogger(__name__)
CONF = cfg.CONF
class RetryEngine(object):
def __init__(self, config):
self._retry_interval = config['retry']['interval']
self._retry_max = config['retry']['max_attempts']
self._topics = {}
self._topics['notification_topic'] = config['kafka']['notification_topic']
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
self._statsd = get_statsd_client(config)
def __init__(self):
self._statsd = get_statsd_client()
self._consumer = consumer.KafkaConsumer(
config['kafka']['url'],
config['zookeeper']['url'],
config['zookeeper']['notification_retry_path'],
config['kafka']['group'],
config['kafka']['notification_retry_topic'])
CONF.kafka.url,
','.join(CONF.zookeeper.url),
CONF.zookeeper.notification_retry_path,
CONF.kafka.group,
CONF.kafka.notification_retry_topic
)
self._producer = producer.KafkaProducer(CONF.kafka.url)
self._producer = producer.KafkaProducer(config['kafka']['url'])
self._notifier = notification_processor.NotificationProcessor(config)
self._db_repo = get_db_repo(config)
self._notifier = notification_processor.NotificationProcessor()
self._db_repo = get_db_repo()
def run(self):
for raw_notification in self._consumer:
@ -62,7 +59,7 @@ class RetryEngine(object):
self._consumer.commit()
continue
wait_duration = self._retry_interval - (
wait_duration = CONF.retry_engine.interval - (
time.time() - notification_data['notification_timestamp'])
if wait_duration > 0:
@ -71,19 +68,19 @@ class RetryEngine(object):
sent, failed = self._notifier.send([notification])
if sent:
self._producer.publish(self._topics['notification_topic'],
self._producer.publish(CONF.kafka.notification_topic,
[notification.to_json()])
if failed:
notification.retry_count += 1
notification.notification_timestamp = time.time()
if notification.retry_count < self._retry_max:
if notification.retry_count < CONF.retry_engine.max_attempts:
log.error(u"retry failed for {} with name {} "
u"at {}. "
u"Saving for later retry.".format(notification.type,
notification.name,
notification.address))
self._producer.publish(self._topics['retry_topic'],
self._producer.publish(CONF.kafka.notification_retry_topic,
[notification.to_json()])
else:
log.error(u"retry failed for {} with name {} "
@ -92,6 +89,6 @@ class RetryEngine(object):
.format(notification.type,
notification.name,
notification.address,
self._retry_max))
CONF.retry_engine.max_attempts))
self._consumer.commit()
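In plain terms, the loop above now takes its tunables from CONF.retry_engine and CONF.kafka and, per notification, either republishes to the normal topic, requeues on the retry topic, or gives up. A rough stand-alone sketch of that decision; the helper name and the stand-in values are illustrative, not taken from this commit:

import time

def route(notification_ts, retry_count, sent, interval=30, max_attempts=5):
    # interval / max_attempts stand in for CONF.retry_engine.interval / .max_attempts
    wait_duration = interval - (time.time() - notification_ts)
    if wait_duration > 0:
        time.sleep(wait_duration)                  # not due yet: wait out the interval
    if sent:
        return 'publish to CONF.kafka.notification_topic'
    if retry_count + 1 < max_attempts:
        return 'publish to CONF.kafka.notification_retry_topic'
    return 'drop after max_attempts'

print(route(time.time() - 60, retry_count=1, sent=False))      # -> retry topic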


@ -1,4 +1,5 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,15 +14,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from monasca_common.simport import simport
from debtcollector import removals
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from monasca_notification.plugins import email_notifier
from monasca_notification.plugins import pagerduty_notifier
from monasca_notification.plugins import webhook_notifier
log = logging.getLogger(__name__)
CONF = cfg.CONF
possible_notifiers = None
configured_notifiers = None
@ -49,14 +54,16 @@ def init(statsd_obj):
]
def load_plugins(config):
def load_plugins():
global possible_notifiers
for plugin_class in config.get("plugins", []):
for plugin_class in CONF.notification_types.enabled:
try:
possible_notifiers.append(simport.load(plugin_class)(log))
plugin_class = plugin_class.replace(':', '.')
clz = importutils.import_class(plugin_class)
possible_notifiers.append(clz(logging.getLogger(plugin_class)))
except Exception:
log.exception("unable to load the class {0} , ignoring it".format(plugin_class))
log.exception("unable to load the class %s , ignoring it" %
plugin_class)
def enabled_notifications():
@ -68,29 +75,20 @@ def enabled_notifications():
return results
def config(cfg):
@removals.remove(
message='Loading the plugin configuration has been moved to oslo, '
'This method will be fully deleted in future releases',
version='1.9.0',
removal_version='3.0.0'
)
def config():
global possible_notifiers, configured_notifiers, statsd_counter
formatted_config = {t.lower(): v for t, v in cfg.items()}
for notifier in possible_notifiers:
ntype = notifier.type.lower()
if ntype in formatted_config:
try:
notifier.config(formatted_config[ntype])
configured_notifiers[ntype] = notifier
statsd_counter[ntype] = statsd.get_counter(notifier.statsd_name)
log.info("{} notification ready".format(ntype))
except Exception:
log.exception("config exception for {}".format(ntype))
else:
log.warn("No config data for type: {}".format(ntype))
config_with_no_notifiers = set(formatted_config.keys()) - set(configured_notifiers.keys())
# Plugins section contains only additional plugins and should not be
# considered as a separate plugin
if 'plugins' in config_with_no_notifiers:
config_with_no_notifiers.remove('plugins')
if config_with_no_notifiers:
log.warn("No notifiers found for {0}". format(", ".join(config_with_no_notifiers)))
configured_notifiers[ntype] = notifier
statsd_counter[ntype] = statsd.get_counter(notifier.statsd_name)
log.info("{} notification ready".format(ntype))
def send_notifications(notifications):
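load_plugins() now resolves the entries of CONF.notification_types.enabled with oslo.utils instead of simport; the only translation needed is swapping the legacy 'module:Class' separator for a dot. A minimal sketch of that step, using the notation seen throughout this change and assuming monasca_notification is importable:

from oslo_utils import importutils

legacy_entry = 'monasca_notification.plugins.webhook_notifier:WebhookNotifier'
dotted = legacy_entry.replace(':', '.')        # simport notation -> importutils notation
clz = importutils.import_class(dotted)
print(clz.__name__)                            # -> WebhookNotifier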


@ -0,0 +1,23 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pbr import version
__all__ = [
'version_info',
'version_string'
]
version_info = version.VersionInfo('monasca-notification')
version_string = version_info.version_string()


@ -1,9 +1,12 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
pbr>=2.0.0,!=2.1.0 # Apache-2.0
debtcollector>=1.2.0 # Apache-2.0
monasca-statsd>=1.1.0 # Apache-2.0
requests>=2.14.2 # Apache-2.0
PyYAML>=3.10.0 # MIT
PyYAML>=3.10 # MIT
six>=1.9.0 # MIT
monasca-common>=1.4.0 # Apache-2.0
oslo.config>=4.6.0 # Apache-2.0
oslo.log>=3.30.0 # Apache-2.0


@ -13,8 +13,10 @@ home-page = https://github.com/stackforge/monasca-notification
license = Apache
[entry_points]
console_scripts =
console_scripts =
monasca-notification = monasca_notification.main:main
oslo.config.opts =
monasca_notification = monasca_notification.conf:list_opts
[files]
packages = monasca_notification
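The new oslo.config.opts entry point is what makes the project's options discoverable by oslo-config-generator and similar tooling. A hedged sketch of loading that entry point by hand, assuming the package is installed and that list_opts() follows the usual oslo convention of returning (group, options) pairs:

import pkg_resources

ep = next(pkg_resources.iter_entry_points('oslo.config.opts',
                                          name='monasca_notification'))
list_opts = ep.load()
for group, opts in list_opts():                # conventional (group, [opts]) pairs
    print(group, [opt.name for opt in opts])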


@ -2,14 +2,16 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
# Hacking already pins down pep8, pyflakes and flake8
bandit>=1.1.0 # Apache-2.0
Babel>=2.3.4,!=2.4.0 # BSD
hacking!=0.13.0,<0.14,>=0.12.0 # Apache-2.0
coverage!=4.4,>=4.0 # Apache-2.0
mock>=2.0 # BSD
funcsigs>=0.4;python_version=='2.7' or python_version=='2.6' # Apache-2.0
os-testr>=0.8.0 # Apache-2.0
coverage>=4.0,!=4.4 # Apache-2.0
mock>=2.0.0 # BSD
funcsigs>=1.0.0;python_version=='2.7' or python_version=='2.6' # Apache-2.0
os-testr>=1.0.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT
PyMySQL>=0.7.6 # MIT License
psycopg2>=2.5 # LGPL/ZPL
psycopg2>=2.6.2 # LGPL/ZPL

tests/base.py Normal file

@ -0,0 +1,83 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
from oslo_config import cfg
from oslo_config import fixture as oo_cfg
from oslotest import base as oslotest_base
from monasca_notification import conf
from monasca_notification import config
class DisableStatsdFixture(fixtures.Fixture):
def setUp(self):
super(DisableStatsdFixture, self).setUp()
statsd_patch = mock.patch('monascastatsd.Connection')
statsd_patch.start()
self.addCleanup(statsd_patch.stop)
class ConfigFixture(oo_cfg.Config):
"""Mocks configuration"""
def __init__(self):
super(ConfigFixture, self).__init__(config.CONF)
def setUp(self):
super(ConfigFixture, self).setUp()
self.addCleanup(self._clean_config_loaded_flag)
conf.register_opts()
# prevent test from trying to load the yaml file
config.parse_args(argv=[], no_yaml=True)
@staticmethod
def _clean_config_loaded_flag():
config._CONF_LOADED = False
class BaseTestCase(oslotest_base.BaseTestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.useFixture(ConfigFixture())
self.useFixture(DisableStatsdFixture())
@staticmethod
def conf_override(**kw):
"""Override flag variables for a test."""
group = kw.pop('group', None)
for k, v in kw.items():
cfg.CONF.set_override(k, v, group)
@staticmethod
def conf_default(**kw):
"""Override flag variables for a test."""
group = kw.pop('group', None)
for k, v in kw.items():
cfg.CONF.set_default(k, v, group)
class PluginTestCase(BaseTestCase):
register_opts = None
def setUp(self, register_opts=None):
super(PluginTestCase, self).setUp()
if register_opts:
register_opts(conf.CONF)
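As a usage note, a hypothetical plugin test built on these fixtures might look like the sketch below; the class name, option value, and assertion are illustrative only:

from monasca_notification.plugins import webhook_notifier
from tests import base

class TestWebhookTimeout(base.PluginTestCase):
    def setUp(self):
        super(TestWebhookTimeout, self).setUp(webhook_notifier.register_opts)
        self.conf_override(group='webhook_notifier', timeout=10)

    def test_timeout_is_overridden(self):
        self.assertEqual(10, base.config.CONF.webhook_notifier.timeout)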


@ -0,0 +1,125 @@
kafka:
url: 127.0.0.1:9092
group: a
alarm_topic: b
notification_topic: c
notification_retry_topic: d
periodic:
60: e
max_offset_lag: 666
database:
repo_driver: mysql
orm:
url: 'postgres://a:b@127.0.0.1:9999/goo'
mysql:
host: 100.99.100.99
port: 3306
user: goku
passwd: kame-ha-me-ha
db: planet_vegeta
postgresql:
user: goku
password: kame-ha-me-ha
database: planet_vegeta
port: 9999
host: 100.10.100.10
notification_types:
plugins:
- monasca_notification.plugins.hipchat_notifier:HipChatNotifier
- monasca_notification.plugins.slack_notifier:SlackNotifier
- monasca_notification.plugins.jira_notifier:JiraNotifier
email:
server: 127.0.0.1
port: 25
user:
password:
timeout: 60
from_addr: root@localhost
grafana_url: 'http://127.0.0.1:3000'
webhook:
timeout: 123
pagerduty:
timeout: 231
url: "https://a.b.c/d/e/f.json"
hipchat:
timeout: 512
ca_certs: "/a.crt"
insecure: True
proxy: https://myproxy.corp.invalid:9999
slack:
timeout: 512
ca_certs: "/a.crt"
insecure: True
proxy: https://myproxy.corp.invalid:9999
jira:
user: username
password: password
timeout: 666
custom_formatter: /some_yml.yml
proxy: www.example.org
processors:
alarm:
number: 666
ttl: 666
notification:
number: 666
retry:
interval: 300
max_attempts: 500
queues:
alarms_size: 1024
finished_size: 1024
notifications_size: 1024
sent_notifications_size: 1024
zookeeper:
url: 127.0.0.1:2181
notification_path: /foo/bar
notification_retry_path: /son/goku
periodic_path:
666: /bu/666_bubu
logging:
raise_exceptions: False
version: 1
disable_existing_loggers: False
formatters:
default:
format: "%(asctime)s %(levelname)s %(name)s %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: default
file:
class : logging.handlers.RotatingFileHandler
filename: /tmp/notification.log
formatter: default
maxBytes: 10485760 # Rotate at file size ~10MB
backupCount: 5 # Keep 5 older logs around
loggers:
kazoo:
level: WARN
kafka:
level: WARN
statsd:
level: WARN
root:
handlers:
- console
level: DEBUG
statsd:
host: 'localhost'
port: 8125


@ -1,4 +1,5 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -19,17 +20,19 @@ import collections
import json
import mock
import time
import unittest
from monasca_notification import notification as m_notification
from monasca_notification.processors import alarm_processor
from tests import base
alarm_tuple = collections.namedtuple('alarm_tuple', ['offset', 'message'])
message_tuple = collections.namedtuple('message_tuple', ['value'])
class TestAlarmProcessor(unittest.TestCase):
class TestAlarmProcessor(base.BaseTestCase):
def setUp(self):
super(TestAlarmProcessor, self).setUp()
self.trap = []
def _create_raw_alarm(self, partition, offset, message):
@ -55,16 +58,13 @@ class TestAlarmProcessor(unittest.TestCase):
mock_mysql.cursor.return_value = mock_mysql
mock_mysql.__iter__.return_value = sql_response
config = {'mysql': {'ssl': None,
'host': 'mysql_host',
'port': 'mysql_port',
'user': 'mysql_user',
'db': 'dbname',
'passwd': 'mysql_passwd'},
'statsd': {'host': 'localhost',
'port': 8125}}
processor = alarm_processor.AlarmProcessor(600, config)
self.conf_override(group='mysql', ssl=None,
host='localhost', port='3306',
user='mysql_user', db='dbname',
passwd='mysql_passwd')
self.conf_override(group='statsd', host='localhost',
port=8125)
processor = alarm_processor.AlarmProcessor()
return processor.to_notification(alarm)

tests/test_config.py Normal file

@ -0,0 +1,226 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_utils import importutils
import yaml
from monasca_notification import conf
from monasca_notification import config
from tests import base
class TestConfig(base.BaseTestCase):
@mock.patch('monasca_notification.config.conf')
def test_should_load_deprecated_yaml(self, conf):
fake_config = """
sayians:
- goku
- vegeta
"""
yaml_config = self.create_tempfiles(
files=[('notification', fake_config)],
ext='.yml'
)[0]
self.conf_override(yaml_config=yaml_config)
config.set_from_yaml()
fake_yaml_config = {
'sayians': ['goku', 'vegeta']
}
conf.load_from_yaml.assert_called_once_with(fake_yaml_config)
@mock.patch('monasca_notification.config.conf')
def test_should_not_load_deprecated_yaml(self, conf):
config.set_from_yaml()
conf.load_from_yaml.assert_not_called()
class TestYamlOverriding(base.BaseTestCase):
# TOP_LEVEL keys represents old groups in YAML file
VERIFIERS = {
'statsd': {
'groups': [
('statsd', {
'host': 'localhost',
'port': 8125
})
]
},
'retry': {
'groups': [
('retry_engine', {
'interval': 300,
'max_attempts': 500
})
]
},
'queues': {
'groups': [
('queues', {
'alarms_size': 1024,
'finished_size': 1024,
'notifications_size': 1024,
'sent_notifications_size': 1024
})
]
},
'zookeeper': {
'groups': [
('zookeeper', {
'url': ['127.0.0.1:2181'],
'notification_path': '/foo/bar',
'periodic_path': {
666: '/bu/666_bubu'
},
})
]
},
'kafka': {
'groups': [
('kafka', {
'url': ['127.0.0.1:9092'],
'group': 'a',
'alarm_topic': 'b',
'notification_topic': 'c',
'notification_retry_topic': 'd',
'periodic': {
60: 'e'
},
'max_offset_lag': 666
})
]
},
'processors': {
'groups': [
('alarm_processor', {'number': 666, 'ttl': 666}),
('notification_processor', {'number': 666})
]
},
'postgresql': {
'groups': [
('postgresql', {
'host': '100.10.100.10',
'port': 9999,
'user': 'goku',
'password': 'kame-ha-me-ha',
'database': 'planet_vegeta'
})
]
},
'mysql': {
'groups': [
('mysql', {
'host': '100.99.100.99',
'port': 3306,
'user': 'goku',
'passwd': 'kame-ha-me-ha',
'db': 'planet_vegeta',
'ssl': {}
})
]
},
'database': {
'groups': [
('database', {'repo_driver': importutils.import_class(
'monasca_notification.common.repositories.mysql.'
'mysql_repo.MysqlRepo')}),
('orm', {'url': 'postgres://a:b@127.0.0.1:9999/goo'})
]
},
'notification_types': {
'groups': [
('notification_types', {
'enabled': [
'monasca_notification.plugins.hipchat_notifier:HipChatNotifier',
'monasca_notification.plugins.slack_notifier:SlackNotifier',
'monasca_notification.plugins.jira_notifier:JiraNotifier',
'monasca_notification.plugins.email_notifier:EmailNotifier',
'monasca_notification.plugins.pagerduty_notifier:PagerdutyNotifier',
'monasca_notification.plugins.webhook_notifier:WebhookNotifier',
]
}),
('email_notifier', {
'server': '127.0.0.1',
'port': 25,
'user': None,
'password': None,
'timeout': 60,
'from_addr': 'root@localhost',
'grafana_url': 'http://127.0.0.1:3000'
}),
('webhook_notifier', {'timeout': 123}),
('pagerduty_notifier', {
'timeout': 231,
'url': 'https://a.b.c/d/e/f.json'
}),
('hipchat_notifier', {
'timeout': 512,
'ca_certs': "/a.crt",
'insecure': True,
'proxy': 'https://myproxy.corp.invalid:9999'
}),
('slack_notifier', {
'timeout': 512,
'ca_certs': "/a.crt",
'insecure': True,
'proxy': 'https://myproxy.corp.invalid:9999'
}),
('jira_notifier', {
'user': 'username',
'password': 'password',
'timeout': 666,
'custom_formatter': '/some_yml.yml',
'proxy': 'www.example.org'
})
]
}
}
def setUp(self):
super(TestYamlOverriding, self).setUp()
self.yaml_config = yaml.safe_load(
open('tests/resources/notification.yaml', 'rb')
)
def test_overriding(self):
conf.load_from_yaml(yaml_config=self.yaml_config, conf=config.CONF)
opts = config.CONF
for group in self.VERIFIERS.keys():
verifier_details = self.VERIFIERS[group]
groups = verifier_details['groups']
for opt_group, opt_values in groups:
for key, value in opt_values.items():
try:
opt_value = opts[opt_group][key]
except (cfg.NoSuchOptError, cfg.NoSuchGroupError) as ex:
self.fail(str(ex))
else:
msg = ('%s not overridden in group %s'
% (key, opt_group))
if (isinstance(value, list) and
isinstance(opt_value, list)):
for v in value:
self.assertIn(v, opt_value, msg)
continue
self.assertEqual(value, opt_value, msg)
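In short, the compatibility path exercised here is: config.set_from_yaml() reads the file pointed at by the yaml_config option and hands the parsed dict to conf.load_from_yaml(), which copies the legacy top-level YAML sections onto the matching oslo groups. A rough sketch of that round trip, assuming the package and the test resource file are available:

import yaml

from monasca_notification import conf
from monasca_notification import config

conf.register_opts()                          # register all groups/options
config.parse_args(argv=[], no_yaml=True)      # as done by ConfigFixture above

with open('tests/resources/notification.yaml', 'rb') as f:
    legacy = yaml.safe_load(f)
conf.load_from_yaml(yaml_config=legacy, conf=config.CONF)

print(config.CONF.retry_engine.max_attempts)  # -> 500, from the legacy 'retry' section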


@ -1,5 +1,6 @@
# coding=utf-8
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,7 +21,6 @@ import mock
import smtplib
import socket
import time
import unittest
import six
@ -34,6 +34,7 @@ else:
from monasca_notification.notification import Notification
from monasca_notification.plugins import email_notifier
from tests import base
UNICODE_CHAR = six.unichr(2344)
UNICODE_CHAR_ENCODED = UNICODE_CHAR.encode("utf-8")
@ -113,20 +114,16 @@ class smtpStubException(object):
raise smtplib.SMTPServerDisconnected
class TestEmail(unittest.TestCase):
class TestEmail(base.PluginTestCase):
def setUp(self):
super(TestEmail, self).setUp(email_notifier.register_opts)
self.trap = []
self.email_config = {'server': 'my.smtp.server',
'port': 25,
'user': None,
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com',
'grafana_url': 'http://127.0.0.1:3000'}
def tearDown(self):
pass
self.conf_override(group='email_notifier', server='my.smtp.server',
port=25, user=None,
password=None, timeout=60,
from_addr='hpcs.mon@hp.com',
grafana_url='http://127.0.0.1:3000')
def _smtpStub(self, *arg, **kwargs):
return smtpStub(self.trap)
@ -143,7 +140,6 @@ class TestEmail(unittest.TestCase):
mock_log.error = self.trap.append
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
alarm_dict = alarm(metric)
@ -269,7 +265,7 @@ class TestEmail(unittest.TestCase):
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
email.config()
alarm_dict = alarm(metrics)
@ -313,7 +309,7 @@ class TestEmail(unittest.TestCase):
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
email.config()
del self.trap[:]
@ -324,8 +320,7 @@ class TestEmail(unittest.TestCase):
email_result = email.send_notification(notification)
self.assertFalse(email_result)
self.assertIn("Connecting to Email Server {}"
.format(self.email_config['server']),
self.assertIn("Connecting to Email Server my.smtp.server",
self.trap)
@mock.patch('monasca_notification.plugins.email_notifier.smtplib')
@ -358,7 +353,7 @@ class TestEmail(unittest.TestCase):
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
email.config()
alarm_dict = alarm(metrics)
@ -398,7 +393,7 @@ class TestEmail(unittest.TestCase):
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
email.config()
alarm_dict = alarm(metrics)
@ -438,7 +433,7 @@ class TestEmail(unittest.TestCase):
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
email.config()
alarm_dict = alarm(metrics)
@ -471,7 +466,6 @@ class TestEmail(unittest.TestCase):
mock_smtp.SMTPException = smtplib.SMTPException
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
# Create alarm timestamp and timestamp for 'from' and 'to' dates in milliseconds.
alarm_date = datetime.datetime(2017, 6, 7, 18, 0)


@ -13,12 +13,12 @@
import json
import mock
import unittest
import six
from monasca_notification import notification as m_notification
from monasca_notification.plugins import hipchat_notifier
from tests import base
if six.PY2:
import Queue as queue
@ -47,12 +47,15 @@ class requestsResponse(object):
self.status_code = status
class TestHipchat(unittest.TestCase):
class TestHipchat(base.PluginTestCase):
def setUp(self):
super(TestHipchat, self).setUp(hipchat_notifier.register_opts)
self.conf_default(group='hipchat_notifier', timeout=50)
self.trap = queue.Queue()
self.hipchat_config = {'timeout': 50}
def tearDown(self):
super(TestHipchat, self).tearDown()
self.assertTrue(self.trap.empty())
def _http_post_200(self, url, data, **kwargs):
@ -72,7 +75,7 @@ class TestHipchat(unittest.TestCase):
hipchat = hipchat_notifier.HipChatNotifier(mock_log)
hipchat.config(self.hipchat_config)
hipchat.config()
metric = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}


@ -11,11 +11,11 @@
# the License.
import mock
from oslotest import base
import six
from monasca_notification import notification as m_notification
from monasca_notification.plugins import jira_notifier
from tests import base
if six.PY2:
import Queue as queue
@ -64,7 +64,7 @@ class RequestsResponse(object):
self.status_code = status
class TestJira(base.BaseTestCase):
class TestJira(base.PluginTestCase):
default_address = 'http://test.jira:3333/?project=MyProject' \
'&component=MyComponent'
@ -72,7 +72,14 @@ class TestJira(base.BaseTestCase):
issue_status_resolved = 'resolved'
def setUp(self):
super(TestJira, self).setUp()
super(TestJira, self).setUp(
jira_notifier.register_opts
)
self.conf_override(
group='jira_notifier',
user='username',
password='password'
)
self._trap = queue.Queue()
@ -85,17 +92,12 @@ class TestJira(base.BaseTestCase):
self._jr = jira_notifier.JiraNotifier(mock_log)
self._jira_config = {'user': 'username',
'password': 'password'}
@mock.patch('monasca_notification.plugins.jira_notifier.jira')
def _notify(self,
transitions_value,
issue_status,
jira_config,
address,
mock_jira):
self._jr.config(jira_config)
alarm_dict = alarm()
mock_jira_obj = mock.Mock()
@ -117,7 +119,6 @@ class TestJira(base.BaseTestCase):
def test_send_notification_issue_status_resolved(self):
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
TestJira.issue_status_resolved,
self._jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.create_issue.assert_not_called()
@ -130,7 +131,6 @@ class TestJira(base.BaseTestCase):
issue_status = 'closed'
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
issue_status,
self._jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.create_issue.assert_not_called()
@ -143,7 +143,6 @@ class TestJira(base.BaseTestCase):
issue_status = 'progress'
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
issue_status,
self._jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.create_issue.assert_not_called()
@ -153,7 +152,6 @@ class TestJira(base.BaseTestCase):
transitions_value = [[{'id': 100, 'name': 'not open'}]]
mock_jira, mock_jira_obj, result = self._notify(transitions_value,
TestJira.issue_status_resolved,
self._jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.create_issue.assert_not_called()
@ -163,7 +161,6 @@ class TestJira(base.BaseTestCase):
issue_status = None
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
issue_status,
self._jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.transitions.assert_not_called()
@ -174,7 +171,6 @@ class TestJira(base.BaseTestCase):
address = 'http://test.jira:3333/?project=MyProject'
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
issue_status,
self._jira_config,
address)
self.assertTrue(result)
self.assertEqual(issue(component=False), mock_jira_obj.create_issue.call_args[1])
@ -183,18 +179,18 @@ class TestJira(base.BaseTestCase):
def test_send_notification_create_jira_object_args(self):
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
TestJira.issue_status_resolved,
self._jira_config,
TestJira.default_address)
self.assertEqual('http://test.jira:3333/', mock_jira.JIRA.call_args[0][0])
self.assertEqual(('username', 'password'), mock_jira.JIRA.call_args[1].get('basic_auth'))
self.assertEqual(None, mock_jira.JIRA.call_args[1].get('proxies'))
def test_send_notification_with_proxy(self):
jira_config = self._jira_config
jira_config.update({'proxy': 'http://yourid:password@proxyserver:8080'})
self.conf_override(
proxy='http://yourid:password@proxyserver:8080',
group='jira_notifier'
)
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
TestJira.issue_status_resolved,
jira_config,
TestJira.default_address)
self.assertTrue(result)
self.assertEqual({'https': 'http://yourid:password@proxyserver:8080'},
@ -202,45 +198,46 @@ class TestJira(base.BaseTestCase):
def test_send_notification_custom_config_success(self):
issue_status = None
jira_config = self._jira_config
jira_config.update(
{'custom_formatter': 'tests/resources/test_jiraformat.yml'})
self.conf_override(
custom_formatter='tests/resources/test_jiraformat.yml',
group='jira_notifier'
)
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
issue_status,
jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.transitions.assert_not_called()
self.assertEqual(issue(custom_config=True), mock_jira_obj.create_issue.call_args[1])
def test_send_notification_custom_config_failed(self):
jira_config = self._jira_config
jira_config.update(
{'custom_formatter': 'tests/resources/test_jiraformat_without_summary.yml'})
self.conf_override(
custom_formatter='tests/resources/test_jiraformat_without_summary.yml',
group='jira_notifier'
)
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
TestJira.issue_status_resolved,
jira_config,
TestJira.default_address)
self.assertFalse(result)
def test_send_notification_custom_config_without_comments(self):
jira_config = self._jira_config
jira_config.update(
{'custom_formatter': 'tests/resources/test_jiraformat_without_comments.yml'})
self.conf_override(
custom_formatter='tests/resources/test_jiraformat_without_comments.yml',
group='jira_notifier'
)
mock_jira, mock_jira_obj, result = self._notify(TestJira.default_transitions_value,
TestJira.issue_status_resolved,
jira_config,
TestJira.default_address)
self.assertTrue(result)
mock_jira_obj.add_comment.assert_not_called()
def test_send_notification_custom_config_exception(self):
jira_config = self._jira_config
jira_config.update({'custom_formatter': 'tests/resources/not_exist_file.yml'})
self.conf_override(
custom_formatter='tests/resources/not_exist_file.yml',
group='jira_notifier'
)
self.assertRaises(Exception, self._notify,
TestJira.default_transitions_value,
TestJira.issue_status_resolved,
jira_config,
TestJira.default_address)
def test_type(self):
@ -248,6 +245,3 @@ class TestJira(base.BaseTestCase):
def test_statsd_name(self):
self.assertEqual('jira_notifier', self._jr.statsd_name)
def test_config_exception(self):
self.assertRaises(Exception, self._jr.config, {})


@ -1,4 +1,5 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,15 +15,21 @@
# limitations under the License.
import mock
import unittest
import pymysql
from monasca_notification.common.repositories import exceptions as exc
from monasca_notification.common.repositories.mysql import mysql_repo
from tests import base
class TestMySqlRepo(unittest.TestCase):
class TestMySqlRepo(base.BaseTestCase):
def setUp(self):
super(TestMySqlRepo, self).setUp()
self.conf_default(group='mysql', host='localhost',
port=3306, user='bar',
passwd='1', db='2')
@mock.patch('monasca_notification.common.repositories.mysql.mysql_repo.pymysql')
def testReconnect(self, mock_mysql):
m = mock.MagicMock()
@ -32,13 +39,7 @@ class TestMySqlRepo(unittest.TestCase):
mock_mysql.connect.return_value = m
mock_mysql.Error = pymysql.Error
config = {'mysql': {'host': 'foo',
'port': '3306',
'user': 'bar',
'passwd': '1',
'db': '2'}}
repo = mysql_repo.MysqlRepo(config)
repo = mysql_repo.MysqlRepo(base.config.CONF)
alarm = {'alarmDefinitionId': 'foo',
'newState': 'bar'}


@ -1,4 +1,5 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,10 +18,11 @@
import mock
import time
import unittest
from monasca_notification import notification as m_notification
from monasca_notification.processors import notification_processor
from monasca_notification.plugins import email_notifier
from monasca_notification.processors import notification_processor as np
from tests import base
class smtpStub(object):
@ -36,29 +38,23 @@ class requestsResponse(object):
self.status_code = status
class TestNotificationProcessor(unittest.TestCase):
class TestNotificationProcessor(base.BaseTestCase):
def setUp(self):
super(TestNotificationProcessor, self).setUp()
self.trap = []
self.email_config = {'server': 'my.smtp.server',
'port': 25,
'user': None,
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
self.mysql_config = {'ssl': None,
'host': 'mysql_host',
'port': 'mysql_port',
'user': 'mysql_user',
'db': 'dbname',
'passwd': 'mysql_passwd'}
email_notifier.register_opts(base.config.CONF)
self.statsd_config = {'host': 'localhost',
'port': 8125}
self.conf_default(group='email_notifier', server='my.smtp.server',
port=25, user=None, password=None,
timeout=60, from_addr='hpcs.mon@hp.com')
self.conf_default(group='mysql', ssl=None, host='localhost',
port='3306', user='mysql_user', db='dbname',
passwd='mysql_passwd')
self.conf_default(group='statsd', host='localhost', port=8125)
def tearDown(self):
pass
self.conf_default(group='notification_types', enabled=[])
# ------------------------------------------------------------------------
# Test helper functions
@ -70,19 +66,15 @@ class TestNotificationProcessor(unittest.TestCase):
def _start_processor(self, notifications, mock_log, mock_smtp, mock_statsd, mock_pymsql):
"""Start the processor with the proper mocks
"""
# Since the log runs in another thread I can mock it directly, instead change the methods to put to a queue
# Since the log runs in another thread I can mock it directly,
# instead change the methods to put to a queue
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_smtp.SMTP = self._smtpStub
config = {}
config["email"] = self.email_config
config["mysql"] = self.mysql_config
config["statsd"] = self.statsd_config
config["notification_types"] = {}
processor = (notification_processor.NotificationProcessor(config))
np.NotificationProcessor.insert_configured_plugins = mock.Mock()
processor = np.NotificationProcessor()
processor.send(notifications)
def _smtpStub(self, *arg, **kwargs):


@ -1,4 +1,5 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,10 +17,13 @@
import contextlib
import mock
import time
import unittest
from monasca_notification import notification as m_notification
from monasca_notification.plugins import email_notifier
from monasca_notification.plugins import pagerduty_notifier
from monasca_notification.plugins import webhook_notifier
from monasca_notification.types import notifiers
from tests import base
def alarm(metrics):
@ -112,18 +116,23 @@ class StatsdCounter(object):
self.counter += val
class TestInterface(unittest.TestCase):
class TestInterface(base.BaseTestCase):
def setUp(self):
super(TestInterface, self).setUp()
self.trap = []
self.statsd = Statsd()
self.email_config = {'server': 'my.smtp.server',
'port': 25,
'user': None,
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
email_notifier.register_opts(base.config.CONF)
webhook_notifier.register_opts(base.config.CONF)
pagerduty_notifier.register_opts(base.config.CONF)
self.conf_override(group='email_notifier', server='my.smtp.server',
port=25, user=None, password=None,
timeout=60, from_addr='hpcs.mon@hp.com')
def tearDown(self):
super(TestInterface, self).tearDown()
notifiers.possible_notifiers = []
notifiers.configured_notifiers = {}
self.trap = []
@ -143,70 +152,15 @@ class TestInterface(unittest.TestCase):
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_enabled_notifications(self, mock_log, mock_smtp):
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'xyz.com'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifiers.config()
notifications = notifiers.enabled_notifications()
self.assertEqual(len(notifications), 3)
self.assertEqual(sorted(notifications),
["EMAIL", "PAGERDUTY", "WEBHOOK"])
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_config_missing_data(self, mock_log, mock_smtp):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.info = self.trap.append
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("No config data for type: pagerduty", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_config_exception(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.exception = self.trap.append
mock_email.EmailNotifier = self._configExceptionStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("config exception for email", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_config_correct(self, mock_log, mock_smtp):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.info = self.trap.append
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("email notification ready", self.trap)
self.assertIn("webhook notification ready", self.trap)
self.assertIn("pagerduty notification ready", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
@ -217,12 +171,8 @@ class TestInterface(unittest.TestCase):
mock_email.EmailNotifier = self._sendExceptionStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifiers.config()
notifications = []
notifications.append(m_notification.Notification(0, 'email', 'email notification',
@ -242,12 +192,8 @@ class TestInterface(unittest.TestCase):
mock_email.EmailNotifier = self._sendFailureStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifiers.config()
notifications = []
notifications.append(m_notification.Notification(0, 'email', 'email notification',
@ -259,36 +205,6 @@ class TestInterface(unittest.TestCase):
self.assertEqual(len(failed), 1)
self.assertEqual(invalid, [])
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_send_notification_unconfigured(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.info = self.trap.append
mock_email.EmailNotifier = self._sendExceptionStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("No config data for type: pagerduty", self.trap)
notifications = []
notifications.append(m_notification.Notification(0, 'pagerduty', 'pagerduty notification',
'me@here.com', 0, 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(sent, [])
self.assertEqual(failed, [])
self.assertEqual(len(invalid), 1)
self.assertIn("attempting to send unconfigured notification: pagerduty", self.trap)
@mock.patch('monasca_notification.types.notifiers.time')
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@ -302,12 +218,8 @@ class TestInterface(unittest.TestCase):
mock_time.time.return_value = 42
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifiers.config()
notifications = []
notifications.append(m_notification.Notification(0, 'email', 'email notification',
@ -335,12 +247,8 @@ class TestInterface(unittest.TestCase):
mock_email.EmailNotifier = self._goodSendStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifiers.config()
notifications = []
notifications.append(m_notification.Notification(0, 'email', 'email notification',
@ -357,11 +265,16 @@ class TestInterface(unittest.TestCase):
self.assertEqual(self.statsd.counter.counter, 3)
def test_plugin_load(self):
config_dict = {"plugins": ["monasca_notification.plugins.hipchat_notifier:HipChatNotifier",
"monasca_notification.plugins.slack_notifier:SlackNotifier"]}
self.conf_override(
group='notification_types',
enabled=[
'monasca_notification.plugins.hipchat_notifier:HipChatNotifier',
'monasca_notification.plugins.slack_notifier:SlackNotifier'
]
)
notifiers.init(self.statsd)
notifiers.load_plugins(config_dict)
notifiers.load_plugins()
self.assertEqual(len(notifiers.possible_notifiers), 5)
configured_plugins = ["email", "webhook", "pagerduty", "hipchat", "slack"]
@ -371,27 +284,19 @@ class TestInterface(unittest.TestCase):
@mock.patch('monasca_notification.types.notifiers.log')
def test_invalid_plugin_load_exception_ignored(self, mock_log):
mock_log.exception = self.trap.append
config_dict = {"plugins": ["monasca_notification.plugins.hipchat_notifier:UnknownPlugin",
"monasca_notification.plugins.slack_notifier:SlackNotifier"]}
self.conf_override(
group='notification_types',
enabled=[
'monasca_notification.plugins.hipchat_notifier:UnknownPlugin',
'monasca_notification.plugins.slack_notifier:SlackNotifier'
]
)
notifiers.init(self.statsd)
notifiers.load_plugins(config_dict)
notifiers.load_plugins()
self.assertEqual(len(notifiers.possible_notifiers), 4)
self.assertEqual(len(self.trap), 1)
configured_plugins = ["email", "webhook", "pagerduty", "slack"]
for plugin_type in notifiers.configured_notifiers:
self.assertIn(plugin_type, configured_plugins)
@mock.patch('monasca_notification.types.notifiers.log')
def test_no_plugins_keyword_in_possible_notifiers(self, mock_log):
mock_log.warning = self.trap.append
config_dict = {"plugins": ["monasca_notification.plugins.slack_notifier:SlackNotifier"],
"fake_notifier": ["monasca_notification.plugins.fake_notifier:FakeNotifier"],
"slack": {"timeout": 5, "ca_certs": "/etc/ssl/certs/ca-certificates.crt", "insecure": False}
}
notifiers.init(self.statsd)
notifiers.possible_notifiers = []
notifiers.load_plugins(config_dict)
notifiers.config(config_dict)
self.assertEqual('No notifiers found for fake_notifier', mock_log.warn.call_args[0][0])


@ -13,17 +13,18 @@
import mock
from monasca_notification.common.repositories.orm import orm_repo
from oslotest import base
from tests import base
class TestOrmRepo(base.BaseTestCase):
@mock.patch('monasca_notification.common.repositories.orm.orm_repo.engine_from_config')
def setUp(self, mock_sql_engine_from_config):
super(TestOrmRepo, self).setUp()
config = {'database':
{'orm':
{'url': 'mysql+pymysql://user:password@hostname:3306/mon'}}}
self._rep = orm_repo.OrmRepo(config)
self.conf_default(
group='orm', url='mysql+pymysql://user:password@hostname:3306/mon'
)
self._rep = orm_repo.OrmRepo()
self.mock_conn = \
self._rep._orm_engine.connect.return_value.__enter__.return_value


@ -1,4 +1,5 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,12 +18,12 @@ import json
import mock
import requests
import time
import unittest
import six
from monasca_notification import notification as m_notification
from monasca_notification.plugins import pagerduty_notifier
from tests import base
if six.PY2:
import Queue as queue
@ -49,12 +50,18 @@ class requestsResponse(object):
self.status_code = status
class TestWebhook(unittest.TestCase):
class TestPagerduty(base.PluginTestCase):
def setUp(self):
super(TestPagerduty, self).setUp(
pagerduty_notifier.register_opts
)
self.conf_override(group='pagerduty_notifier', timeout=50)
self.trap = queue.Queue()
self.pagerduty_config = {'timeout': 50, 'key': 'foobar'}
def tearDown(self):
super(TestPagerduty, self).tearDown()
self.assertTrue(self.trap.empty())
def _http_post_200(self, url, data, headers, **kwargs):
@ -142,8 +149,6 @@ class TestWebhook(unittest.TestCase):
pagerduty = pagerduty_notifier.PagerdutyNotifier(mock_log)
pagerduty.config(self.pagerduty_config)
metric = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metric.append(metric_data)
@ -275,7 +280,5 @@ class TestWebhook(unittest.TestCase):
self.assertRegexpMatches(results, "Exception on pagerduty request")
self.assertRegexpMatches(results, "key=<ABCDEF>")
self.assertRaises(requests.exceptions.Timeout)
return_value = self.trap.get()
self.assertFalse(return_value)


@ -14,12 +14,11 @@
import json
import mock
from oslotest import base
import six
from monasca_notification import notification as m_notification
from monasca_notification.plugins import slack_notifier
from tests import base
if six.PY2:
import Queue as queue
@ -70,9 +69,16 @@ class RequestsResponse(object):
return json.loads(self.text)
class TestSlack(base.BaseTestCase):
class TestSlack(base.PluginTestCase):
def setUp(self):
super(TestSlack, self).setUp()
super(TestSlack, self).setUp(
slack_notifier.register_opts
)
self.conf_default(group='slack_notifier', timeout=50,
ca_certs='/etc/ssl/certs/ca-bundle.crt',
proxy='http://yourid:password@proxyserver:8080',
insecure=False)
self._trap = queue.Queue()
@ -85,17 +91,10 @@ class TestSlack(base.BaseTestCase):
self._slk = slack_notifier.SlackNotifier(mock_log)
slack_notifier.SlackNotifier._raw_data_url_caches = []
self._slack_config = {'timeout': 50,
'ca_certs': '/etc/ssl/certs/ca-bundle.crt',
'proxy': 'http://yourid:password@proxyserver:8080',
'insecure': False}
@mock.patch('monasca_notification.plugins.slack_notifier.requests')
def _notify(self, response_list, slack_config, mock_requests):
def _notify(self, response_list, mock_requests):
mock_requests.post = mock.Mock(side_effect=response_list)
self._slk.config(slack_config)
metric = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metric.append(metric_data)
@ -123,7 +122,7 @@ class TestSlack(base.BaseTestCase):
"""
response_list = [RequestsResponse(200, 'ok',
{'Content-Type': 'application/text'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
mock_method.assert_called_once()
self._validate_post_args(mock_method.call_args_list[0][1], 'json')
@ -137,7 +136,7 @@ class TestSlack(base.BaseTestCase):
{'Content-Type': 'application/text'}),
RequestsResponse(200, '{"ok":false,"error":"failure"}',
{'Content-Type': 'application/json'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertFalse(result)
self._validate_post_args(mock_method.call_args_list[0][1], 'json')
self._validate_post_args(mock_method.call_args_list[1][1], 'data')
@ -150,7 +149,7 @@ class TestSlack(base.BaseTestCase):
{'Content-Type': 'application/json'}),
RequestsResponse(200, '{"ok":true}',
{'Content-Type': 'application/json'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
self._validate_post_args(mock_method.call_args_list[0][1], 'json')
self._validate_post_args(mock_method.call_args_list[1][1], 'data')
@ -165,7 +164,7 @@ class TestSlack(base.BaseTestCase):
['http://test.slack:3333']):
response_list = [RequestsResponse(200, '{"ok":true}',
{'Content-Type': 'application/json'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
mock_method.assert_called_once()
self._validate_post_args(mock_method.call_args_list[0][1], 'data')
@ -180,7 +179,7 @@ class TestSlack(base.BaseTestCase):
['http://test.slack:3333']):
response_list = [RequestsResponse(200, '{"ok":false,"error":"failure"}',
{'Content-Type': 'application/json'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertFalse(result)
mock_method.assert_called_once()
self._validate_post_args(mock_method.call_args_list[0][1], 'data')
@ -190,9 +189,13 @@ class TestSlack(base.BaseTestCase):
def test_slack_webhook_success_only_timeout(self):
"""slack success with only timeout config
"""
self.conf_override(group='slack_notifier', timeout=50,
insecure=True, ca_certs=None,
proxy=None)
response_list = [RequestsResponse(200, 'ok',
{'Content-Type': 'application/text'})]
mock_method, result = self._notify(response_list, {'timeout': 50})
mock_method, result = self._notify(response_list)
self.assertTrue(result)
mock_method.assert_called_once()
self.assertEqual(slack_notifier.SlackNotifier._raw_data_url_caches, [])
@ -205,16 +208,6 @@ class TestSlack(base.BaseTestCase):
self.assertEqual('http://test.slack:3333', post_args.get('url'))
self.assertFalse(post_args.get('verify'))
def test_slack_exception(self):
"""exception occurs
"""
mock_method, result = self._notify(RuntimeError('exception'),
self._slack_config)
self.assertFalse(result)
self._validate_post_args(mock_method.call_args_list[0][1], 'json')
self._validate_post_args(mock_method.call_args_list[1][1], 'data')
def test_slack_reponse_400(self):
"""slack returns 400 error
"""
@ -222,7 +215,7 @@ class TestSlack(base.BaseTestCase):
{'Content-Type': 'application/json'}),
RequestsResponse(400, '{"ok":false,"error":"failure"}',
{'Content-Type': 'application/json'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertFalse(result)
self._validate_post_args(mock_method.call_args_list[0][1], 'json')
@ -239,7 +232,7 @@ class TestSlack(base.BaseTestCase):
{'Content-Type': 'application/json'}),
RequestsResponse(200, '{"ok":true}',
{'Content-Type': 'application/json'})]
mock_method, result = self._notify(response_list, self._slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
self._validate_post_args(mock_method.call_args_list[0][1], 'json')
self._validate_post_args(mock_method.call_args_list[1][1], 'data')
@ -247,13 +240,14 @@ class TestSlack(base.BaseTestCase):
slack_notifier.SlackNotifier._raw_data_url_caches)
def test_config_insecure_true_ca_certs(self):
slack_config = {'timeout': 50,
'ca_certs': '/etc/ssl/certs/ca-bundle.crt',
'insecure': True}
self.conf_override(group='slack_notifier', timeout=50,
insecure=True,
ca_certs='/etc/ssl/certs/ca-bundle.crt')
response_list = [RequestsResponse(200, 'ok',
{'Content-Type': 'application/text'})]
mock_method, result = self._notify(response_list, slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
mock_method.assert_called_once()
self.assertEqual(slack_notifier.SlackNotifier._raw_data_url_caches, [])
@ -265,12 +259,14 @@ class TestSlack(base.BaseTestCase):
self.assertEqual('/etc/ssl/certs/ca-bundle.crt', post_args.get('verify'))
def test_config_insecure_true_no_ca_certs(self):
slack_config = {'timeout': 50,
'insecure': True}
self.conf_override(group='slack_notifier', timeout=50,
insecure=True,
ca_certs=None)
response_list = [RequestsResponse(200, 'ok',
{'Content-Type': 'application/text'})]
mock_method, result = self._notify(response_list, slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
mock_method.assert_called_once()
self.assertEqual(slack_notifier.SlackNotifier._raw_data_url_caches, [])
@ -282,12 +278,14 @@ class TestSlack(base.BaseTestCase):
self.assertFalse(post_args.get('verify'))
def test_config_insecure_false_no_ca_certs(self):
slack_config = {'timeout': 50,
'insecure': False}
self.conf_override(group='slack_notifier', timeout=50,
insecure=False,
ca_certs=None)
response_list = [RequestsResponse(200, 'ok',
{'Content-Type': 'application/text'})]
mock_method, result = self._notify(response_list, slack_config)
mock_method, result = self._notify(response_list)
self.assertTrue(result)
mock_method.assert_called_once()
self.assertEqual(slack_notifier.SlackNotifier._raw_data_url_caches, [])
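All of the Slack tests above override options in a slack_notifier group (timeout, insecure, ca_certs, proxy), which implies the plugin now declares these settings as oslo.config options through a register_opts hook instead of reading a YAML dictionary. A hedged sketch of such a declaration (defaults and help strings are assumptions, not taken from the plugin):

from oslo_config import cfg

_slack_opts = [
    cfg.IntOpt('timeout', default=5,
               help='Timeout, in seconds, for posting to the Slack URL.'),
    cfg.BoolOpt('insecure', default=True,
                help='Skip TLS certificate verification when set to True.'),
    cfg.StrOpt('ca_certs',
               help='Path of the CA bundle used to verify the endpoint.'),
    cfg.StrOpt('proxy',
               help='Optional HTTP(S) proxy to route the request through.'),
]


def register_opts(conf):
    conf.register_opts(_slack_opts, group='slack_notifier')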


@ -1,4 +1,4 @@
# Copyright 2016 FUJITSU LIMITED
# Copyright 2016-2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,41 +13,45 @@
# under the License.
from mock import patch
import unittest
from monasca_notification.common import utils
from tests import base
class TestStatsdConnection(unittest.TestCase):
class TestStatsdConnection(base.BaseTestCase):
extra_dimensions = {'foo': 'bar'}
base_name = 'monasca'
def test_statsd_default_connection(self):
config = {}
with patch(
'monasca_notification.common.utils.monascastatsd.Client') as c:
utils.get_statsd_client(config)
utils.get_statsd_client()
c.assert_called_once_with(dimensions=utils.NOTIFICATION_DIMENSIONS,
name=self.base_name)
name=self.base_name,
host='127.0.0.1',
port=8125)
def test_statsd_config_connection(self):
port_number = 9999
hostname = 'www.example.org'
config = {'statsd': {'host': hostname, 'port': port_number}}
self.conf_override(group='statsd', host=hostname, port=port_number)
with patch(
'monasca_notification.common.utils.monascastatsd.Client') as c:
utils.get_statsd_client(config)
utils.get_statsd_client()
c.assert_called_once_with(dimensions=utils.NOTIFICATION_DIMENSIONS,
name=self.base_name,
port=port_number,
host=hostname)
def test_statsd_update_dimmensions(self):
config = {}
expected_dimensions = utils.NOTIFICATION_DIMENSIONS.copy()
expected_dimensions.update(self.extra_dimensions)
with patch(
'monasca_notification.common.utils.monascastatsd.Client') as c:
utils.get_statsd_client(config, dimensions=self.extra_dimensions)
utils.get_statsd_client(dimensions=self.extra_dimensions)
c.assert_called_once_with(dimensions=expected_dimensions,
name=self.base_name)
name=self.base_name,
host='127.0.0.1',
port=8125)
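The expectations above show monascastatsd.Client receiving host and port even when nothing is overridden, i.e. the statsd connection settings now come from an oslo.config group with 127.0.0.1:8125 as defaults rather than from a passed-in dictionary. A sketch of how get_statsd_client could be built on top of that (the dimension values are illustrative):

import monascastatsd

from oslo_config import cfg

CONF = cfg.CONF

NOTIFICATION_DIMENSIONS = {'service': 'monitoring',
                           'component': 'monasca-notification'}


def get_statsd_client(dimensions=None):
    # Merge any caller-supplied dimensions on top of the defaults.
    client_dimensions = NOTIFICATION_DIMENSIONS.copy()
    if dimensions:
        client_dimensions.update(dimensions)
    return monascastatsd.Client(name='monasca',
                                host=CONF.statsd.host,
                                port=CONF.statsd.port,
                                dimensions=client_dimensions)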


@ -1,4 +1,5 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,13 +16,13 @@
import mock
import requests
import unittest
import six
import ujson as json
from monasca_notification import notification as m_notification
from monasca_notification.plugins import webhook_notifier
from tests import base
if six.PY2:
import Queue as queue
@ -50,12 +51,14 @@ class requestsResponse(object):
self.status_code = status
class TestWebhook(unittest.TestCase):
class TestWebhook(base.PluginTestCase):
def setUp(self):
super(TestWebhook, self).setUp(webhook_notifier.register_opts)
self.trap = queue.Queue()
self.webhook_config = {'timeout': 50}
def tearDown(self):
super(TestWebhook, self).tearDown()
self.assertTrue(self.trap.empty())
def _http_post_200(self, url, data, headers, **kwargs):
@ -84,8 +87,6 @@ class TestWebhook(unittest.TestCase):
webhook = webhook_notifier.WebhookNotifier(mock_log)
webhook.config(self.webhook_config)
metric = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metric.append(metric_data)
@ -140,6 +141,7 @@ class TestWebhook(unittest.TestCase):
"""webhook timeout exception
"""
self.conf_override(group='webhook_notifier', timeout=50)
self.notify(self._http_post_exception)
result = self.trap.get()
@ -152,7 +154,6 @@ class TestWebhook(unittest.TestCase):
self.assertNotRegexpMatches(result, "content-type.: .application/json")
self.assertRegexpMatches(result, "Error trying to post on URL http://mock:3333/")
self.assertRaises(requests.exceptions.Timeout)
return_value = self.trap.get()
self.assertFalse(return_value)
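Like the Slack tests, TestWebhook now passes the plugin's register_opts callable into setUp, so the shared PluginTestCase presumably registers the plugin's option group on the fixture-managed configuration before each test runs. Building on the BaseTestCase sketch above, a minimal version could look like this (the internals are assumptions):

class PluginTestCase(BaseTestCase):

    def setUp(self, register_function=None):
        super(PluginTestCase, self).setUp()
        if register_function is not None:
            # Register the plugin's option group (e.g. webhook_notifier)
            # on the per-test ConfigOpts provided by the fixture.
            register_function(self.conf)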


@ -72,6 +72,10 @@ commands =
commands =
bandit -r monasca_notification -n5 -x monasca_notification/tests
[testenv:genconfig]
description = Generates a sample monasca-notification configuration file
commands = oslo-config-generator --config-file={toxinidir}/config-generator/notification.conf
[flake8]
max-line-length = 120
# TODO: ignored checks should be enabled in the future
@ -80,3 +84,7 @@ max-line-length = 120
# H405 multi line docstring summary not separated with an empty line
ignore = F821,H201,H202,H405
exclude=.venv,.git,.tox,dist,*egg,build
[hacking]
import_exceptions =
six.moves
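The genconfig environment added above relies on oslo-config-generator, which discovers options through oslo.config.opts entry points, each naming a list_opts() callable that returns (group, options) pairs. Assuming the project wires its namespace to such a callable, it could look roughly like this (the imported option-list attributes are hypothetical):

from monasca_notification.plugins import slack_notifier
from monasca_notification.plugins import webhook_notifier


def list_opts():
    # One tuple per configuration group, in the shape that
    # oslo-config-generator expects when rendering the sample file.
    return [
        ('slack_notifier', slack_notifier._slack_opts),
        ('webhook_notifier', webhook_notifier._webhook_opts),
    ]

Running tox -e genconfig then invokes the command shown in the genconfig environment and renders the sample configuration file.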