Start of initial reference implementation and re-factored

Change-Id: I90d09331a41f15c47870cd7690b22405087ff7a4
This commit is contained in:
Roland Hochmuth 2014-09-22 19:53:27 -06:00
parent 82d007f3cf
commit c5f5fba741
57 changed files with 1891 additions and 96 deletions

View File

@ -1,33 +1,99 @@
[DEFAULT]
#logging, make sure that the user under whom the server runs has permission
#to write to the directory.
log_file=monasca.log
log_dir=/var/log/monasca/
log_level=DEBUG
[default]
# logging, make sure that the user under whom the server runs has permission
# to write to the directory.
log_file = monasca.log
log_dir = .
log_level = DEBUG
[dispatcher]
# Identifies the region that the Monasca API is running in.
region = na
[security]
# The roles that are allowed full access to the API.
default_authorized_roles = admin
# The roles that are allowed to only POST metrics to the API. This role would be used by the Monasca Agent.
agent_authorized_roles = agent
# The roles that are allowed to access the API on behalf of another tenant.
# For example, a service can POST metrics to another tenant if they are a member of the "delegate" role.
delegate_authorized_roles = admin
[messaging]
# The message queue driver to use
driver = kafka
# The type of metrics message format to publish to the message queue.
metrics_message_format = reference
# The type of events message format to publish to the message queue.
events_message_format = reference
[repositories]
# The driver to use for the metrics repository
metrics_driver = influxdb_metrics_repo
# The driver to use for the events repository
events_driver = none
# The driver to use for the transforms repository
transforms_driver = mysql_transforms_repo
[dispatcher]
driver = v2_reference
[kafka]
#The endpoint to the kafka server
uri = 192.168.1.191:9092
#The topic on the kafka server
topic = metrics
#consumer group name
group = metric_group
#how many times to try when error occurs
# The endpoint to the kafka server
uri = 192.168.10.4:9092
# The topic that metrics will be published too
metrics_topic = metrics
# The topic that events will be published too
events_topic = raw-events
# consumer group name
group = api
# how many times to try when error occurs
max_retry = 1
#wait time between tries when kafka goes down
# wait time between tries when kafka goes down
wait_time = 1
#use synchronized or asynchronized connection to kafka
# use synchronous or asynchronous connection to kafka
async = False
#send messages in bulk or send messages one by one.
# send messages in bulk or send messages one by one.
compact = False
#How many partitions this connection should listen messages on, this
#parameter is for reading from kafka. If listens on multiple partitions,
#For example, if the client should listen on partitions 1 and 3, then the
#configuration should look like the following:
# How many partitions this connection should listen messages on, this
# parameter is for reading from kafka. If listens on multiple partitions,
# For example, if the client should listen on partitions 1 and 3, then the
# configuration should look like the following:
# partitions = 1
# partitions = 3
#default to listen on partition 0.
# default to listen on partition 0.
partitions = 0
[influxdb]
# The IP address of the InfluxDB service.
ip_address = 192.168.10.4
# The port number that the InfluxDB service is listening on.
port = 8086
# The username to authenticate with.
user = mon_api
# The password to authenticate with.
password = password
# The name of the InfluxDB database to use.
database_name = mon
[mysql]
database_name = mon
hostname = 192.168.10.4
username = monapi
password = password

View File

@ -3,7 +3,7 @@ name = monasca
[pipeline:main]
# Add validator in the pipeline so the metrics messages can be validated.
pipeline = validator api
pipeline = auth api
[app:api]
paste.app_factory = monasca.api.server:api_app
@ -17,9 +17,12 @@ use = egg: monasca_api#inspector
[filter:validator]
use = egg: monasca_api#metric_validator
[filter:auth]
use = egg: monasca_api#mock_auth_filter
[server:main]
use = egg:gunicorn#main
host = 0.0.0.0
port = 9090
port = 9000
workers = 1
proc_name = monasca
proc_name = monasca

View File

@ -26,8 +26,8 @@ class V2API(object):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/metrics/{id}')
def do_get_metrics(self, req, res, id):
@resource_api.Restify('/v2.0/metrics')
def do_get_metrics(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/metrics/', method='post')
@ -92,4 +92,4 @@ class V2API(object):
@resource_api.Restify('/v2.0/alarms/{id}/state-history')
def do_get_alarm_state_history(self, req, res, id):
res.status = '501 Not Implemented'
res.status = '501 Not Implemented'

View File

@ -0,0 +1,29 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class EventsV2API(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/events/', method='post')
def do_post_events(self, req, res):
res.status = '501 Not Implemented'

View File

@ -0,0 +1,37 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class TransformsV2API(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/events/transforms', method='post')
def do_post_transforms(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/events/transforms', method='get')
def do_get_transforms(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/events/transforms/{transform_id}', method='delete')
def do_delete_transforms(self, req, res, transform_id):
res.status = '501 Not Implemented'

View File

@ -1,4 +1,4 @@
# Copyright 2013 IBM Corp
# Copyright 2014 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
@ -14,25 +14,143 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from stevedore import driver
import os
from monasca.common import resource_api
from monasca.openstack.common import log
from oslo.config import cfg
from oslo.config import types
from paste.deploy import loadapp
from wsgiref import simple_server
DISPATCHER_NAMESPACE = 'monasca.dispatcher'
cfg.CONF.register_opts(
[cfg.StrOpt('driver', default='kafka',
help='The name of the dispatcher for the api server'),
], group="dispatcher")
METRICS_DISPATCHER_NAMESPACE = 'monasca.metrics_dispatcher'
EVENTS_DISPATCHER_NAMESPACE = 'monasca.events_dispatcher'
TRANSFORMS_DISPATCHER_NAMESPACE = 'monasca.transforms_dispatcher'
LOG = log.getLogger(__name__)
global_opts = [
cfg.StrOpt('region', help='Region that API is running in')
]
cfg.CONF.register_opts(global_opts)
security_opts = [
cfg.ListOpt('default_authorized_roles', default=['admin'],
help='Roles that are allowed full access to the API'),
cfg.ListOpt('agent_authorized_roles', default=['agent'],
help='Roles that are only allowed to POST to the API'),
cfg.ListOpt('delegate_authorized_roles', default=['admin'],
help='Roles that are allowed to POST metrics on behalf of another tenant')
]
security_group = cfg.OptGroup(name='security', title='security')
cfg.CONF.register_group(security_group)
cfg.CONF.register_opts(security_opts, security_group)
messaging_opts = [
cfg.StrOpt('driver', default='kafka', help='The message queue driver to use'),
cfg.StrOpt('metrics_message_format', default='reference',
help='The type of metrics message format to publish to the message queue'),
cfg.StrOpt('events_message_format', default='reference',
help='The type of events message format to publish to the message queue')
]
messaging_group = cfg.OptGroup(name='messaging', title='messaging')
cfg.CONF.register_group(messaging_group)
cfg.CONF.register_opts(messaging_opts, messaging_group)
repositories_opts = [
cfg.StrOpt('metrics_driver', default='influxdb_metrics_repo', help='The repository driver to use for metrics'),
cfg.StrOpt('events_driver', default='fake_events_repo', help='The repository driver to use for events'),
cfg.StrOpt('transforms_driver', default='mysql_transforms_repo', help='The repository driver to use for transforms')
]
repositories_group = cfg.OptGroup(name='repositories', title='repositories')
cfg.CONF.register_group(repositories_group)
cfg.CONF.register_opts(repositories_opts, repositories_group)
dispatcher_opts = [
cfg.StrOpt('driver', default='monasca.v2.reference.metrics:Metrics',
help='The name of the dispatcher for the api server')
]
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
cfg.CONF.register_group(dispatcher_group)
cfg.CONF.register_opts(dispatcher_opts, dispatcher_group)
kafka_opts = [
cfg.StrOpt('uri',
help='Address to kafka server. For example: '
'uri=192.168.1.191:9092'),
cfg.StrOpt('metrics_topic',
default='metrics',
help='The topic that metrics will be published too.'),
cfg.StrOpt('events_topic',
default='raw-events',
help='The topic that events will be published too.'),
cfg.StrOpt('group',
default='api',
help='The group name that this service belongs to.'),
cfg.IntOpt('wait_time',
default=1,
help='The wait time when no messages on kafka queue.'),
cfg.IntOpt('ack_time',
default=20,
help='The ack time back to kafka.'),
cfg.IntOpt('max_retry',
default=3,
help='The number of retry when there is a connection error.'),
cfg.BoolOpt('auto_commit',
default=False,
help='If automatically commmit when consume messages.'),
cfg.BoolOpt('async',
default=True,
help='The type of posting.'),
cfg.BoolOpt('compact',
default=True,
help=('Specify if the message received should be parsed.'
'If True, message will not be parsed, otherwise '
'messages will be parsed.')),
cfg.MultiOpt('partitions',
item_type=types.Integer(),
default=[0],
help='The sleep time when no messages on kafka queue.'),
cfg.BoolOpt('drop_data',
default=False,
help=('Specify if received data should be simply dropped. '
'This parameter is only for testing purposes.')),
]
kafka_group = cfg.OptGroup(name='kafka', title='title')
cfg.CONF.register_group(kafka_group)
cfg.CONF.register_opts(kafka_opts, kafka_group)
influxdb_opts = [
cfg.StrOpt('database_name'),
cfg.StrOpt('ip_address'),
cfg.StrOpt('port'),
cfg.StrOpt('user'),
cfg.StrOpt('password')
]
influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)
mysql_opts = [
cfg.StrOpt('database_name'),
cfg.StrOpt('hostname'),
cfg.StrOpt('username'),
cfg.StrOpt('password')
]
mysql_group = cfg.OptGroup(name='mysql', title='mysql')
cfg.CONF.register_group(mysql_group)
cfg.CONF.register_opts(mysql_opts, mysql_group)
def api_app(conf):
# Setup logs
log_levels = (cfg.CONF.default_log_levels)
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
@ -43,16 +161,50 @@ def api_app(conf):
# Create the application
app = resource_api.ResourceAPI()
# load the driver specified by dispatcher in the monasca.ini file
manager = driver.DriverManager(namespace=DISPATCHER_NAMESPACE,
name=cfg.CONF.dispatcher.driver,
invoke_on_load=True,
invoke_args=[conf])
# load the metrics driver specified by dispatcher in the monasca.ini file
metrics_manager = driver.DriverManager(namespace=METRICS_DISPATCHER_NAMESPACE,
name=cfg.CONF.dispatcher.driver,
invoke_on_load=True,
invoke_args=[conf])
LOG.debug('Dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
LOG.debug('Metrics dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
# add the driver to the application
app.add_route(None, manager.driver)
app.add_route(None, metrics_manager.driver)
LOG.debug('Metrics dispatcher driver has been added to the routes!')
# load the events driver specified by dispatcher in the monasca.ini file
events_manager = driver.DriverManager(namespace=EVENTS_DISPATCHER_NAMESPACE,
name=cfg.CONF.dispatcher.driver,
invoke_on_load=True,
invoke_args=[conf])
LOG.debug('Events dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
# add the driver to the application
app.add_route(None, events_manager.driver)
LOG.debug('Events dispatcher driver has been added to the routes!')
# load the events driver specified by dispatcher in the monasca.ini file
transforms_manager = driver.DriverManager(namespace=TRANSFORMS_DISPATCHER_NAMESPACE,
name=cfg.CONF.dispatcher.driver,
invoke_on_load=True,
invoke_args=[conf])
LOG.debug('Transforms dispatcher driver %s is loaded.' % cfg.CONF.dispatcher.driver)
# add the driver to the application
app.add_route(None, transforms_manager.driver)
LOG.debug('Transforms dispatcher driver has been added to the routes!')
LOG.debug('Dispatcher driver has been added to the routes!')
return app
if __name__ == '__main__':
wsgi_app = loadapp('config:etc/monasca.ini', relative_to=os.getcwd())
httpd = simple_server.make_server('127.0.0.1', 9000, wsgi_app)
httpd.serve_forever()

View File

@ -1,7 +1,4 @@
#
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -20,8 +17,8 @@ from kafka import common
from kafka import consumer
from kafka import producer
from oslo.config import cfg
from oslo.config import types
import time
try:
import ujson as json
except ImportError:
@ -29,49 +26,6 @@ except ImportError:
from monasca.openstack.common import log
OPTS = [
cfg.StrOpt('uri',
help='Address to kafka server. For example: '
'uri=192.168.1.191:9092'),
cfg.StrOpt('topic',
default='event',
help='The topic that this collector will listen on.'),
cfg.StrOpt('group',
default='event_consumer',
help='The group name that this service belongs to.'),
cfg.IntOpt('wait_time',
default=1,
help='The wait time when no messages on kafka queue.'),
cfg.IntOpt('ack_time',
default=20,
help='The ack time back to kafka.'),
cfg.IntOpt('max_retry',
default=3,
help='The number of retry when there is a connection error.'),
cfg.BoolOpt('auto_commit',
default=False,
help='If automatically commmit when consume messages.'),
cfg.BoolOpt('async',
default=True,
help='The type of posting.'),
cfg.BoolOpt('compact',
default=True,
help=('Specify if the message received should be parsed.'
'If True, message will not be parsed, otherwise '
'messages will be parsed.')),
cfg.MultiOpt('partitions',
item_type=types.Integer(),
default=[0],
help='The sleep time when no messages on kafka queue.'),
cfg.BoolOpt('drop_data',
default=False,
help=('Specify if received data should be simply dropped. '
'This parameter is only for testing purposes.')),
]
cfg.CONF.register_opts(OPTS, group="kafka")
LOG = log.getLogger(__name__)

View File

View File

@ -0,0 +1,16 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class MessageQueueException(Exception):
pass

View File

@ -0,0 +1,24 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.messaging import publisher
class FakePublisher(publisher.Publisher):
def __init__(self, topic):
pass
def send_message(self, message):
pass

View File

@ -0,0 +1,110 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kafka import client
from kafka import common
from kafka import producer
from oslo.config import cfg
import time
from monasca.openstack.common import log
from monasca.common.messaging import publisher
from monasca.common.messaging import exceptions
LOG = log.getLogger(__name__)
class KafkaPublisher(publisher.Publisher):
def __init__(self, topic):
if not cfg.CONF.kafka.uri:
raise Exception('Kafka is not configured correctly! '
'Use configuration file to specify Kafka '
'uri, for example: '
'uri=192.168.1.191:9092')
self.uri = cfg.CONF.kafka.uri
self.topic = topic
self.group = cfg.CONF.kafka.group
self.wait_time = cfg.CONF.kafka.wait_time
self.async = cfg.CONF.kafka.async
self.ack_time = cfg.CONF.kafka.ack_time
self.max_retry = cfg.CONF.kafka.max_retry
self.auto_commit = cfg.CONF.kafka.auto_commit
self.compact = cfg.CONF.kafka.compact
self.partitions = cfg.CONF.kafka.partitions
self.drop_data = cfg.CONF.kafka.drop_data
self._client = None
self._producer = None
def _init_client(self, wait_time=None):
for i in range(self.max_retry):
try:
# if there is a client instance, but _init_client is called
# again, most likely the connection has gone stale, close that
# connection and reconnect.
if self._client:
self._client.close()
if not wait_time:
wait_time = self.wait_time
time.sleep(wait_time)
self._client = client.KafkaClient(self.uri)
# when a client is re-initialized, existing consumer should be
# reset as well.
self._producer = None
break
except common.KafkaUnavailableError:
LOG.error('Kafka server at %s is down.' % self.uri)
except common.LeaderNotAvailableError:
LOG.error('Kafka at %s has no leader available.' % self.uri)
except Exception:
LOG.error('Kafka at %s initialization failed.' % self.uri)
# Wait a bit and try again to get a client
time.sleep(self.wait_time)
def _init_producer(self):
try:
if not self._client:
self._init_client()
self._producer = producer.SimpleProducer(
self._client, async=self.async, ack_timeout=self.ack_time)
LOG.debug('Kafka SimpleProducer was created successfully.')
except Exception:
self._producer = None
LOG.exception('Kafka (%s) producer can not be created.' % self.uri)
def close(self):
if self._client:
self._producer = None
self._client.close()
def send_message(self, message):
try:
if not self._producer:
self._init_producer()
self._producer.send_messages(self.topic, message)
except (common.KafkaUnavailableError,
common.LeaderNotAvailableError):
self._client = None
LOG.exception('Error occurred while posting data to Kafka.')
raise exceptions.MessageQueueException()
except Exception:
LOG.exception('Unknown error.')
raise exceptions.MessageQueueException()

View File

@ -0,0 +1,16 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(events, tenant_id, region):
raise NotImplemented()

View File

@ -0,0 +1,16 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(metrics, tenant_id, region):
raise NotImplemented()

View File

@ -0,0 +1,27 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import monasca.common.messaging.message_formats.reference.events as reference_events
import monasca.common.messaging.message_formats.cadf.events as cadf_events
import monasca.common.messaging.message_formats.identity.events as identity_events
def create_events_transform():
message_format = cfg.CONF.messaging.events_message_format
if message_format == 'reference':
return reference_events.transform
elif message_format == 'cadf':
return cadf_events.transform
else:
return identity_events.transform

View File

@ -0,0 +1,16 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TransformationException(Exception):
pass

View File

@ -0,0 +1,18 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
def transform(events, tenant_id, region):
return events

View File

@ -0,0 +1,18 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
def transform(metrics, tenant_id, region):
return metrics

View File

@ -0,0 +1,27 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import monasca.common.messaging.message_formats.reference.metrics as reference_metrics
import monasca.common.messaging.message_formats.cadf.metrics as cadf_metrics
import monasca.common.messaging.message_formats.identity.metrics as identity_metrics
def create_metrics_transform():
metrics_message_format = cfg.CONF.messaging.metrics_message_format
if metrics_message_format == 'reference':
return reference_metrics.transform
elif metrics_message_format == 'cadf':
return cadf_metrics.transform
else:
return identity_metrics.transform

View File

@ -0,0 +1,28 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
def transform(event, tenant_id, region):
transformed_event = dict(
event=event,
meta=dict(
tenantId=tenant_id,
region=region
),
creation_time=datetime.now()
)
return transformed_event

View File

@ -0,0 +1,35 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
def transform(metrics, tenant_id, region):
transformed_metric = {
'metric': {},
'meta': {
'tenantId': tenant_id,
'region': region
},
'creation_time': datetime.now()
}
if isinstance(metrics, list):
transformed_metrics = []
for metric in metrics:
transformed_metric['metric'] = metric
transformed_metrics.append(transformed_metric)
return transformed_metrics
else:
transformed_metric['metric'] = metrics
return transformed_metric

View File

@ -0,0 +1,27 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class Publisher(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def send_message(self, message):
'''
Sends the message using the message queue.
:param message: Message to send.
'''
return

View File

@ -0,0 +1,24 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.messaging import publisher
class RabbitmqPublisher(publisher.Publisher):
def __init__(self, topic):
pass
def send_message(self, message):
raise NotImplemented()

View File

View File

@ -0,0 +1,23 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class EventsRepository(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list_events(self, tenant_id, name, dimensions):
return

View File

@ -0,0 +1,19 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class RepositoryException(Exception):
pass
class DoesNotExistException(RepositoryException):
pass

View File

@ -0,0 +1,27 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import events_repository
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class EventsRepository(events_repository.EventsRepository):
def __init__(self):
return
def list_events(self, tenant_id, name, dimensions):
return {}

View File

@ -0,0 +1,27 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import metrics_repository
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class MetricsRepository(metrics_repository.MetricsRepository):
def __init__(self):
return
def list_metrics(self, tenant_id, name, dimensions):
return {}

View File

@ -0,0 +1,54 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from monasca.common.repositories import metrics_repository
from monasca.common.repositories import exceptions
from monasca.openstack.common import log
from influxdb import InfluxDBClient
LOG = log.getLogger(__name__)
class MetricsRepository(metrics_repository.MetricsRepository):
def __init__(self):
try:
self.conf = cfg.CONF
self.influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address,
self.conf.influxdb.port,
self.conf.influxdb.user,
self.conf.influxdb.password,
self.conf.influxdb.database_name)
except Exception as ex:
LOG.exception()
raise exceptions.RepositoryException(ex)
def list_metrics(self, tenant_id, name, dimensions):
try:
# TODO: This code is just a placeholder right now. It actually returns metrics,
# TODO: but the implementation is not complete
#String serieNameRegex = buildSerieNameRegex(tenantId, name, dimensions);
#String query = String.format("list series /%1$s/", serieNameRegex);
#logger.debug("Query string: {}", query);
query = 'list series'
#List<Serie> result = this.influxDB.Query(this.config.influxDB.getName(), query,
# TimeUnit.SECONDS);
result = self.influxdb_client.query(query, 's')
#return buildMetricDefList(result);
return result
except Exception as ex:
LOG.exception()
raise exceptions.RepositoryException(ex)

View File

@ -0,0 +1,23 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class MetricsRepository(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list_metrics(self, tenant_id, name, dimensions):
return

View File

@ -0,0 +1,29 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.openstack.common import log
from oslo.config import cfg
import peewee
LOG = log.getLogger(__name__)
db = peewee.MySQLDatabase(cfg.CONF.mysql.database_name,
host=cfg.CONF.mysql.hostname,
user=cfg.CONF.mysql.username,
passwd=cfg.CONF.mysql.password)
class Model(peewee.Model):
class Meta:
database = db

View File

@ -0,0 +1,78 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import transforms_repository
from monasca.common.repositories import exceptions
from monasca.openstack.common import log
import peewee
import model
LOG = log.getLogger(__name__)
class Transform(model.Model):
id = peewee.TextField(36)
tenant_id = peewee.TextField(36)
name = peewee.TextField()
description = peewee.TextField()
specification = peewee.TextField()
enabled = peewee.BooleanField()
created_at = peewee.DateTimeField()
updated_at = peewee.DateTimeField()
deleted_at = peewee.DateTimeField()
class TransformsRepository(transforms_repository.TransformsRepository):
def create_transforms(self, id, tenant_id, name, description, specification, enabled):
try:
q = Transform.create(id=id, tenant_id=tenant_id, name=name, description=description, specification=specification, enabled=enabled)
q.save()
except Exception as ex:
LOG.exception(str(ex))
raise exceptions.RepositoryException(str(ex))
def list_transforms(self, tenant_id):
try:
q = Transform.select().where(Transform.tenant_id == tenant_id)
results = q.execute()
transforms = []
for result in results:
transform = {
'id': result.id,
'name': result.name,
'description': result.description,
'specification': result.specification,
'enabled': result.enabled
}
transforms.append(transform)
return transforms
except Exception as ex:
LOG.exception(str(ex))
raise exceptions.RepositoryException(str(ex))
def delete_transform(self, tenant_id, transform_id):
num_rows_deleted = 0
try:
q = Transform.delete().where((Transform.tenant_id == tenant_id) & (Transform.id == transform_id))
num_rows_deleted = q.execute()
except Exception as ex:
LOG.exception(str(ex))
raise exceptions.RepositoryException(str(ex))
if num_rows_deleted < 1:
raise exceptions.DoesNotExistException()
return

View File

@ -0,0 +1,31 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class TransformsRepository(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def create_transforms(self, id, tenant_id, name, description, specification, enabled):
return
@abc.abstractmethod
def list_transforms(self, tenant_id):
return
@abc.abstractmethod
def delete_transform(self, tenant_id, transform_id):
return

View File

@ -0,0 +1,33 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class MockAuthFilter(object):
'''
This authorization filter doesn't do any authentication, it just copies the
auth token to the tenant ID and supplies the 'admin' role and is meant for
testing purposes only.
'''
def __init__(self, app, conf):
self.app = app
self.conf = conf
def __call__(self, env, start_response):
env['HTTP_X_TENANT_ID'] = env['HTTP_X_AUTH_TOKEN']
env['HTTP_X_ROLES'] = 'admin'
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
def validator_filter(app):
return MockAuthFilter(app, local_conf)
return validator_filter

View File

@ -0,0 +1,37 @@
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
UUID related utilities and helper functions.
"""
import uuid
def generate_uuid():
return str(uuid.uuid4())
def is_uuid_like(val):
"""Returns validation of a value as a UUID.
For our purposes, a UUID is a canonical form string:
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
"""
try:
return str(uuid.UUID(val)) == val
except (TypeError, ValueError, AttributeError):
return False

0
monasca/v2/__init__.py Normal file
View File

View File

View File

View File

@ -0,0 +1,31 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from voluptuous import Schema
from voluptuous import Any, All, Length
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
# TODO: Add regex to validate dimension names don't use any excluded characters.
dimensions_schema = Schema({All(Any(str, unicode), Length(max=255)): All(Any(str, unicode), Length(max=255))})
def validate(dimensions):
try:
dimensions_schema(dimensions)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -0,0 +1,31 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from voluptuous import Schema
from voluptuous import Any, All, Length
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
# TODO: Add regex to validate key/values don't use any excluded characters.
event_schema_request_body = Schema({All(Any(str, unicode), Length(max=255)): All(Any(None, str, unicode, bool, int, float, dict, []))})
def validate(body):
try:
event_schema_request_body(body)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -0,0 +1,16 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ValidationException(Exception):
pass

View File

@ -0,0 +1,30 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from voluptuous import Schema
from voluptuous import Any, All, Length
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
metric_name_schema = Schema(All(Any(str, unicode), Length(max=64)))
def validate(name):
try:
metric_name_schema(name)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -0,0 +1,39 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from voluptuous import Schema
from voluptuous import Required, Any, All, Range
from monasca.openstack.common import log
from monasca.v2.common.schemas import metric_name_schema
from monasca.v2.common.schemas import dimensions_schema
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
metric_schema = {
Required('name'): metric_name_schema.metric_name_schema,
Required('dimensions'): dimensions_schema.dimensions_schema,
Required('timestamp'): All(int, Range(min=0)),
Required('value'): Any(int, float)
}
request_body_schema = Schema(Any(metric_schema, [metric_schema]))
def validate(msg):
try:
request_body_schema(msg)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -0,0 +1,37 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from voluptuous import Schema
from voluptuous import Optional, Required, Any, All, Length
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
transform_schema = {
Required('name'): Schema(All(Any(str, unicode), Length(max=64))),
Required('description'): Schema(All(Any(str, unicode), Length(max=250))),
Required('specification'): Schema(All(Any(str, unicode), Length(max=64536))),
Optional('enabled'): bool
}
request_body_schema = Schema(Any(transform_schema))
def validate(msg):
try:
request_body_schema(msg)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -0,0 +1,16 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def date_handler(obj):
return obj.isoformat() if hasattr(obj, 'isoformat') else obj

View File

View File

@ -0,0 +1,104 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import falcon
from oslo.config import cfg
from monasca.openstack.common import log
from monasca.api import monasca_events_api_v2
from monasca.common import resource_api
from monasca.common.messaging import exceptions as message_queue_exceptions
from monasca.common.messaging.message_formats import events_transform_factory
from monasca.v2.common import utils
from monasca.v2.common.schemas import exceptions as schemas_exceptions
from monasca.v2.common.schemas import events_request_body_schema as schemas_event
from monasca.v2.reference import helpers
from stevedore import driver
LOG = log.getLogger(__name__)
class Events(monasca_events_api_v2.EventsV2API):
def __init__(self, global_conf):
super(Events, self).__init__(global_conf)
self._region = cfg.CONF.region
self._default_authorized_roles = cfg.CONF.security.default_authorized_roles
self._delegate_authorized_roles = cfg.CONF.security.delegate_authorized_roles
self._post_events_authorized_roles = cfg.CONF.security.default_authorized_roles + \
cfg.CONF.security.agent_authorized_roles
self._event_transform = events_transform_factory.create_events_transform()
self._init_message_queue()
def _init_message_queue(self):
mgr = driver.DriverManager(
namespace = 'monasca.messaging',
name = cfg.CONF.messaging.driver,
invoke_on_load=True,
invoke_args=(['raw-events'])
)
self._message_queue = mgr.driver
def _read_event(self, req):
'''
Read the event from the http request and return as JSON.
:param req: HTTP request object.
:return: Returns the event as a JSON object.
:raises falcon.HTTPBadRequest:
'''
try:
msg = req.stream.read()
json_msg = json.loads(msg)
return json_msg
except ValueError as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON')
def _validate_event(self, event):
'''
Validates the event
:param event: An event object.
:raises falcon.HTTPBadRequest:
'''
try:
schemas_event.validate(event)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def _send_event(self, event):
'''
Send the event using the message queue.
:param metrics: An event object.
:raises: falcon.HTTPServiceUnavailable:
'''
try:
str_msg = json.dumps(event, default=utils.date_handler)
self._message_queue.send_message(str_msg)
except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex)
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
@resource_api.Restify('/v2.0/events/', method='post')
def do_post_events(self, req, res):
helpers.validate_json_content_type(req)
helpers.validate_authorization(req, self._post_events_authorized_roles)
event = self._read_event(req)
self._validate_event(event)
tenant_id = helpers.get_tenant_id(req)
transformed_event = self._event_transform(event, tenant_id, self._region)
self._send_event(transformed_event)
res.status = falcon.HTTP_204

View File

@ -0,0 +1,148 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import json
from falcon.util.uri import parse_query_string
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions as schemas_exceptions
from monasca.v2.common.schemas import metric_name_schema
from monasca.v2.common.schemas import dimensions_schema
LOG = log.getLogger(__name__)
def validate_json_content_type(req):
if req.content_type not in ['application/json']:
raise falcon.HTTPBadRequest('Bad request', 'Bad content type. Must be application/json')
def is_in_role(req, authorized_roles):
'''
Determines if one or more of the X-ROLES is in the supplied authorized_roles.
:param req: HTTP request object. Must contain "X-ROLES" in the HTTP request header.
:param authorized_roles: List of authorized roles to check against.
:return: Returns True if in the list of authorized roles, otherwise False.
'''
str_roles = req.get_header('X-ROLES')
if str_roles == None:
return False
roles = str_roles.lower().split(',')
for role in roles:
if role in authorized_roles:
return True
return False
def validate_authorization(req, authorized_roles):
'''
Validates whether one or more X-ROLES in the HTTP header is authorized.
:param req: HTTP request object. Must contain "X-ROLES" in the HTTP request header.
:param authorized_roles: List of authorized roles to check against.
:raises falcon.HTTPUnauthorized:
'''
str_roles = req.get_header('X-ROLES')
if str_roles == None:
raise falcon.HTTPUnauthorized('Forbidden', 'Tenant does not have any roles', '')
roles = str_roles.lower().split(',')
for role in roles:
if role in authorized_roles:
return
raise falcon.HTTPUnauthorized('Forbidden', 'Tenant ID is missing a required role to access this service', '')
def get_tenant_id(req):
'''
Returns the tenant ID in the HTTP request header.
:param req: HTTP request object.
'''
return req.get_header('X-TENANT-ID')
def get_cross_tenant_or_tenant_id(req, delegate_authorized_roles):
'''
Evaluates whether the tenant ID or cross tenant ID should be returned.
:param req: HTTP request object.
:param delegate_authorized_roles: List of authorized roles that have delegate privileges.
:returns: Returns the cross tenant or tenant ID.
'''
if is_in_role(req, delegate_authorized_roles):
params = parse_query_string(req.query_string)
if 'tenant_id' in params:
tenant_id = params['tenant_id']
return tenant_id
return get_tenant_id(req)
def get_query_name(req):
'''
Returns the query param "name" if supplied.
:param req: HTTP request object.
'''
params = parse_query_string(req.query_string)
name = ''
if 'name' in params:
name = params['name']
return name
def get_query_dimensions(req):
'''
Gets and parses the query param dimensions.
:param req: HTTP request object.
:return: Returns the dimensions as a JSON object
:raises falcon.HTTPBadRequest: If dimensions are malformed.
'''
try:
params = parse_query_string(req.query_string)
dimensions = {}
if 'dimensions' in params:
dimensions_str = params['dimensions']
dimensions_str_array = dimensions_str.split(',')
for dimension in dimensions_str_array:
dimension_name_value = dimension.split(':')
if len(dimension_name_value) == 2:
dimensions[str(dimension_name_value[0])] = str(dimension_name_value[1])
else:
raise Exception('Dimensions are malformed')
return dimensions
except Exception as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def validate_query_name(name):
'''
Validates the query param name.
:param name: Query param name.
:raises falcon.HTTPBadRequest: If name is not valid.
'''
try:
metric_name_schema.validate(name)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def validate_query_dimensions(dimensions):
'''
Validates the query param dimensions.
:param dimensions: Query param dimensions.
:raises falcon.HTTPBadRequest: If dimensions are not valid.
'''
try:
dimensions_schema.validate(dimensions)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)

View File

@ -0,0 +1,149 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import falcon
from oslo.config import cfg
from monasca.openstack.common import log
from monasca.api import monasca_api_v2
from monasca.common import resource_api
from monasca.common.messaging import exceptions as message_queue_exceptions
from monasca.common.messaging.message_formats import metrics_transform_factory
from monasca.v2.common import utils
from monasca.v2.common.schemas import exceptions as schemas_exceptions
from monasca.v2.common.schemas import metrics_request_body_schema as schemas_metrics
from monasca.v2.reference import helpers
from stevedore import driver
LOG = log.getLogger(__name__)
class Metrics(monasca_api_v2.V2API):
def __init__(self, global_conf):
super(Metrics, self).__init__(global_conf)
self._region = cfg.CONF.region
self._default_authorized_roles = cfg.CONF.security.default_authorized_roles
self._delegate_authorized_roles = cfg.CONF.security.delegate_authorized_roles
self._post_metrics_authorized_roles = cfg.CONF.security.default_authorized_roles + \
cfg.CONF.security.agent_authorized_roles
self._metrics_transform = metrics_transform_factory.create_metrics_transform()
self._init_message_queue()
self._init_metrics_repo()
def _init_message_queue(self):
mgr = driver.DriverManager(
namespace = 'monasca.messaging',
name = cfg.CONF.messaging.driver,
invoke_on_load=True,
invoke_args=(['metrics'])
)
self._message_queue = mgr.driver
def _init_metrics_repo(self):
mgr = driver.DriverManager(
namespace = 'monasca.repositories',
name = cfg.CONF.repositories.metrics_driver,
invoke_on_load=True,
invoke_args=()
)
self._metrics_repo = mgr.driver
def _read_metrics(self, req):
'''
Read the metrics from the http request and return them as JSON.
:param req: HTTP request object.
:return: Returns the metrics as a JSON object.
:raises falcon.HTTPBadRequest:
'''
try:
msg = req.stream.read()
json_msg = json.loads(msg)
return json_msg
except ValueError as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON')
def _validate_metrics(self, metrics):
'''
Validates the metrics
:param metrics: A metric object or array of metrics objects.
:raises falcon.HTTPBadRequest:
'''
try:
schemas_metrics.validate(metrics)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def _send_metrics(self, metrics):
'''
Send the metrics using the message queue.
:param metrics: A metric object or array of metrics objects.
:raises: falcon.HTTPServiceUnavailable:
'''
def _send_metric(metric):
try:
str_msg = json.dumps(metric, default=utils.date_handler)
self._message_queue.send_message(str_msg)
except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex)
raise falcon.HTTPServiceUnavailable('Service unavailable', ex.message)
if isinstance(metrics, list):
for metric in metrics:
_send_metric(metric)
else:
_send_metric(metrics)
def _list_metrics(self, tenant_id, name, dimensions):
'''
Query the metric repo for the metrics, format them and return them.
:param tenant_id:
:param name:
:param dimensions:
:raises falcon.HTTPServiceUnavailable:
'''
try:
result = self._metrics_repo.list_metrics(tenant_id, name, dimensions)
# TODO: Format response body correctly. Currently just returning what is returned by the metrics repository.
return result
except Exception as ex:
log.exception()
raise falcon.HTTPServiceUnavailable('Service unavailable', ex.message)
@resource_api.Restify('/v2.0/metrics/', method='post')
def do_post_metrics(self, req, res):
helpers.validate_json_content_type(req)
helpers.validate_authorization(req, self._post_metrics_authorized_roles)
metrics = self._read_metrics(req)
self._validate_metrics(metrics)
tenant_id = helpers.get_cross_tenant_or_tenant_id(req, self._delegate_authorized_roles)
transformed_metrics = self._metrics_transform(metrics, tenant_id, self._region)
self._send_metrics(transformed_metrics)
res.status = falcon.HTTP_204
@resource_api.Restify('/v2.0/metrics/', method='get')
def do_get_metrics(self, req, res):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
name = helpers.get_query_name(req)
helpers.validate_query_name(name)
dimensions = helpers.get_query_dimensions(req)
helpers.validate_query_dimensions(dimensions)
result = self._list_metrics(tenant_id, name, dimensions)
res.body = json.dumps(result)
res.status = falcon.HTTP_200

View File

@ -0,0 +1,151 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO: Used simplejson to read the yaml as simplejson transforms to "str" not "unicode"
import json
import simplejson
import falcon
from oslo.config import cfg
from monasca.openstack.common import log
from monasca.openstack.common import uuidutils
from monasca.api import monasca_transforms_api_v2
from monasca.common import resource_api
from monasca.common.repositories import exceptions as repository_exceptions
from monasca.v2.common.schemas import exceptions as schemas_exceptions
from monasca.v2.common.schemas import transforms_request_body_schema as schemas_transforms
from monasca.v2.reference import helpers
from stevedore import driver
LOG = log.getLogger(__name__)
class Transforms(monasca_transforms_api_v2.TransformsV2API):
def __init__(self, global_conf):
super(Transforms, self).__init__(global_conf)
self._region = cfg.CONF.region
self._default_authorized_roles = cfg.CONF.security.default_authorized_roles
self._init_transforms_repo()
def _init_transforms_repo(self):
mgr = driver.DriverManager(
namespace = 'monasca.repositories',
name = cfg.CONF.repositories.transforms_driver,
invoke_on_load=True,
invoke_args=()
)
self._transforms_repo = mgr.driver
def _read_transform(self, req):
'''
Read the transform from the http request and return as JSON.
:param req: HTTP request object.
:return: Returns the transform as a JSON object.
:raises falcon.HTTPBadRequest:
'''
try:
msg = req.stream.read()
json_msg = simplejson.loads(msg)
return json_msg
except ValueError as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', 'Request body is not valid JSON')
def _validate_transform(self, transform):
'''
Validates the transform
:param transform: An event object.
:raises falcon.HTTPBadRequest:
'''
try:
schemas_transforms.validate(transform)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def _create_transform(self, id, tenant_id, transform):
'''
Store the transform using the repository.
:param transform: A transform object.
:raises: falcon.HTTPServiceUnavailable:
'''
try:
name = transform['name']
description = transform['description']
specification = transform['specification']
enabled = transform['enabled']
self._transforms_repo.create_transforms(id, tenant_id, name, description, specification, enabled)
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
def _create_transform_response(self, id, transform):
name = transform['name']
description = transform['description']
specification = transform['specification']
enabled = transform['enabled']
response = {
'id': id,
'name': name,
'description': description,
'specification': specification,
'enabled': enabled
}
return json.dumps(response)
def _list_transforms(self, tenant_id):
try:
transforms = self._transforms_repo.list_transforms(tenant_id)
return json.dumps(transforms)
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
def _delete_transform(self, tenant_id, transform_id):
try:
self._transforms_repo.delete_transform(tenant_id, transform_id)
except repository_exceptions.DoesNotExistException:
raise falcon.HTTPNotFound()
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable', ex.message)
@resource_api.Restify('/v2.0/events/transforms', method='post')
def do_post_transforms(self, req, res):
helpers.validate_json_content_type(req)
helpers.validate_authorization(req, self._default_authorized_roles)
transform = self._read_transform(req)
self._validate_transform(transform)
id = uuidutils.generate_uuid()
tenant_id = helpers.get_tenant_id(req)
self._create_transform(id, tenant_id, transform)
res.body = self._create_transform_response(id, transform)
res.status = falcon.HTTP_200
@resource_api.Restify('/v2.0/events/transforms', method='get')
def do_get_transforms(self, req, res):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
res.body = self._list_transforms(tenant_id)
res.status = falcon.HTTP_200
@resource_api.Restify('/v2.0/events/transforms/{transform_id}', method='delete')
def do_delete_transforms(self, req, res, transform_id):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
self._delete_transform(tenant_id, transform_id)
res.status = falcon.HTTP_204

View File

@ -33,13 +33,32 @@ data_files =
console_scripts =
monasca-api = monasca.api.server:run
monasca.dispatcher =
monasca.metrics_dispatcher =
kafka = monasca.dispatcher.kafka_dispatcher:KafkaDispatcher
v2_reference = monasca.v2.reference.metrics:Metrics
monasca.events_dispatcher =
v2_reference = monasca.v2.reference.events:Events
monasca.transforms_dispatcher =
v2_reference = monasca.v2.reference.transforms:Transforms
paste.filter_factory =
login = monasca.middleware.login:filter_factory
inspector = monasca.middleware.inspector:filter_factory
metric_validator = monasca.middleware.metric_validator:filter_factory
mock_auth_filter = monasca.middleware.mock_auth_filter:filter_factory
monasca.messaging =
fake = monasca.common.messaging.fake_publisher:FakePublisher
kafka = monasca.common.messaging.kafka_publisher:KafkaPublisher
rabbitmq = monasca.common.messaging.rabbitmq_publisher:RabbitmqPublisher
monasca.repositories =
fake_metrics_repo = monasca.common.repositories.fake.metrics_repository:MetricsRepository
influxdb_metrics_repo = monasca.common.repositories.influxdb.metrics_repository:MetricsRepository
fake_events_repo = monasca.common.repositories.fake.events_repository:EventsRepository
mysql_transforms_repo = monasca.common.repositories.mysql.transforms_repository:TransformsRepository
[pbr]
warnerrors = True