enable microservice

move implementation specific configurations in server.py to
v2/reference module. Making the server.py only responsible
for loading dispatchers which are configured in monasca.conf
file. This change will enable any dispatcher to be loaded
according to the configuration file. For example, to config
two dispatchers to be served on a server, a monasca.conf
file can look like the following:

[DEFAULT]
dispatcher = v2_ref_metrics
dispatcher = v2_ref_alarms

If all the reference dispatchers should be served out of
one server, then the configuration file may look like this:

[DEFAULT]
dispatcher = v2_ref_metrics
dispatcher = v2_ref_alarms
dispatcher = v2_ref_alarm_definitions
dispatcher = v2_ref_events
dispatcher = v2_ref_transforms
dispatcher = v2_ref_notifications

One can use the configuration to load any dispatchers which
may be developed by third party. This way, to load new
dispatchers, one only needs to change the configuration file.

This patch set makes the API server a true miscroservice
server.

Change-Id: I87005f8ff4807e4c818057f1e7866001482a50e1
This commit is contained in:
Tong Li 2014-12-04 14:05:23 -05:00
parent 118848af31
commit 7eeacb686a
5 changed files with 256 additions and 161 deletions

8
etc/monasca.conf Normal file → Executable file
View File

@ -8,6 +8,14 @@ log_level = DEBUG
# Identifies the region that the Monasca API is running in.
region = na
# Dispatchers to be loaded to serve restful APIs
dispatcher = v2_ref_metrics
dispatcher = v2_ref_alarms
dispatcher = v2_ref_alarm_definitions
dispatcher = v2_ref_events
dispatcher = v2_ref_transforms
dispatcher = v2_ref_notifications
[security]
# The roles that are allowed full access to the API.
default_authorized_roles = admin

170
monasca/api/server.py Normal file → Executable file
View File

@ -18,169 +18,51 @@ import os
from wsgiref import simple_server
from oslo.config import cfg
from oslo.config import types
import paste.deploy
from stevedore import named
from monasca.common import resource_api
from monasca.openstack.common import log
DISPATCHER_NAMESPACE = 'monasca.dispatcher'
METRICS_DISPATCHER_NAMESPACE = 'monasca.metrics_dispatcher'
ALARM_DEFINITIONS_DISPATCHER_NAMESPACE = 'monasca.alarm_definitions_dispatcher'
ALARMS_DISPATCHER_NAMESPACE = 'monasca.alarms_dispatcher'
EVENTS_DISPATCHER_NAMESPACE = 'monasca.events_dispatcher'
TRANSFORMS_DISPATCHER_NAMESPACE = 'monasca.transforms_dispatcher'
NOTIFICATIONS_DISPATCHER_NAMESPACE = 'monasca.notifications_dispatcher'
OPTS = [
cfg.MultiStrOpt('dispatcher',
default=[],
help='Dispatchers to process data.'),
]
cfg.CONF.register_opts(OPTS)
LOG = log.getLogger(__name__)
global_opts = [cfg.StrOpt('region', help='Region that API is running in')]
cfg.CONF.register_opts(global_opts)
security_opts = [cfg.ListOpt('default_authorized_roles', default=['admin'],
help='Roles that are allowed full access to the '
'API'),
cfg.ListOpt('agent_authorized_roles', default=['agent'],
help='Roles that are only allowed to POST to '
'the API'),
cfg.ListOpt('delegate_authorized_roles', default=['admin'],
help='Roles that are allowed to POST metrics on '
'behalf of another tenant')]
security_group = cfg.OptGroup(name='security', title='security')
cfg.CONF.register_group(security_group)
cfg.CONF.register_opts(security_opts, security_group)
messaging_opts = [cfg.StrOpt('driver', default='kafka',
help='The message queue driver to use'),
cfg.StrOpt('metrics_message_format', default='reference',
help='The type of metrics message format to '
'publish to the message queue'),
cfg.StrOpt('events_message_format', default='reference',
help='The type of events message format to '
'publish to the message queue')]
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
cfg.CONF.register_group(messaging_group)
cfg.CONF.register_opts(messaging_opts, messaging_group)
repositories_opts = [
cfg.StrOpt('metrics_driver', default='influxdb_metrics_repo',
help='The repository driver to use for metrics'),
cfg.StrOpt('alarm_definitions_driver',
default='mysql_alarm_definitions_repo',
help='The repository driver to use for alarm definitions'),
cfg.StrOpt('alarms_driver', default='mysql_alarms_repo',
help='The repository driver to use for alarms'),
cfg.StrOpt('events_driver', default='fake_events_repo',
help='The repository driver to use for events'),
cfg.StrOpt('transforms_driver', default='mysql_transforms_repo',
help='The repository driver to use for transforms'),
cfg.StrOpt('notifications_driver', default='mysql_notifications_repo',
help='The repository driver to use for notifications')]
repositories_group = cfg.OptGroup(name='repositories', title='repositories')
cfg.CONF.register_group(repositories_group)
cfg.CONF.register_opts(repositories_opts, repositories_group)
dispatcher_opts = [
cfg.StrOpt('driver', default='monasca.v2.reference.metrics:Metrics',
help='The name of the dispatcher for the api server')]
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
cfg.CONF.register_group(dispatcher_group)
cfg.CONF.register_opts(dispatcher_opts, dispatcher_group)
kafka_opts = [cfg.StrOpt('uri', help='Address to kafka server. For example: '
'uri=192.168.1.191:9092'),
cfg.StrOpt('metrics_topic', default='metrics',
help='The topic that metrics will be published too.'),
cfg.StrOpt('events_topic', default='raw-events',
help='The topic that events will be published too.'),
cfg.StrOpt('group', default='api',
help='The group name that this service belongs to.'),
cfg.IntOpt('wait_time', default=1,
help='The wait time when no messages on kafka '
'queue.'), cfg.IntOpt('ack_time', default=20,
help='The ack time back '
'to kafka.'),
cfg.IntOpt('max_retry', default=3,
help='The number of retry when there is a '
'connection error.'),
cfg.BoolOpt('auto_commit', default=False,
help='If automatically commmit when consume '
'messages.'),
cfg.BoolOpt('async', default=True, help='The type of posting.'),
cfg.BoolOpt('compact', default=True, help=(
'Specify if the message received should be parsed.'
'If True, message will not be parsed, otherwise '
'messages will be parsed.')),
cfg.MultiOpt('partitions', item_type=types.Integer(),
default=[0],
help='The sleep time when no messages on kafka '
'queue.'),
cfg.BoolOpt('drop_data', default=False, help=(
'Specify if received data should be simply dropped. '
'This parameter is only for testing purposes.')), ]
kafka_group = cfg.OptGroup(name='kafka', title='title')
cfg.CONF.register_group(kafka_group)
cfg.CONF.register_opts(kafka_opts, kafka_group)
influxdb_opts = [cfg.StrOpt('database_name'), cfg.StrOpt('ip_address'),
cfg.StrOpt('port'), cfg.StrOpt('user'),
cfg.StrOpt('password')]
influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
mysql_opts = [cfg.StrOpt('database_name'), cfg.StrOpt('hostname'),
cfg.StrOpt('username'), cfg.StrOpt('password')]
mysql_group = cfg.OptGroup(name='mysql', title='mysql')
cfg.CONF.register_group(mysql_group)
cfg.CONF.register_opts(mysql_opts, mysql_group)
def api_app(conf):
# Setup logs
cfg.CONF(args=[], project='monasca')
log_levels = (cfg.CONF.default_log_levels)
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
cfg.CONF(args=[], project='monasca')
log.setup('monasca')
dispatcher_manager = named.NamedExtensionManager(
namespace=DISPATCHER_NAMESPACE,
names=cfg.CONF.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(dispatcher_manager):
LOG.error('Failed to load any dispatchers for %s' %
DISPATCHER_NAMESPACE)
return None
# Create the application
app = resource_api.ResourceAPI()
# add the metrics resource
app.add_resource('metrics', METRICS_DISPATCHER_NAMESPACE,
cfg.CONF.dispatcher.driver, [conf])
# add the events resource
app.add_resource('events', EVENTS_DISPATCHER_NAMESPACE,
cfg.CONF.dispatcher.driver, [conf])
# add the transforms resource
app.add_resource('transforms', TRANSFORMS_DISPATCHER_NAMESPACE,
cfg.CONF.dispatcher.driver, [conf])
# add the notifications resource
app.add_resource('notifications', NOTIFICATIONS_DISPATCHER_NAMESPACE,
cfg.CONF.dispatcher.driver, [conf])
# load the alarm definitions resource
app.add_resource('alarm-definitions',
ALARM_DEFINITIONS_DISPATCHER_NAMESPACE,
cfg.CONF.dispatcher.driver, [conf])
# load the alarm definitions resource
app.add_resource('alarms',
ALARMS_DISPATCHER_NAMESPACE,
cfg.CONF.dispatcher.driver, [conf])
# add each dispatcher to the application to serve requests offered by
# each dispatcher
for driver in dispatcher_manager:
app.add_route(None, driver.obj)
LOG.debug('Dispatcher drivers have been added to the routes!')
return app

View File

@ -0,0 +1,81 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo.config import cfg
from monasca.api import monasca_api_v2
from monasca.common import resource_api
from monasca.openstack.common import log
OPTS = [
cfg.MultiStrOpt('id',
default=['sample'],
help='Multiple String configuration.'),
cfg.StrOpt('prefix',
default='monasca_',
help='String configuration sample.'),
]
cfg.CONF.register_opts(OPTS, group='sample_dispatcher')
LOG = log.getLogger(__name__)
class SampleDispatcher(monasca_api_v2.V2API):
"""Monasca dispatcher sample class
This class shows how to develop a dispatcher and how the configuration
parameters should be defined and how these configuration parameters
should be set in monasca.conf file.
This class uses configuration parameters appear in sample_dispatcher
section such as the following:
[sample_dispatcher]
id = 101
id = 105
id = 180
prefix = sample__
If the above section appears in file monasca.conf, these values will be
loaded to cfg.CONF after the dispatcher gets loaded. The cfg.CONF should
have the following values:
cfg.CONF.sample_dispatcher.id = [101, 105, 180]
cfg.CONF.sample_dispatcher.prefix = "sample__"
"""
def __init__(self, global_conf):
LOG.debug('initializing SampleDispatcher!')
super(SampleDispatcher, self).__init__(global_conf)
LOG.debug('SampleDispatcher conf entries: prefix')
LOG.debug(global_conf.sample_dispatcher.prefix)
LOG.debug('SampleDispatcher conf entries: id')
LOG.debug(global_conf.sample_dispatcher.id)
@resource_api.Restify('/v2.0/datapoints/', method='post')
def do_post_metrics(self, req, res):
LOG.debug('Getting the call at endpoint datapoints.')
msg = req.stream.read()
LOG.debug('The msg:', msg)
res.status = getattr(falcon, 'HTTP_201')
@resource_api.Restify('/v2.0/demopoints/', method='get')
def do_get_metrics(self, req, res):
LOG.debug('Getting the call at endpoint demopoints.')
res.body = 'demo response'
res.status = getattr(falcon, 'HTTP_200')

131
monasca/v2/reference/__init__.py Normal file → Executable file
View File

@ -0,0 +1,131 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo.config import types
"""Configurations for reference implementation
I think that these configuration parameters should have been split into
small groups and be set into each implementation where they get used.
For example: kafka configuration should have been in the implementation
where kafka get used. It seems to me that the configuration for kafka gets
used in kafka_publisher, but the original settings were at the api/server.py
which I think is at the wrong place. I move these settings here for now, we
need to have a bit more re-engineering to get it right.
"""
global_opts = [cfg.StrOpt('region', help='Region that API is running in')]
cfg.CONF.register_opts(global_opts)
security_opts = [cfg.ListOpt('default_authorized_roles', default=['admin'],
help='Roles that are allowed full access to the '
'API'),
cfg.ListOpt('agent_authorized_roles', default=['agent'],
help='Roles that are only allowed to POST to '
'the API'),
cfg.ListOpt('delegate_authorized_roles', default=['admin'],
help='Roles that are allowed to POST metrics on '
'behalf of another tenant')]
security_group = cfg.OptGroup(name='security', title='security')
cfg.CONF.register_group(security_group)
cfg.CONF.register_opts(security_opts, security_group)
messaging_opts = [cfg.StrOpt('driver', default='kafka',
help='The message queue driver to use'),
cfg.StrOpt('metrics_message_format', default='reference',
help='The type of metrics message format to '
'publish to the message queue'),
cfg.StrOpt('events_message_format', default='reference',
help='The type of events message format to '
'publish to the message queue')]
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
cfg.CONF.register_group(messaging_group)
cfg.CONF.register_opts(messaging_opts, messaging_group)
repositories_opts = [
cfg.StrOpt('metrics_driver', default='influxdb_metrics_repo',
help='The repository driver to use for metrics'),
cfg.StrOpt('alarm_definitions_driver',
default='mysql_alarm_definitions_repo',
help='The repository driver to use for alarm definitions'),
cfg.StrOpt('alarms_driver', default='mysql_alarms_repo',
help='The repository driver to use for alarms'),
cfg.StrOpt('events_driver', default='fake_events_repo',
help='The repository driver to use for events'),
cfg.StrOpt('transforms_driver', default='mysql_transforms_repo',
help='The repository driver to use for transforms'),
cfg.StrOpt('notifications_driver', default='mysql_notifications_repo',
help='The repository driver to use for notifications')]
repositories_group = cfg.OptGroup(name='repositories', title='repositories')
cfg.CONF.register_group(repositories_group)
cfg.CONF.register_opts(repositories_opts, repositories_group)
kafka_opts = [cfg.StrOpt('uri', help='Address to kafka server. For example: '
'uri=192.168.1.191:9092'),
cfg.StrOpt('metrics_topic', default='metrics',
help='The topic that metrics will be published too.'),
cfg.StrOpt('events_topic', default='raw-events',
help='The topic that events will be published too.'),
cfg.StrOpt('group', default='api',
help='The group name that this service belongs to.'),
cfg.IntOpt('wait_time', default=1,
help='The wait time when no messages on kafka '
'queue.'), cfg.IntOpt('ack_time', default=20,
help='The ack time back '
'to kafka.'),
cfg.IntOpt('max_retry', default=3,
help='The number of retry when there is a '
'connection error.'),
cfg.BoolOpt('auto_commit', default=False,
help='If automatically commmit when consume '
'messages.'),
cfg.BoolOpt('async', default=True, help='The type of posting.'),
cfg.BoolOpt('compact', default=True, help=(
'Specify if the message received should be parsed.'
'If True, message will not be parsed, otherwise '
'messages will be parsed.')),
cfg.MultiOpt('partitions', item_type=types.Integer(),
default=[0],
help='The sleep time when no messages on kafka '
'queue.'),
cfg.BoolOpt('drop_data', default=False, help=(
'Specify if received data should be simply dropped. '
'This parameter is only for testing purposes.')), ]
kafka_group = cfg.OptGroup(name='kafka', title='title')
cfg.CONF.register_group(kafka_group)
cfg.CONF.register_opts(kafka_opts, kafka_group)
influxdb_opts = [cfg.StrOpt('database_name'), cfg.StrOpt('ip_address'),
cfg.StrOpt('port'), cfg.StrOpt('user'),
cfg.StrOpt('password')]
influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
mysql_opts = [cfg.StrOpt('database_name'), cfg.StrOpt('hostname'),
cfg.StrOpt('username'), cfg.StrOpt('password')]
mysql_group = cfg.OptGroup(name='mysql', title='mysql')
cfg.CONF.register_group(mysql_group)
cfg.CONF.register_opts(mysql_opts, mysql_group)

27
setup.cfg Normal file → Executable file
View File

@ -33,24 +33,17 @@ data_files =
console_scripts =
monasca-api = monasca.api.server:run
monasca.metrics_dispatcher =
monasca.dispatcher =
sample = monasca.dispatcher.sample_dispatcher:SampleDispatcher
kafka = monasca.dispatcher.kafka_dispatcher:KafkaDispatcher
v2_reference = monasca.v2.reference.metrics:Metrics
monasca.alarm_definitions_dispatcher =
v2_reference = monasca.v2.reference.alarm_definitions:AlarmDefinitions
monasca.alarms_dispatcher =
v2_reference = monasca.v2.reference.alarms:Alarms
monasca.events_dispatcher =
v2_reference = monasca.v2.reference.events:Events
monasca.transforms_dispatcher =
v2_reference = monasca.v2.reference.transforms:Transforms
monasca.notifications_dispatcher =
v2_reference = monasca.v2.reference.notifications:Notifications
v2_ref_metrics = monasca.v2.reference.metrics:Metrics
v2_ref_alarm_definitions = monasca.v2.reference.alarm_definitions:AlarmDefinitions
v2_ref_alarms = monasca.v2.reference.alarms:Alarms
v2_ref_events = monasca.v2.reference.events:Events
v2_ref_transforms = monasca.v2.reference.transforms:Transforms
v2_ref_notifications = monasca.v2.reference.notifications:Notifications
demo = monasca.v2.reference.demo_dispatcher:DemoDispatcher
paste.filter_factory =
login = monasca.middleware.login:filter_factory