Start of initial reference implementation and re-factored
Change-Id: I90d09331a41f15c47870cd7690b22405087ff7a4
This commit is contained in:
parent
82d007f3cf
commit
c5f5fba741
110
etc/monasca.conf
110
etc/monasca.conf
|
@ -1,33 +1,99 @@
|
|||
[DEFAULT]
|
||||
#logging, make sure that the user under whom the server runs has permission
|
||||
#to write to the directory.
|
||||
log_file=monasca.log
|
||||
log_dir=/var/log/monasca/
|
||||
log_level=DEBUG
|
||||
[default]
|
||||
# logging, make sure that the user under whom the server runs has permission
|
||||
# to write to the directory.
|
||||
log_file = monasca.log
|
||||
log_dir = .
|
||||
log_level = DEBUG
|
||||
|
||||
[dispatcher]
|
||||
# Identifies the region that the Monasca API is running in.
|
||||
region = na
|
||||
|
||||
[security]
|
||||
# The roles that are allowed full access to the API.
|
||||
default_authorized_roles = admin
|
||||
|
||||
# The roles that are allowed to only POST metrics to the API. This role would be used by the Monasca Agent.
|
||||
agent_authorized_roles = agent
|
||||
|
||||
# The roles that are allowed to access the API on behalf of another tenant.
|
||||
# For example, a service can POST metrics to another tenant if they are a member of the "delegate" role.
|
||||
delegate_authorized_roles = admin
|
||||
|
||||
[messaging]
|
||||
# The message queue driver to use
|
||||
driver = kafka
|
||||
|
||||
# The type of metrics message format to publish to the message queue.
|
||||
metrics_message_format = reference
|
||||
|
||||
# The type of events message format to publish to the message queue.
|
||||
events_message_format = reference
|
||||
|
||||
[repositories]
|
||||
# The driver to use for the metrics repository
|
||||
metrics_driver = influxdb_metrics_repo
|
||||
|
||||
# The driver to use for the events repository
|
||||
events_driver = none
|
||||
|
||||
# The driver to use for the transforms repository
|
||||
transforms_driver = mysql_transforms_repo
|
||||
|
||||
[dispatcher]
|
||||
driver = v2_reference
|
||||
|
||||
[kafka]
|
||||
#The endpoint to the kafka server
|
||||
uri = 192.168.1.191:9092
|
||||
#The topic on the kafka server
|
||||
topic = metrics
|
||||
#consumer group name
|
||||
group = metric_group
|
||||
#how many times to try when error occurs
|
||||
# The endpoint to the kafka server
|
||||
uri = 192.168.10.4:9092
|
||||
|
||||
# The topic that metrics will be published too
|
||||
metrics_topic = metrics
|
||||
|
||||
# The topic that events will be published too
|
||||
events_topic = raw-events
|
||||
|
||||
# consumer group name
|
||||
group = api
|
||||
|
||||
# how many times to try when error occurs
|
||||
max_retry = 1
|
||||
#wait time between tries when kafka goes down
|
||||
|
||||
# wait time between tries when kafka goes down
|
||||
wait_time = 1
|
||||
#use synchronized or asynchronized connection to kafka
|
||||
|
||||
# use synchronous or asynchronous connection to kafka
|
||||
async = False
|
||||
#send messages in bulk or send messages one by one.
|
||||
|
||||
# send messages in bulk or send messages one by one.
|
||||
compact = False
|
||||
#How many partitions this connection should listen messages on, this
|
||||
#parameter is for reading from kafka. If listens on multiple partitions,
|
||||
#For example, if the client should listen on partitions 1 and 3, then the
|
||||
#configuration should look like the following:
|
||||
|
||||
# How many partitions this connection should listen messages on, this
|
||||
# parameter is for reading from kafka. If listens on multiple partitions,
|
||||
# For example, if the client should listen on partitions 1 and 3, then the
|
||||
# configuration should look like the following:
|
||||
# partitions = 1
|
||||
# partitions = 3
|
||||
#default to listen on partition 0.
|
||||
# default to listen on partition 0.
|
||||
partitions = 0
|
||||
|
||||
[influxdb]
|
||||
# The IP address of the InfluxDB service.
|
||||
ip_address = 192.168.10.4
|
||||
|
||||
# The port number that the InfluxDB service is listening on.
|
||||
port = 8086
|
||||
|
||||
# The username to authenticate with.
|
||||
user = mon_api
|
||||
|
||||
# The password to authenticate with.
|
||||
password = password
|
||||
|
||||
# The name of the InfluxDB database to use.
|
||||
database_name = mon
|
||||
|
||||
[mysql]
|
||||
database_name = mon
|
||||
hostname = 192.168.10.4
|
||||
username = monapi
|
||||
password = password
|
|
@ -3,7 +3,7 @@ name = monasca
|
|||
|
||||
[pipeline:main]
|
||||
# Add validator in the pipeline so the metrics messages can be validated.
|
||||
pipeline = validator api
|
||||
pipeline = auth api
|
||||
|
||||
[app:api]
|
||||
paste.app_factory = monasca.api.server:api_app
|
||||
|
@ -17,9 +17,12 @@ use = egg: monasca_api#inspector
|
|||
[filter:validator]
|
||||
use = egg: monasca_api#metric_validator
|
||||
|
||||
[filter:auth]
|
||||
use = egg: monasca_api#mock_auth_filter
|
||||
|
||||
[server:main]
|
||||
use = egg:gunicorn#main
|
||||
host = 0.0.0.0
|
||||
port = 9090
|
||||
port = 9000
|
||||
workers = 1
|
||||
proc_name = monasca
|
||||
proc_name = monasca
|
|
@ -26,8 +26,8 @@ class V2API(object):
|
|||
LOG.debug('initializing V2API!')
|
||||
self.global_conf = global_conf
|
||||
|
||||
@resource_api.Restify('/v2.0/metrics/{id}')
|
||||
def do_get_metrics(self, req, res, id):
|
||||
@resource_api.Restify('/v2.0/metrics')
|
||||
def do_get_metrics(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/metrics/', method='post')
|
||||
|
@ -92,4 +92,4 @@ class V2API(object):
|
|||
|
||||
@resource_api.Restify('/v2.0/alarms/{id}/state-history')
|
||||
def do_get_alarm_state_history(self, req, res, id):
|
||||
res.status = '501 Not Implemented'
|
||||
res.status = '501 Not Implemented'
|
|
@ -0,0 +1,29 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common import resource_api
|
||||
from monasca.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsV2API(object):
|
||||
def __init__(self, global_conf):
|
||||
LOG.debug('initializing V2API!')
|
||||
self.global_conf = global_conf
|
||||
|
||||
@resource_api.Restify('/v2.0/events/', method='post')
|
||||
def do_post_events(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
|
@ -0,0 +1,37 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common import resource_api
|
||||
from monasca.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class TransformsV2API(object):
|
||||
def __init__(self, global_conf):
|
||||
LOG.debug('initializing V2API!')
|
||||
self.global_conf = global_conf
|
||||
|
||||
@resource_api.Restify('/v2.0/events/transforms', method='post')
|
||||
def do_post_transforms(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/events/transforms', method='get')
|
||||
def do_get_transforms(self, req, res):
|
||||
res.status = '501 Not Implemented'
|
||||
|
||||
@resource_api.Restify('/v2.0/events/transforms/{transform_id}', method='delete')
|
||||
def do_delete_transforms(self, req, res, transform_id):
|
||||
res.status = '501 Not Implemented'
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright 2013 IBM Corp
|
||||
# Copyright 2014 IBM Corp
|
||||
#
|
||||
# Author: Tong Li <litong01@us.ibm.com>
|
||||
#
|
||||
|
@ -14,25 +14,143 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
import os
|
||||
from monasca.common import resource_api
|
||||
from monasca.openstack.common import log
|
||||
from oslo.config import cfg
|
||||
from oslo.config import types
|
||||
from paste.deploy import loadapp
|
||||
from wsgiref import simple_server
|
||||
|
||||
DISPATCHER_NAMESPACE = 'monasca.dispatcher'
|
||||
|
||||
|
||||
cfg.CONF.register_opts(
|
||||
[cfg.StrOpt('driver', default='kafka',
|
||||
help='The name of the dispatcher for the api server'),
|
||||
], group="dispatcher")
|
||||
METRICS_DISPATCHER_NAMESPACE = 'monasca.metrics_dispatcher'
|
||||
EVENTS_DISPATCHER_NAMESPACE = 'monasca.events_dispatcher'
|
||||
TRANSFORMS_DISPATCHER_NAMESPACE = 'monasca.transforms_dispatcher'
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
global_opts = [
|
||||
cfg.StrOpt('region', help='Region that API is running in')
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(global_opts)
|
||||
|
||||
security_opts = [
|
||||
cfg.ListOpt('default_authorized_roles', default=['admin'],
|
||||
help='Roles that are allowed full access to the API'),
|
||||
cfg.ListOpt('agent_authorized_roles', default=['agent'],
|
||||
help='Roles that are only allowed to POST to the API'),
|
||||
cfg.ListOpt('delegate_authorized_roles', default=['admin'],
|
||||
help='Roles that are allowed to POST metrics on behalf of another tenant')
|
||||
]
|
||||
|
||||
security_group = cfg.OptGroup(name='security', title='security')
|
||||
cfg.CONF.register_group(security_group)
|
||||
cfg.CONF.register_opts(security_opts, security_group)
|
||||
|
||||
messaging_opts = [
|
||||
cfg.StrOpt('driver', default='kafka', help='The message queue driver to use'),
|
||||
cfg.StrOpt('metrics_message_format', default='reference',
|
||||
help='The type of metrics message format to publish to the message queue'),
|
||||
cfg.StrOpt('events_message_format', default='reference',
|
||||
help='The type of events message format to publish to the message queue')
|
||||
]
|
||||
|
||||
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
|
||||
cfg.CONF.register_group(messaging_group)
|
||||
cfg.CONF.register_opts(messaging_opts, messaging_group)
|
||||
|
||||
repositories_opts = [
|
||||
cfg.StrOpt('metrics_driver', default='influxdb_metrics_repo', help='The repository driver to use for metrics'),
|
||||
cfg.StrOpt('events_driver', default='fake_events_repo', help='The repository driver to use for events'),
|
||||
cfg.StrOpt('transforms_driver', default='mysql_transforms_repo', help='The repository driver to use for transforms')
|
||||
]
|
||||
|
||||
repositories_group = cfg.OptGroup(name='repositories', title='repositories')
|
||||
cfg.CONF.register_group(repositories_group)
|
||||
cfg.CONF.register_opts(repositories_opts, repositories_group)
|
||||
|
||||
dispatcher_opts = [
|
||||
cfg.StrOpt('driver', default='monasca.v2.reference.metrics:Metrics',
|
||||
help='The name of the dispatcher for the api server')
|
||||
]
|
||||
|
||||
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
|
||||
cfg.CONF.register_group(dispatcher_group)
|
||||
cfg.CONF.register_opts(dispatcher_opts, dispatcher_group)
|
||||
|
||||
kafka_opts = [
|
||||
cfg.StrOpt('uri',
|
||||
help='Address to kafka server. For example: '
|
||||
'uri=192.168.1.191:9092'),
|
||||
cfg.StrOpt('metrics_topic',
|
||||
default='metrics',
|
||||
help='The topic that metrics will be published too.'),
|
||||
cfg.StrOpt('events_topic',
|
||||
default='raw-events',
|
||||
help='The topic that events will be published too.'),
|
||||
cfg.StrOpt('group',
|
||||
default='api',
|
||||
help='The group name that this service belongs to.'),
|
||||
cfg.IntOpt('wait_time',
|
||||
default=1,
|
||||
help='The wait time when no messages on kafka queue.'),
|
||||
cfg.IntOpt('ack_time',
|
||||
default=20,
|
||||
help='The ack time back to kafka.'),
|
||||
cfg.IntOpt('max_retry',
|
||||
default=3,
|
||||
help='The number of retry when there is a connection error.'),
|
||||
cfg.BoolOpt('auto_commit',
|
||||
default=False,
|
||||
help='If automatically commmit when consume messages.'),
|
||||
cfg.BoolOpt('async',
|
||||
default=True,
|
||||
help='The type of posting.'),
|
||||
cfg.BoolOpt('compact',
|
||||
default=True,
|
||||
help=('Specify if the message received should be parsed.'
|
||||
'If True, message will not be parsed, otherwise '
|
||||
'messages will be parsed.')),
|
||||
cfg.MultiOpt('partitions',
|
||||
item_type=types.Integer(),
|
||||
default=[0],
|
||||
help='The sleep time when no messages on kafka queue.'),
|
||||
cfg.BoolOpt('drop_data',
|
||||
default=False,
|
||||
help=('Specify if received data should be simply dropped. '
|
||||
'This parameter is only for testing purposes.')),
|
||||
]
|
||||
|
||||
kafka_group = cfg.OptGroup(name='kafka', title='title')
|
||||
cfg.CONF.register_group(kafka_group)
|
||||
cfg.CONF.register_opts(kafka_opts, kafka_group)
|
||||
|
||||
influxdb_opts = [
|
||||
cfg.StrOpt('database_name'),
|
||||
cfg.StrOpt('ip_address'),
|
||||
cfg.StrOpt('port'),
|
||||
cfg.StrOpt('user'),
|
||||
cfg.StrOpt('password')
|
||||
]
|
||||
|
||||
influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
|
||||
cfg.CONF.register_group(influxdb_group)
|
||||
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
|
||||
|
||||
mysql_opts = [
|
||||
cfg.StrOpt('database_name'),
|
||||
cfg.StrOpt('hostname'),
|
||||
cfg.StrOpt('username'),
|
||||
cfg.StrOpt('password')
|
||||
]
|
||||
|
||||
mysql_group = cfg.OptGroup(name='mysql', title='mysql')
|
||||
cfg.CONF.register_group(mysql_group)
|
||||
cfg.CONF.register_opts(mysql_opts, mysql_group)
|
||||
|
||||
|
||||
def api_app(conf):
|
||||
|
||||
# Setup logs
|
||||
log_levels = (cfg.CONF.default_log_levels)
|
||||
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
|
||||
|
@ -43,16 +161,50 @@ def api_app(conf):
|
|||
# Create the application
|
||||
app = resource_api.ResourceAPI()
|
||||
|
||||
# load the driver specified by dispatcher in the monasca.ini file
|
||||
manager = driver.DriverManager(namespace=DISPATCHER_NAMESPACE,
|
||||
name=cfg.CONF.dispatcher.driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[conf])
|
||||
# load the metrics driver specified by dispatcher in the monasca.ini file
|
||||
metrics_manager = driver.DriverManager(namespace=METRICS_DISPATCHER_NAMESPACE,
|
||||
name=cfg.CONF.dispatcher.driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[conf])
|
||||
|
||||
LOG.debug('Dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
|
||||
LOG.debug('Metrics dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
|
||||
|
||||
# add the driver to the application
|
||||
app.add_route(None, manager.driver)
|
||||
app.add_route(None, metrics_manager.driver)
|
||||
|
||||
LOG.debug('Metrics dispatcher driver has been added to the routes!')
|
||||
|
||||
|
||||
# load the events driver specified by dispatcher in the monasca.ini file
|
||||
events_manager = driver.DriverManager(namespace=EVENTS_DISPATCHER_NAMESPACE,
|
||||
name=cfg.CONF.dispatcher.driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[conf])
|
||||
|
||||
LOG.debug('Events dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
|
||||
|
||||
# add the driver to the application
|
||||
app.add_route(None, events_manager.driver)
|
||||
|
||||
LOG.debug('Events dispatcher driver has been added to the routes!')
|
||||
|
||||
# load the events driver specified by dispatcher in the monasca.ini file
|
||||
transforms_manager = driver.DriverManager(namespace=TRANSFORMS_DISPATCHER_NAMESPACE,
|
||||
name=cfg.CONF.dispatcher.driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=[conf])
|
||||
|
||||
LOG.debug('Transforms dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
|
||||
|
||||
# add the driver to the application
|
||||
app.add_route(None, transforms_manager.driver)
|
||||
|
||||
LOG.debug('Transforms dispatcher driver has been added to the routes!')
|
||||
|
||||
LOG.debug('Dispatcher driver has been added to the routes!')
|
||||
return app
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
wsgi_app = loadapp('config:etc/monasca.ini', relative_to=os.getcwd())
|
||||
httpd = simple_server.make_server('127.0.0.1', 9000, wsgi_app)
|
||||
httpd.serve_forever()
|
|
@ -1,7 +1,4 @@
|
|||
#
|
||||
# Copyright 2012-2013 eNovance <licensing@enovance.com>
|
||||
#
|
||||
# Author: Julien Danjou <julien@danjou.info>
|
||||
# Copyright 2013 IBM Corp
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -20,8 +17,8 @@ from kafka import common
|
|||
from kafka import consumer
|
||||
from kafka import producer
|
||||
from oslo.config import cfg
|
||||
from oslo.config import types
|
||||
import time
|
||||
|
||||
try:
|
||||
import ujson as json
|
||||
except ImportError:
|
||||
|
@ -29,49 +26,6 @@ except ImportError:
|
|||
|
||||
from monasca.openstack.common import log
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('uri',
|
||||
help='Address to kafka server. For example: '
|
||||
'uri=192.168.1.191:9092'),
|
||||
cfg.StrOpt('topic',
|
||||
default='event',
|
||||
help='The topic that this collector will listen on.'),
|
||||
cfg.StrOpt('group',
|
||||
default='event_consumer',
|
||||
help='The group name that this service belongs to.'),
|
||||
cfg.IntOpt('wait_time',
|
||||
default=1,
|
||||
help='The wait time when no messages on kafka queue.'),
|
||||
cfg.IntOpt('ack_time',
|
||||
default=20,
|
||||
help='The ack time back to kafka.'),
|
||||
cfg.IntOpt('max_retry',
|
||||
default=3,
|
||||
help='The number of retry when there is a connection error.'),
|
||||
cfg.BoolOpt('auto_commit',
|
||||
default=False,
|
||||
help='If automatically commmit when consume messages.'),
|
||||
cfg.BoolOpt('async',
|
||||
default=True,
|
||||
help='The type of posting.'),
|
||||
cfg.BoolOpt('compact',
|
||||
default=True,
|
||||
help=('Specify if the message received should be parsed.'
|
||||
'If True, message will not be parsed, otherwise '
|
||||
'messages will be parsed.')),
|
||||
cfg.MultiOpt('partitions',
|
||||
item_type=types.Integer(),
|
||||
default=[0],
|
||||
help='The sleep time when no messages on kafka queue.'),
|
||||
cfg.BoolOpt('drop_data',
|
||||
default=False,
|
||||
help=('Specify if received data should be simply dropped. '
|
||||
'This parameter is only for testing purposes.')),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group="kafka")
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
class MessageQueueException(Exception):
|
||||
pass
|
|
@ -0,0 +1,24 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common.messaging import publisher
|
||||
|
||||
|
||||
class FakePublisher(publisher.Publisher):
|
||||
|
||||
def __init__(self, topic):
|
||||
pass
|
||||
|
||||
def send_message(self, message):
|
||||
pass
|
|
@ -0,0 +1,110 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kafka import client
|
||||
from kafka import common
|
||||
from kafka import producer
|
||||
from oslo.config import cfg
|
||||
import time
|
||||
|
||||
from monasca.openstack.common import log
|
||||
from monasca.common.messaging import publisher
|
||||
from monasca.common.messaging import exceptions
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class KafkaPublisher(publisher.Publisher):
|
||||
def __init__(self, topic):
|
||||
if not cfg.CONF.kafka.uri:
|
||||
raise Exception('Kafka is not configured correctly! '
|
||||
'Use configuration file to specify Kafka '
|
||||
'uri, for example: '
|
||||
'uri=192.168.1.191:9092')
|
||||
|
||||
self.uri = cfg.CONF.kafka.uri
|
||||
self.topic = topic
|
||||
self.group = cfg.CONF.kafka.group
|
||||
self.wait_time = cfg.CONF.kafka.wait_time
|
||||
self.async = cfg.CONF.kafka.async
|
||||
self.ack_time = cfg.CONF.kafka.ack_time
|
||||
self.max_retry = cfg.CONF.kafka.max_retry
|
||||
self.auto_commit = cfg.CONF.kafka.auto_commit
|
||||
self.compact = cfg.CONF.kafka.compact
|
||||
self.partitions = cfg.CONF.kafka.partitions
|
||||
self.drop_data = cfg.CONF.kafka.drop_data
|
||||
|
||||
self._client = None
|
||||
self._producer = None
|
||||
|
||||
def _init_client(self, wait_time=None):
|
||||
for i in range(self.max_retry):
|
||||
try:
|
||||
# if there is a client instance, but _init_client is called
|
||||
# again, most likely the connection has gone stale, close that
|
||||
# connection and reconnect.
|
||||
if self._client:
|
||||
self._client.close()
|
||||
|
||||
if not wait_time:
|
||||
wait_time = self.wait_time
|
||||
time.sleep(wait_time)
|
||||
|
||||
self._client = client.KafkaClient(self.uri)
|
||||
|
||||
# when a client is re-initialized, existing consumer should be
|
||||
# reset as well.
|
||||
self._producer = None
|
||||
break
|
||||
except common.KafkaUnavailableError:
|
||||
LOG.error('Kafka server at %s is down.' % self.uri)
|
||||
except common.LeaderNotAvailableError:
|
||||
LOG.error('Kafka at %s has no leader available.' % self.uri)
|
||||
except Exception:
|
||||
LOG.error('Kafka at %s initialization failed.' % self.uri)
|
||||
|
||||
# Wait a bit and try again to get a client
|
||||
time.sleep(self.wait_time)
|
||||
|
||||
def _init_producer(self):
|
||||
try:
|
||||
if not self._client:
|
||||
self._init_client()
|
||||
self._producer = producer.SimpleProducer(
|
||||
self._client, async=self.async, ack_timeout=self.ack_time)
|
||||
LOG.debug('Kafka SimpleProducer was created successfully.')
|
||||
except Exception:
|
||||
self._producer = None
|
||||
LOG.exception('Kafka (%s) producer can not be created.' % self.uri)
|
||||
|
||||
def close(self):
|
||||
if self._client:
|
||||
self._producer = None
|
||||
self._client.close()
|
||||
|
||||
def send_message(self, message):
|
||||
try:
|
||||
if not self._producer:
|
||||
self._init_producer()
|
||||
self._producer.send_messages(self.topic, message)
|
||||
|
||||
except (common.KafkaUnavailableError,
|
||||
common.LeaderNotAvailableError):
|
||||
self._client = None
|
||||
LOG.exception('Error occurred while posting data to Kafka.')
|
||||
raise exceptions.MessageQueueException()
|
||||
except Exception:
|
||||
LOG.exception('Unknown error.')
|
||||
raise exceptions.MessageQueueException()
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
def transform(events, tenant_id, region):
|
||||
raise NotImplemented()
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
def transform(metrics, tenant_id, region):
|
||||
raise NotImplemented()
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
import monasca.common.messaging.message_formats.reference.events as reference_events
|
||||
import monasca.common.messaging.message_formats.cadf.events as cadf_events
|
||||
import monasca.common.messaging.message_formats.identity.events as identity_events
|
||||
|
||||
def create_events_transform():
|
||||
message_format = cfg.CONF.messaging.events_message_format
|
||||
if message_format == 'reference':
|
||||
return reference_events.transform
|
||||
elif message_format == 'cadf':
|
||||
return cadf_events.transform
|
||||
else:
|
||||
return identity_events.transform
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
class TransformationException(Exception):
|
||||
pass
|
|
@ -0,0 +1,18 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
def transform(events, tenant_id, region):
|
||||
return events
|
|
@ -0,0 +1,18 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
def transform(metrics, tenant_id, region):
|
||||
return metrics
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
import monasca.common.messaging.message_formats.reference.metrics as reference_metrics
|
||||
import monasca.common.messaging.message_formats.cadf.metrics as cadf_metrics
|
||||
import monasca.common.messaging.message_formats.identity.metrics as identity_metrics
|
||||
|
||||
def create_metrics_transform():
|
||||
metrics_message_format = cfg.CONF.messaging.metrics_message_format
|
||||
if metrics_message_format == 'reference':
|
||||
return reference_metrics.transform
|
||||
elif metrics_message_format == 'cadf':
|
||||
return cadf_metrics.transform
|
||||
else:
|
||||
return identity_metrics.transform
|
|
@ -0,0 +1,28 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def transform(event, tenant_id, region):
|
||||
transformed_event = dict(
|
||||
event=event,
|
||||
meta=dict(
|
||||
tenantId=tenant_id,
|
||||
region=region
|
||||
),
|
||||
creation_time=datetime.now()
|
||||
)
|
||||
|
||||
return transformed_event
|
|
@ -0,0 +1,35 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
def transform(metrics, tenant_id, region):
|
||||
transformed_metric = {
|
||||
'metric': {},
|
||||
'meta': {
|
||||
'tenantId': tenant_id,
|
||||
'region': region
|
||||
},
|
||||
'creation_time': datetime.now()
|
||||
}
|
||||
|
||||
if isinstance(metrics, list):
|
||||
transformed_metrics = []
|
||||
for metric in metrics:
|
||||
transformed_metric['metric'] = metric
|
||||
transformed_metrics.append(transformed_metric)
|
||||
return transformed_metrics
|
||||
else:
|
||||
transformed_metric['metric'] = metrics
|
||||
return transformed_metric
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class Publisher(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_message(self, message):
|
||||
'''
|
||||
Sends the message using the message queue.
|
||||
:param message: Message to send.
|
||||
'''
|
||||
return
|
|
@ -0,0 +1,24 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common.messaging import publisher
|
||||
|
||||
|
||||
class RabbitmqPublisher(publisher.Publisher):
|
||||
|
||||
def __init__(self, topic):
|
||||
pass
|
||||
|
||||
def send_message(self, message):
|
||||
raise NotImplemented()
|
|
@ -0,0 +1,23 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class EventsRepository(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def list_events(self, tenant_id, name, dimensions):
|
||||
return
|
|
@ -0,0 +1,19 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
class RepositoryException(Exception):
|
||||
pass
|
||||
|
||||
class DoesNotExistException(RepositoryException):
|
||||
pass
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common.repositories import events_repository
|
||||
from monasca.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsRepository(events_repository.EventsRepository):
|
||||
|
||||
def __init__(self):
|
||||
return
|
||||
|
||||
def list_events(self, tenant_id, name, dimensions):
|
||||
return {}
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common.repositories import metrics_repository
|
||||
from monasca.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class MetricsRepository(metrics_repository.MetricsRepository):
|
||||
|
||||
def __init__(self):
|
||||
return
|
||||
|
||||
def list_metrics(self, tenant_id, name, dimensions):
|
||||
return {}
|
|
@ -0,0 +1,54 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
from monasca.common.repositories import metrics_repository
|
||||
from monasca.common.repositories import exceptions
|
||||
from monasca.openstack.common import log
|
||||
from influxdb import InfluxDBClient
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
class MetricsRepository(metrics_repository.MetricsRepository):
|
||||
|
||||
def __init__(self):
|
||||
try:
|
||||
self.conf = cfg.CONF
|
||||
self.influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address,
|
||||
self.conf.influxdb.port,
|
||||
self.conf.influxdb.user,
|
||||
self.conf.influxdb.password,
|
||||
self.conf.influxdb.database_name)
|
||||
except Exception as ex:
|
||||
LOG.exception()
|
||||
raise exceptions.RepositoryException(ex)
|
||||
|
||||
def list_metrics(self, tenant_id, name, dimensions):
|
||||
try:
|
||||
# TODO: This code is just a placeholder right now. It actually returns metrics,
|
||||
# TODO: but the implementation is not complete
|
||||
#String serieNameRegex = buildSerieNameRegex(tenantId, name, dimensions);
|
||||
|
||||
#String query = String.format("list series /%1$s/", serieNameRegex);
|
||||
#logger.debug("Query string: {}", query);
|
||||
query = 'list series'
|
||||
|
||||
#List<Serie> result = this.influxDB.Query(this.config.influxDB.getName(), query,
|
||||
# TimeUnit.SECONDS);
|
||||
result = self.influxdb_client.query(query, 's')
|
||||
#return buildMetricDefList(result);
|
||||
return result
|
||||
except Exception as ex:
|
||||
LOG.exception()
|
||||
raise exceptions.RepositoryException(ex)
|
|
@ -0,0 +1,23 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class MetricsRepository(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def list_metrics(self, tenant_id, name, dimensions):
|
||||
return
|
|
@ -0,0 +1,29 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.openstack.common import log
|
||||
from oslo.config import cfg
|
||||
import peewee
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
db = peewee.MySQLDatabase(cfg.CONF.mysql.database_name,
|
||||
host=cfg.CONF.mysql.hostname,
|
||||
user=cfg.CONF.mysql.username,
|
||||
passwd=cfg.CONF.mysql.password)
|
||||
|
||||
|
||||
class Model(peewee.Model):
|
||||
class Meta:
|
||||
database = db
|
|
@ -0,0 +1,78 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca.common.repositories import transforms_repository
|
||||
from monasca.common.repositories import exceptions
|
||||
from monasca.openstack.common import log
|
||||
import peewee
|
||||
import model
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Transform(model.Model):
|
||||
id = peewee.TextField(36)
|
||||
tenant_id = peewee.TextField(36)
|
||||
name = peewee.TextField()
|
||||
description = peewee.TextField()
|
||||
specification = peewee.TextField()
|
||||
enabled = peewee.BooleanField()
|
||||
created_at = peewee.DateTimeField()
|
||||
updated_at = peewee.DateTimeField()
|
||||
deleted_at = peewee.DateTimeField()
|
||||
|
||||
class TransformsRepository(transforms_repository.TransformsRepository):
|
||||
|
||||
def create_transforms(self, id, tenant_id, name, description, specification, enabled):
|
||||
try:
|
||||
q = Transform.create(id=id, tenant_id=tenant_id, name=name, description=description, specification=specification, enabled=enabled)
|
||||
q.save()
|
||||
except Exception as ex:
|
||||
LOG.exception(str(ex))
|
||||
raise exceptions.RepositoryException(str(ex))
|
||||
|
||||
def list_transforms(self, tenant_id):
|
||||
try:
|
||||
q = Transform.select().where(Transform.tenant_id == tenant_id)
|
||||
results = q.execute()
|
||||
|
||||
transforms = []
|
||||
for result in results:
|
||||
transform = {
|
||||
'id': result.id,
|
||||
'name': result.name,
|
||||
'description': result.description,
|
||||
'specification': result.specification,
|
||||
'enabled': result.enabled
|
||||
}
|
||||
transforms.append(transform)
|
||||
return transforms
|
||||
except Exception as ex:
|
||||
LOG.exception(str(ex))
|
||||
raise exceptions.RepositoryException(str(ex))
|
||||
|
||||
def delete_transform(self, tenant_id, transform_id):
|
||||
num_rows_deleted = 0
|
||||
|
||||
try:
|
||||
q = Transform.delete().where((Transform.tenant_id == tenant_id) & (Transform.id == transform_id))
|
||||
num_rows_deleted = q.execute()
|
||||
except Exception as ex:
|
||||
LOG.exception(str(ex))
|
||||
raise exceptions.RepositoryException(str(ex))
|
||||
|
||||
if num_rows_deleted < 1:
|
||||
raise exceptions.DoesNotExistException()
|
||||
|
||||
return
|
|
@ -0,0 +1,31 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
|
||||
class TransformsRepository(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_transforms(self, id, tenant_id, name, description, specification, enabled):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def list_transforms(self, tenant_id):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_transform(self, tenant_id, transform_id):
|
||||
return
|
|
@ -0,0 +1,33 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
class MockAuthFilter(object):
|
||||
'''
|
||||
This authorization filter doesn't do any authentication, it just copies the
|
||||
auth token to the tenant ID and supplies the 'admin' role and is meant for
|
||||
testing purposes only.
|
||||
'''
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.conf = conf
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
env['HTTP_X_TENANT_ID'] = env['HTTP_X_AUTH_TOKEN']
|
||||
env['HTTP_X_ROLES'] = 'admin'
|
||||
return self.app(env, start_response)
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
def validator_filter(app):
|
||||
return MockAuthFilter(app, local_conf)
|
||||
return validator_filter
|
|
@ -0,0 +1,37 @@
|
|||
# Copyright (c) 2012 Intel Corporation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
UUID related utilities and helper functions.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
|
||||
|
||||
def generate_uuid():
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def is_uuid_like(val):
|
||||
"""Returns validation of a value as a UUID.
|
||||
|
||||
For our purposes, a UUID is a canonical form string:
|
||||
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
|
||||
|
||||
"""
|
||||
try:
|
||||
return str(uuid.UUID(val)) == val
|
||||
except (TypeError, ValueError, AttributeError):
|
||||
return False
|
|
@ -0,0 +1,31 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from voluptuous import Schema
|
||||
from voluptuous import Any, All, Length
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
# TODO: Add regex to validate dimension names don't use any excluded characters.
|
||||
dimensions_schema = Schema({All(Any(str, unicode), Length(max=255)): All(Any(str, unicode), Length(max=255))})
|
||||
|
||||
|
||||
def validate(dimensions):
|
||||
try:
|
||||
dimensions_schema(dimensions)
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise exceptions.ValidationException(str(ex))
|
|
@ -0,0 +1,31 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from voluptuous import Schema
|
||||
from voluptuous import Any, All, Length
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
# TODO: Add regex to validate key/values don't use any excluded characters.
|
||||
event_schema_request_body = Schema({All(Any(str, unicode), Length(max=255)): All(Any(None, str, unicode, bool, int, float, dict, []))})
|
||||
|
||||
|
||||
def validate(body):
|
||||
try:
|
||||
event_schema_request_body(body)
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise exceptions.ValidationException(str(ex))
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
class ValidationException(Exception):
|
||||
pass
|
|
@ -0,0 +1,30 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from voluptuous import Schema
|
||||
from voluptuous import Any, All, Length
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
metric_name_schema = Schema(All(Any(str, unicode), Length(max=64)))
|
||||
|
||||
|
||||
def validate(name):
|
||||
try:
|
||||
metric_name_schema(name)
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise exceptions.ValidationException(str(ex))
|
|
@ -0,0 +1,39 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from voluptuous import Schema
|
||||
from voluptuous import Required, Any, All, Range
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import metric_name_schema
|
||||
from monasca.v2.common.schemas import dimensions_schema
|
||||
from monasca.v2.common.schemas import exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
metric_schema = {
|
||||
Required('name'): metric_name_schema.metric_name_schema,
|
||||
Required('dimensions'): dimensions_schema.dimensions_schema,
|
||||
Required('timestamp'): All(int, Range(min=0)),
|
||||
Required('value'): Any(int, float)
|
||||
}
|
||||
|
||||
request_body_schema = Schema(Any(metric_schema, [metric_schema]))
|
||||
|
||||
|
||||
def validate(msg):
|
||||
try:
|
||||
request_body_schema(msg)
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise exceptions.ValidationException(str(ex))
|
|
@ -0,0 +1,37 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from voluptuous import Schema
|
||||
from voluptuous import Optional, Required, Any, All, Length
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import exceptions
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
transform_schema = {
|
||||
Required('name'): Schema(All(Any(str, unicode), Length(max=64))),
|
||||
Required('description'): Schema(All(Any(str, unicode), Length(max=250))),
|
||||
Required('specification'): Schema(All(Any(str, unicode), Length(max=64536))),
|
||||
Optional('enabled'): bool
|
||||
}
|
||||
|
||||
request_body_schema = Schema(Any(transform_schema))
|
||||
|
||||
|
||||
def validate(msg):
|
||||
try:
|
||||
request_body_schema(msg)
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise exceptions.ValidationException(str(ex))
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
def date_handler(obj):
|
||||
return obj.isoformat() if hasattr(obj, 'isoformat') else obj
|
|
@ -0,0 +1,104 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
import falcon
|
||||
from oslo.config import cfg
|
||||
|
||||
from monasca.openstack.common import log
|
||||
from monasca.api import monasca_events_api_v2
|
||||
from monasca.common import resource_api
|
||||
from monasca.common.messaging import exceptions as message_queue_exceptions
|
||||
from monasca.common.messaging.message_formats import events_transform_factory
|
||||
from monasca.v2.common import utils
|
||||
from monasca.v2.common.schemas import exceptions as schemas_exceptions
|
||||
from monasca.v2.common.schemas import events_request_body_schema as schemas_event
|
||||
from monasca.v2.reference import helpers
|
||||
|
||||
from stevedore import driver
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Events(monasca_events_api_v2.EventsV2API):
|
||||
def __init__(self, global_conf):
|
||||
super(Events, self).__init__(global_conf)
|
||||
self._region = cfg.CONF.region
|
||||
self._default_authorized_roles = cfg.CONF.security.default_authorized_roles
|
||||
self._delegate_authorized_roles = cfg.CONF.security.delegate_authorized_roles
|
||||
self._post_events_authorized_roles = cfg.CONF.security.default_authorized_roles + \
|
||||
cfg.CONF.security.agent_authorized_roles
|
||||
self._event_transform = events_transform_factory.create_events_transform()
|
||||
self._init_message_queue()
|
||||
|
||||
def _init_message_queue(self):
|
||||
mgr = driver.DriverManager(
|
||||
namespace = 'monasca.messaging',
|
||||
name = cfg.CONF.messaging.driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(['raw-events'])
|
||||
)
|
||||
self._message_queue = mgr.driver
|
||||
|
||||
def _read_event(self, req):
|
||||
'''
|
||||
Read the event from the http request and return as JSON.
|
||||
:param req: HTTP request object.
|
||||
:return: Returns the event as a JSON object.
|
||||
:raises falcon.HTTPBadRequest:
|
||||
'''
|
||||
try:
|
||||
msg = req.stream.read()
|
||||
json_msg = json.loads(msg)
|
||||
return json_msg
|
||||
except ValueError as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON')
|
||||
|
||||
def _validate_event(self, event):
|
||||
'''
|
||||
Validates the event
|
||||
:param event: An event object.
|
||||
:raises falcon.HTTPBadRequest:
|
||||
'''
|
||||
try:
|
||||
schemas_event.validate(event)
|
||||
except schemas_exceptions.ValidationException as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
def _send_event(self, event):
|
||||
'''
|
||||
Send the event using the message queue.
|
||||
:param metrics: An event object.
|
||||
:raises: falcon.HTTPServiceUnavailable:
|
||||
'''
|
||||
try:
|
||||
str_msg = json.dumps(event, default=utils.date_handler)
|
||||
self._message_queue.send_message(str_msg)
|
||||
except message_queue_exceptions.MessageQueueException as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
|
||||
|
||||
@resource_api.Restify('/v2.0/events/', method='post')
|
||||
def do_post_events(self, req, res):
|
||||
helpers.validate_json_content_type(req)
|
||||
helpers.validate_authorization(req, self._post_events_authorized_roles)
|
||||
event = self._read_event(req)
|
||||
self._validate_event(event)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
transformed_event = self._event_transform(event, tenant_id, self._region)
|
||||
self._send_event(transformed_event)
|
||||
res.status = falcon.HTTP_204
|
|
@ -0,0 +1,148 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import falcon
|
||||
import json
|
||||
from falcon.util.uri import parse_query_string
|
||||
from monasca.openstack.common import log
|
||||
from monasca.v2.common.schemas import exceptions as schemas_exceptions
|
||||
from monasca.v2.common.schemas import metric_name_schema
|
||||
from monasca.v2.common.schemas import dimensions_schema
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def validate_json_content_type(req):
|
||||
if req.content_type not in ['application/json']:
|
||||
raise falcon.HTTPBadRequest('Bad request', 'Bad content type. Must be application/json')
|
||||
|
||||
|
||||
def is_in_role(req, authorized_roles):
|
||||
'''
|
||||
Determines if one or more of the X-ROLES is in the supplied authorized_roles.
|
||||
:param req: HTTP request object. Must contain "X-ROLES" in the HTTP request header.
|
||||
:param authorized_roles: List of authorized roles to check against.
|
||||
:return: Returns True if in the list of authorized roles, otherwise False.
|
||||
'''
|
||||
str_roles = req.get_header('X-ROLES')
|
||||
if str_roles == None:
|
||||
return False
|
||||
roles = str_roles.lower().split(',')
|
||||
for role in roles:
|
||||
if role in authorized_roles:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def validate_authorization(req, authorized_roles):
|
||||
'''
|
||||
Validates whether one or more X-ROLES in the HTTP header is authorized.
|
||||
:param req: HTTP request object. Must contain "X-ROLES" in the HTTP request header.
|
||||
:param authorized_roles: List of authorized roles to check against.
|
||||
:raises falcon.HTTPUnauthorized:
|
||||
'''
|
||||
str_roles = req.get_header('X-ROLES')
|
||||
if str_roles == None:
|
||||
raise falcon.HTTPUnauthorized('Forbidden', 'Tenant does not have any roles', '')
|
||||
roles = str_roles.lower().split(',')
|
||||
for role in roles:
|
||||
if role in authorized_roles:
|
||||
return
|
||||
raise falcon.HTTPUnauthorized('Forbidden', 'Tenant ID is missing a required role to access this service', '')
|
||||
|
||||
|
||||
def get_tenant_id(req):
|
||||
'''
|
||||
Returns the tenant ID in the HTTP request header.
|
||||
:param req: HTTP request object.
|
||||
'''
|
||||
return req.get_header('X-TENANT-ID')
|
||||
|
||||
|
||||
def get_cross_tenant_or_tenant_id(req, delegate_authorized_roles):
|
||||
'''
|
||||
Evaluates whether the tenant ID or cross tenant ID should be returned.
|
||||
:param req: HTTP request object.
|
||||
:param delegate_authorized_roles: List of authorized roles that have delegate privileges.
|
||||
:returns: Returns the cross tenant or tenant ID.
|
||||
'''
|
||||
if is_in_role(req, delegate_authorized_roles):
|
||||
params = parse_query_string(req.query_string)
|
||||
if 'tenant_id' in params:
|
||||
tenant_id = params['tenant_id']
|
||||
return tenant_id
|
||||
return get_tenant_id(req)
|
||||
|
||||
|
||||
def get_query_name(req):
|
||||
'''
|
||||
Returns the query param "name" if supplied.
|
||||
:param req: HTTP request object.
|
||||
'''
|
||||
params = parse_query_string(req.query_string)
|
||||
name = ''
|
||||
if 'name' in params:
|
||||
name = params['name']
|
||||
return name
|
||||
|
||||
|
||||
def get_query_dimensions(req):
|
||||
'''
|
||||
Gets and parses the query param dimensions.
|
||||
:param req: HTTP request object.
|
||||
:return: Returns the dimensions as a JSON object
|
||||
:raises falcon.HTTPBadRequest: If dimensions are malformed.
|
||||
'''
|
||||
try:
|
||||
params = parse_query_string(req.query_string)
|
||||
dimensions = {}
|
||||
if 'dimensions' in params:
|
||||
dimensions_str = params['dimensions']
|
||||
dimensions_str_array = dimensions_str.split(',')
|
||||
for dimension in dimensions_str_array:
|
||||
dimension_name_value = dimension.split(':')
|
||||
if len(dimension_name_value) == 2:
|
||||
dimensions[str(dimension_name_value[0])] = str(dimension_name_value[1])
|
||||
else:
|
||||
raise Exception('Dimensions are malformed')
|
||||
return dimensions
|
||||
except Exception as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
|
||||
def validate_query_name(name):
|
||||
'''
|
||||
Validates the query param name.
|
||||
:param name: Query param name.
|
||||
:raises falcon.HTTPBadRequest: If name is not valid.
|
||||
'''
|
||||
try:
|
||||
metric_name_schema.validate(name)
|
||||
except schemas_exceptions.ValidationException as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
|
||||
def validate_query_dimensions(dimensions):
|
||||
'''
|
||||
Validates the query param dimensions.
|
||||
:param dimensions: Query param dimensions.
|
||||
:raises falcon.HTTPBadRequest: If dimensions are not valid.
|
||||
'''
|
||||
try:
|
||||
dimensions_schema.validate(dimensions)
|
||||
except schemas_exceptions.ValidationException as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
|
@ -0,0 +1,149 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
import falcon
|
||||
from oslo.config import cfg
|
||||
|
||||
from monasca.openstack.common import log
|
||||
from monasca.api import monasca_api_v2
|
||||
from monasca.common import resource_api
|
||||
from monasca.common.messaging import exceptions as message_queue_exceptions
|
||||
from monasca.common.messaging.message_formats import metrics_transform_factory
|
||||
from monasca.v2.common import utils
|
||||
from monasca.v2.common.schemas import exceptions as schemas_exceptions
|
||||
from monasca.v2.common.schemas import metrics_request_body_schema as schemas_metrics
|
||||
from monasca.v2.reference import helpers
|
||||
|
||||
from stevedore import driver
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Metrics(monasca_api_v2.V2API):
|
||||
def __init__(self, global_conf):
|
||||
super(Metrics, self).__init__(global_conf)
|
||||
self._region = cfg.CONF.region
|
||||
self._default_authorized_roles = cfg.CONF.security.default_authorized_roles
|
||||
self._delegate_authorized_roles = cfg.CONF.security.delegate_authorized_roles
|
||||
self._post_metrics_authorized_roles = cfg.CONF.security.default_authorized_roles + \
|
||||
cfg.CONF.security.agent_authorized_roles
|
||||
self._metrics_transform = metrics_transform_factory.create_metrics_transform()
|
||||
self._init_message_queue()
|
||||
self._init_metrics_repo()
|
||||
|
||||
def _init_message_queue(self):
|
||||
mgr = driver.DriverManager(
|
||||
namespace = 'monasca.messaging',
|
||||
name = cfg.CONF.messaging.driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(['metrics'])
|
||||
)
|
||||
self._message_queue = mgr.driver
|
||||
|
||||
def _init_metrics_repo(self):
|
||||
mgr = driver.DriverManager(
|
||||
namespace = 'monasca.repositories',
|
||||
name = cfg.CONF.repositories.metrics_driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=()
|
||||
)
|
||||
self._metrics_repo = mgr.driver
|
||||
|
||||
def _read_metrics(self, req):
|
||||
'''
|
||||
Read the metrics from the http request and return them as JSON.
|
||||
:param req: HTTP request object.
|
||||
:return: Returns the metrics as a JSON object.
|
||||
:raises falcon.HTTPBadRequest:
|
||||
'''
|
||||
try:
|
||||
msg = req.stream.read()
|
||||
json_msg = json.loads(msg)
|
||||
return json_msg
|
||||
except ValueError as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON')
|
||||
|
||||
def _validate_metrics(self, metrics):
|
||||
'''
|
||||
Validates the metrics
|
||||
:param metrics: A metric object or array of metrics objects.
|
||||
:raises falcon.HTTPBadRequest:
|
||||
'''
|
||||
try:
|
||||
schemas_metrics.validate(metrics)
|
||||
except schemas_exceptions.ValidationException as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
def _send_metrics(self, metrics):
|
||||
'''
|
||||
Send the metrics using the message queue.
|
||||
:param metrics: A metric object or array of metrics objects.
|
||||
:raises: falcon.HTTPServiceUnavailable:
|
||||
'''
|
||||
def _send_metric(metric):
|
||||
try:
|
||||
str_msg = json.dumps(metric, default=utils.date_handler)
|
||||
self._message_queue.send_message(str_msg)
|
||||
except message_queue_exceptions.MessageQueueException as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPServiceUnavailable('Service unavailable', ex.message)
|
||||
|
||||
if isinstance(metrics, list):
|
||||
for metric in metrics:
|
||||
_send_metric(metric)
|
||||
else:
|
||||
_send_metric(metrics)
|
||||
|
||||
def _list_metrics(self, tenant_id, name, dimensions):
|
||||
'''
|
||||
Query the metric repo for the metrics, format them and return them.
|
||||
:param tenant_id:
|
||||
:param name:
|
||||
:param dimensions:
|
||||
:raises falcon.HTTPServiceUnavailable:
|
||||
'''
|
||||
try:
|
||||
result = self._metrics_repo.list_metrics(tenant_id, name, dimensions)
|
||||
# TODO: Format response body correctly. Currently just returning what is returned by the metrics repository.
|
||||
return result
|
||||
except Exception as ex:
|
||||
log.exception()
|
||||
raise falcon.HTTPServiceUnavailable('Service unavailable', ex.message)
|
||||
|
||||
@resource_api.Restify('/v2.0/metrics/', method='post')
|
||||
def do_post_metrics(self, req, res):
|
||||
helpers.validate_json_content_type(req)
|
||||
helpers.validate_authorization(req, self._post_metrics_authorized_roles)
|
||||
metrics = self._read_metrics(req)
|
||||
self._validate_metrics(metrics)
|
||||
tenant_id = helpers.get_cross_tenant_or_tenant_id(req, self._delegate_authorized_roles)
|
||||
transformed_metrics = self._metrics_transform(metrics, tenant_id, self._region)
|
||||
self._send_metrics(transformed_metrics)
|
||||
res.status = falcon.HTTP_204
|
||||
|
||||
@resource_api.Restify('/v2.0/metrics/', method='get')
|
||||
def do_get_metrics(self, req, res):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
name = helpers.get_query_name(req)
|
||||
helpers.validate_query_name(name)
|
||||
dimensions = helpers.get_query_dimensions(req)
|
||||
helpers.validate_query_dimensions(dimensions)
|
||||
result = self._list_metrics(tenant_id, name, dimensions)
|
||||
res.body = json.dumps(result)
|
||||
res.status = falcon.HTTP_200
|
|
@ -0,0 +1,151 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# TODO: Used simplejson to read the yaml as simplejson transforms to "str" not "unicode"
|
||||
|
||||
import json
|
||||
import simplejson
|
||||
|
||||
import falcon
|
||||
from oslo.config import cfg
|
||||
|
||||
from monasca.openstack.common import log
|
||||
from monasca.openstack.common import uuidutils
|
||||
from monasca.api import monasca_transforms_api_v2
|
||||
from monasca.common import resource_api
|
||||
from monasca.common.repositories import exceptions as repository_exceptions
|
||||
from monasca.v2.common.schemas import exceptions as schemas_exceptions
|
||||
from monasca.v2.common.schemas import transforms_request_body_schema as schemas_transforms
|
||||
from monasca.v2.reference import helpers
|
||||
|
||||
from stevedore import driver
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Transforms(monasca_transforms_api_v2.TransformsV2API):
|
||||
def __init__(self, global_conf):
|
||||
super(Transforms, self).__init__(global_conf)
|
||||
self._region = cfg.CONF.region
|
||||
self._default_authorized_roles = cfg.CONF.security.default_authorized_roles
|
||||
self._init_transforms_repo()
|
||||
|
||||
def _init_transforms_repo(self):
|
||||
mgr = driver.DriverManager(
|
||||
namespace = 'monasca.repositories',
|
||||
name = cfg.CONF.repositories.transforms_driver,
|
||||
invoke_on_load=True,
|
||||
invoke_args=()
|
||||
)
|
||||
self._transforms_repo = mgr.driver
|
||||
|
||||
def _read_transform(self, req):
|
||||
'''
|
||||
Read the transform from the http request and return as JSON.
|
||||
:param req: HTTP request object.
|
||||
:return: Returns the transform as a JSON object.
|
||||
:raises falcon.HTTPBadRequest:
|
||||
'''
|
||||
try:
|
||||
msg = req.stream.read()
|
||||
json_msg = simplejson.loads(msg)
|
||||
return json_msg
|
||||
except ValueError as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON')
|
||||
|
||||
def _validate_transform(self, transform):
|
||||
'''
|
||||
Validates the transform
|
||||
:param transform: An event object.
|
||||
:raises falcon.HTTPBadRequest:
|
||||
'''
|
||||
try:
|
||||
schemas_transforms.validate(transform)
|
||||
except schemas_exceptions.ValidationException as ex:
|
||||
LOG.debug(ex)
|
||||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
def _create_transform(self, id, tenant_id, transform):
|
||||
'''
|
||||
Store the transform using the repository.
|
||||
:param transform: A transform object.
|
||||
:raises: falcon.HTTPServiceUnavailable:
|
||||
'''
|
||||
try:
|
||||
name = transform['name']
|
||||
description = transform['description']
|
||||
specification = transform['specification']
|
||||
enabled = transform['enabled']
|
||||
self._transforms_repo.create_transforms(id, tenant_id, name, description, specification, enabled)
|
||||
except repository_exceptions.RepositoryException as ex:
|
||||
LOG.error(ex)
|
||||
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
|
||||
|
||||
def _create_transform_response(self, id, transform):
|
||||
name = transform['name']
|
||||
description = transform['description']
|
||||
specification = transform['specification']
|
||||
enabled = transform['enabled']
|
||||
response = {
|
||||
'id': id,
|
||||
'name': name,
|
||||
'description': description,
|
||||
'specification': specification,
|
||||
'enabled': enabled
|
||||
}
|
||||
return json.dumps(response)
|
||||
|
||||
def _list_transforms(self, tenant_id):
|
||||
try:
|
||||
transforms = self._transforms_repo.list_transforms(tenant_id)
|
||||
return json.dumps(transforms)
|
||||
except repository_exceptions.RepositoryException as ex:
|
||||
LOG.error(ex)
|
||||
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
|
||||
|
||||
def _delete_transform(self, tenant_id, transform_id):
|
||||
try:
|
||||
self._transforms_repo.delete_transform(tenant_id, transform_id)
|
||||
except repository_exceptions.DoesNotExistException:
|
||||
raise falcon.HTTPNotFound()
|
||||
except repository_exceptions.RepositoryException as ex:
|
||||
LOG.error(ex)
|
||||
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
|
||||
|
||||
@resource_api.Restify('/v2.0/events/transforms', method='post')
|
||||
def do_post_transforms(self, req, res):
|
||||
helpers.validate_json_content_type(req)
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
transform = self._read_transform(req)
|
||||
self._validate_transform(transform)
|
||||
id = uuidutils.generate_uuid()
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
self._create_transform(id, tenant_id, transform)
|
||||
res.body = self._create_transform_response(id, transform)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
@resource_api.Restify('/v2.0/events/transforms', method='get')
|
||||
def do_get_transforms(self, req, res):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
res.body = self._list_transforms(tenant_id)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
@resource_api.Restify('/v2.0/events/transforms/{transform_id}', method='delete')
|
||||
def do_delete_transforms(self, req, res, transform_id):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
self._delete_transform(tenant_id, transform_id)
|
||||
res.status = falcon.HTTP_204
|
21
setup.cfg
21
setup.cfg
|
@ -33,13 +33,32 @@ data_files =
|
|||
console_scripts =
|
||||
monasca-api = monasca.api.server:run
|
||||
|
||||
monasca.dispatcher =
|
||||
monasca.metrics_dispatcher =
|
||||
kafka = monasca.dispatcher.kafka_dispatcher:KafkaDispatcher
|
||||
v2_reference = monasca.v2.reference.metrics:Metrics
|
||||
|
||||
monasca.events_dispatcher =
|
||||
v2_reference = monasca.v2.reference.events:Events
|
||||
|
||||
monasca.transforms_dispatcher =
|
||||
v2_reference = monasca.v2.reference.transforms:Transforms
|
||||
|
||||
paste.filter_factory =
|
||||
login = monasca.middleware.login:filter_factory
|
||||
inspector = monasca.middleware.inspector:filter_factory
|
||||
metric_validator = monasca.middleware.metric_validator:filter_factory
|
||||
mock_auth_filter = monasca.middleware.mock_auth_filter:filter_factory
|
||||
|
||||
monasca.messaging =
|
||||
fake = monasca.common.messaging.fake_publisher:FakePublisher
|
||||
kafka = monasca.common.messaging.kafka_publisher:KafkaPublisher
|
||||
rabbitmq = monasca.common.messaging.rabbitmq_publisher:RabbitmqPublisher
|
||||
|
||||
monasca.repositories =
|
||||
fake_metrics_repo = monasca.common.repositories.fake.metrics_repository:MetricsRepository
|
||||
influxdb_metrics_repo = monasca.common.repositories.influxdb.metrics_repository:MetricsRepository
|
||||
fake_events_repo = monasca.common.repositories.fake.events_repository:EventsRepository
|
||||
mysql_transforms_repo = monasca.common.repositories.mysql.transforms_repository:TransformsRepository
|
||||
|
||||
[pbr]
|
||||
warnerrors = True
|
||||
|
|
Loading…
Reference in New Issue