Updated for Falcon 0.2 and clean-up

Upgraded to Falcon 0.2
Converted from stevedore to simport
Converted from monasca to monasca_api
Removed events. Events api is in monasca-events-api
Removed references to elastic search
Removed support for message format translations
Removed unused and dead code
Removed author tags

Change-Id: I5034ea256372d22b9f824e301c379da81f82b4e2
This commit is contained in:
Roland Hochmuth 2015-06-05 09:35:48 -06:00
parent d4b9ddf076
commit 89731de2a6
133 changed files with 842 additions and 3097 deletions

View File

@ -1,8 +0,0 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, please pay attention to order them correctly.
#
# This is the dependency file for elasticsearch implementation which based on
# elasticsearch, all elasticsearch related dependencies should be listed here.
requests>=1.1

View File

@ -1,7 +1,7 @@
[DEFAULT]
# logging, make sure that the user under whom the server runs has permission
# to write to the directory.
log_file = monasca.log
log_file = monasca-api.log
log_dir = .
log_level = DEBUG
@ -9,13 +9,16 @@ log_level = DEBUG
region = useast
# Dispatchers to be loaded to serve restful APIs
dispatcher = v2_ref_metrics
dispatcher = v2_ref_stream_definitions
dispatcher = v2_ref_alarms
dispatcher = v2_ref_alarm_definitions
dispatcher = v2_ref_events
dispatcher = v2_ref_transforms
dispatcher = v2_ref_notifications
[dispatcher]
versions = monasca_api.v2.reference.versions:Versions
metrics = monasca_api.v2.reference.metrics:Metrics
metrics_measurements = monasca_api.v2.reference.metrics:MetricsMeasurements
metrics_statistics = monasca_api.v2.reference.metrics:MetricsStatistics
metrics_names = monasca_api.v2.reference.metrics:MetricsNames
alarm_definitions = monasca_api.v2.reference.alarm_definitions:AlarmDefinitions
alarms = monasca_api.v2.reference.alarms:Alarms
alarms_state_history = monasca_api.v2.reference.alarms:AlarmsStateHistory
notification_methods = monasca_api.v2.reference.notifications:Notifications
[security]
# The roles that are allowed full access to the API.
@ -30,35 +33,20 @@ delegate_authorized_roles = admin
[messaging]
# The message queue driver to use
driver = kafka
# The type of metrics message format to publish to the message queue.
metrics_message_format = reference
# The type of events message format to publish to the message queue.
events_message_format = reference
driver = monasca_api.common.messaging.kafka_publisher:KafkaPublisher
[repositories]
# The driver to use for the metrics repository
metrics_driver = influxdb_metrics_repo
# The driver to use for the stream definitions repository
streams_driver = mysql_streams_repo
# The driver to use for the events repository
events_driver = mysql_events_repo
# The driver to use for the transforms repository
transforms_driver = mysql_transforms_repo
metrics_driver = monasca_api.common.repositories.influxdb.metrics_repository:MetricsRepository
# The driver to use for the alarm definitions repository
alarm_definitions_driver = mysql_alarm_definitions_repo
alarm_definitions_driver = monasca_api.common.repositories.mysql.alarm_definitions_repository:AlarmDefinitionsRepository
# The driver to use for the alarms repository
alarms_driver = mysql_alarms_repo
alarms_driver = monasca_api.common.repositories.mysql.alarms_repository:AlarmsRepository
# The driver to use for the notifications repository
notifications_driver = mysql_notifications_repo
notifications_driver = monasca_api.common.repositories.mysql.notifications_repository:NotificationsRepository
[dispatcher]
driver = v2_reference
@ -70,9 +58,6 @@ uri = 192.168.10.4:9092
# The topic that metrics will be published too
metrics_topic = metrics
# The topic that events will be published too
events_topic = transformed-events
# consumer group name
group = api

22
etc/api-config.ini Normal file
View File

@ -0,0 +1,22 @@
[DEFAULT]
name = monasca_api
[pipeline:main]
# Add validator in the pipeline so the metrics messages can be validated.
pipeline = auth keystonecontext api
[app:api]
paste.app_factory = monasca_api.api.server:launch
[filter:auth]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
[filter:keystonecontext]
paste.filter_factory = monasca_api.middleware.keystone_context_filter:filter_factory
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 8082
workers = 1
proc_name = monasca_api

View File

@ -1,31 +0,0 @@
[DEFAULT]
name = monasca
[pipeline:main]
# Add validator in the pipeline so the metrics messages can be validated.
pipeline = auth keystonecontext api
[app:api]
paste.app_factory = monasca.api.server:api_app
[filter:login]
use = egg: monasca_api#login
[filter:inspector]
use = egg: monasca_api#inspector
[filter:validator]
use = egg: monasca_api#metric_validator
[filter:auth]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
[filter:keystonecontext]
paste.filter_factory = monasca.middleware.keystone_context_filter:filter_factory
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 8080
workers = 1
proc_name = monasca

View File

@ -1,51 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class AlarmDefinitionsV2API(object):
def __init__(self, global_conf):
super(AlarmDefinitionsV2API, self).__init__(global_conf)
LOG.debug('initializing AlarmDefinitionsV2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/alarm-definitions', method='post')
def do_post_alarm_definitions(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarm-definitions/{id}', method='get')
def do_get_alarm_definition(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarm-definitions/{id}', method='put')
def do_put_alarm_definitions(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarm-definitions', method='get')
def do_get_alarm_definitions(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarm-definitions/{id}', method='patch')
def do_patch_alarm_definitions(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarm-definitions/{id}', method='delete')
def do_delete_alarm_definitions(self, req, res, id):
res.status = '501 Not Implemented'

View File

@ -1,55 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class AlarmsV2API(object):
def __init__(self, global_conf):
super(AlarmsV2API, self).__init__(global_conf)
LOG.debug('initializing AlarmsV2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/alarms/{id}', method='put')
def do_put_alarms(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarms/{id}', method='patch')
def do_patch_alarms(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarms/{id}', method='delete')
def do_delete_alarms(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarms/', method='get')
def do_get_alarms(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarms/{id}', method='get')
def do_get_alarm_by_id(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarms/x', method='get')
def do_get_alarms_state_history(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/alarms/{id}/state-history', method='get')
def do_get_alarm_state_history(self, req, res, id):
res.status = '501 Not Implemented'

View File

@ -1,47 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class V2API(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/metrics', method='get')
def do_get_metrics(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/metrics/', method='post')
def do_post_metrics(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/{version_id}', method='get')
def do_get_version(self, req, res, version_id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/metrics/measurements', method='get')
def do_get_measurements(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/metrics/statistics', method='get')
def do_get_statistics(self, req, res):
res.status = '501 Not Implemented'

View File

@ -1,46 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class NotificationsV2API(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/notification-methods', method='post')
def do_post_notification_methods(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/notification-methods/{id}', method='delete')
def do_delete_notification_methods(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/notification-methods', method='get')
def do_get_notification_methods(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/notification-methods/{id}', method='get')
def do_get_notification_method(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/notification-methods/{id}', method='put')
def do_put_notification_methods(self, req, res, id):
res.status = '501 Not Implemented'

View File

@ -1,74 +0,0 @@
# Copyright 2014 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from wsgiref import simple_server
from oslo.config import cfg
import paste.deploy
from stevedore import named
from monasca.common import resource_api
from monasca.openstack.common import log
DISPATCHER_NAMESPACE = 'monasca.dispatcher'
OPTS = [
cfg.MultiStrOpt('dispatcher',
default=[],
help='Dispatchers to process data.'),
]
cfg.CONF.register_opts(OPTS)
LOG = log.getLogger(__name__)
def api_app(conf):
cfg.CONF(args=[], project='monasca')
log_levels = (cfg.CONF.default_log_levels)
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
log.setup('monasca')
dispatcher_manager = named.NamedExtensionManager(
namespace=DISPATCHER_NAMESPACE,
names=cfg.CONF.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(dispatcher_manager):
LOG.error('Failed to load any dispatchers for %s' %
DISPATCHER_NAMESPACE)
return None
# Create the application
app = resource_api.ResourceAPI()
# add each dispatcher to the application to serve requests offered by
# each dispatcher
for driver in dispatcher_manager:
app.add_route(None, driver.obj)
LOG.debug('Dispatcher drivers have been added to the routes!')
return app
if __name__ == '__main__':
wsgi_app = (
paste.deploy.loadapp('config:etc/monasca.ini',
relative_to=os.getcwd()))
httpd = simple_server.make_server('127.0.0.1', 8080, wsgi_app)
httpd.serve_forever()

View File

@ -1,42 +0,0 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class StreamDefinitionsV2API(object):
def __init__(self, global_conf):
LOG.debug('initializing StreamDefinitionsV2API!')
self.global_conf = global_conf
@resource_api.Restify('/v2.0/stream-definitions', method='post')
def do_post_stream_definitions(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/stream-definitions/{id}', method='get')
def do_get_stream_definition(self, req, res, id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/stream-definitions', method='get')
def do_get_stream_definitions(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify(
'/v2.0/stream-definitions/{id}', method='delete')
def do_delete_stream_definitions(self, req, res, id):
res.status = '501 Not Implemented'

View File

@ -1,177 +0,0 @@
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from kafka import client
from kafka import common
from kafka import consumer
from kafka import producer
from oslo.config import cfg
try:
import ujson as json
except ImportError:
import json
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class KafkaConnection(object):
def __init__(self):
if not cfg.CONF.kafka.uri:
raise Exception('Kafka is not configured correctly! '
'Use configuration file to specify Kafka '
'uri, for example: '
'uri=192.168.1.191:9092')
self.uri = cfg.CONF.kafka.uri
self.topic = cfg.CONF.kafka.topic
self.group = cfg.CONF.kafka.group
self.wait_time = cfg.CONF.kafka.wait_time
self.async = cfg.CONF.kafka.async
self.ack_time = cfg.CONF.kafka.ack_time
self.max_retry = cfg.CONF.kafka.max_retry
self.auto_commit = cfg.CONF.kafka.auto_commit
self.compact = cfg.CONF.kafka.compact
self.partitions = cfg.CONF.kafka.partitions
self.drop_data = cfg.CONF.kafka.drop_data
self._client = None
self._consumer = None
self._producer = None
LOG.debug('Kafka Connection initialized successfully!')
def _init_client(self, wait_time=None):
for i in range(self.max_retry):
try:
# if there is a client instance, but _init_client is called
# again, most likely the connection has gone stale, close that
# connection and reconnect.
if self._client:
self._client.close()
if not wait_time:
wait_time = self.wait_time
time.sleep(wait_time)
self._client = client.KafkaClient(self.uri)
# when a client is re-initialized, existing consumer should be
# reset as well.
self._consumer = None
self._producer = None
LOG.debug("Successfully connected to Kafka server at topic: "
"\"%s\" partitions %s" % (self.topic,
self.partitions))
break
except common.KafkaUnavailableError:
LOG.error('Kafka server at %s is down.' % self.uri)
except common.LeaderNotAvailableError:
LOG.error('Kafka at %s has no leader available.' % self.uri)
except Exception:
LOG.error('Kafka at %s initialization failed.' % self.uri)
# Wait a bit and try again to get a client
time.sleep(self.wait_time)
def _init_consumer(self):
try:
if not self._client:
self._init_client()
self._consumer = consumer.SimpleConsumer(
self._client, self.group, self.topic,
auto_commit=self.auto_commit,
partitions=self.partitions)
LOG.debug('Consumer was created successfully.')
except Exception:
self._consumer = None
LOG.exception('Kafka (%s) consumer can not be created.' %
self.uri)
def _init_producer(self):
try:
if not self._client:
self._init_client()
self._producer = producer.SimpleProducer(
self._client, async=self.async, ack_timeout=self.ack_time)
LOG.debug('Producer was created successfully.')
except Exception:
self._producer = None
LOG.exception('Kafka (%s) producer can not be created.' %
self.uri)
def commit(self):
if self._consumer and self.auto_commit:
self._consumer.commit()
def close(self):
if self._client:
self._consumer = None
self._producer = None
self._client.close()
def get_messages(self):
try:
if not self._consumer:
self._init_consumer()
for message in self._consumer:
LOG.debug(message.message.value)
yield message
except Exception:
LOG.error('Error occurred while handling kafka messages.')
self._consumer = None
yield None
def send_messages(self, messages):
LOG.debug('Prepare to send messages.')
if not messages or self.drop_data:
return 204
code = 400
try:
if not self._producer:
self._init_producer()
LOG.debug('Start sending messages to kafka.')
if self.compact:
self._producer.send_messages(self.topic, messages)
else:
data = json.loads(messages)
LOG.debug('Msg parsed successfully.')
if isinstance(data, list):
for item in data:
self._producer.send_messages(
self.topic, json.dumps(item))
else:
self._producer.send_messages(self.topic, messages)
LOG.debug('Message posted successfully.')
code = 204
except (common.KafkaUnavailableError,
common.LeaderNotAvailableError):
self._client = None
code = 503
LOG.exception('Error occurred while posting data to '
'Kafka.')
except ValueError:
code = 406
LOG.exception('Message %s is not valid json.' % messages)
except Exception:
code = 500
LOG.exception('Unknown error.')
return code

View File

@ -1,17 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(events, tenant_id, region):
raise NotImplemented()

View File

@ -1,17 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(metrics, tenant_id, region):
raise NotImplemented()

View File

@ -1,29 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import monasca.common.messaging.message_formats.cadf.events as cadf_events
import monasca.common.messaging.message_formats.identity.events as ident_events
import monasca.common.messaging.message_formats.reference.events as ref_events
def create_events_transform():
message_format = cfg.CONF.messaging.events_message_format
if message_format == 'reference':
return ref_events.transform
elif message_format == 'cadf':
return cadf_events.transform
else:
return ident_events.transform

View File

@ -1,17 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(events, tenant_id, region):
return events

View File

@ -1,17 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def transform(metrics, tenant_id, region):
return metrics

View File

@ -1,29 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import monasca.common.messaging.message_formats.cadf.metrics as cadf_metrics
import monasca.common.messaging.message_formats.identity.metrics as id_metrics
import monasca.common.messaging.message_formats.reference.metrics as r_metrics
def create_metrics_transform():
metrics_message_format = cfg.CONF.messaging.metrics_message_format
if metrics_message_format == 'reference':
return r_metrics.transform
elif metrics_message_format == 'cadf':
return cadf_metrics.transform
else:
return id_metrics.transform

View File

@ -1,28 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import timeutils
def transform(event, tenant_id, region):
transformed_event = dict(
event=event,
meta=dict(
tenantId=tenant_id,
region=region
),
creation_time=timeutils.utcnow_ts()
)
return transformed_event

View File

@ -1,28 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class EventsRepository(object):
@abc.abstractmethod
def list_event(self, tenant_id, event_id):
return
@abc.abstractmethod
def list_events(self, tenant_id, offset, limit):
return

View File

@ -1,90 +0,0 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import events_repository as er
from monasca.common.repositories.mysql import mysql_repository
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class EventsRepository(mysql_repository.MySQLRepository,
er.EventsRepository):
def __init__(self):
super(EventsRepository, self).__init__()
self.database_name = "winchester"
self._base_query = """
select event.message_id,
event.generated,
event_type.desc,
trait.name,
trait.t_string,
trait.t_float,
trait.t_int,
trait.t_datetime
from event
inner join event_type on event.event_type_id=event_type.id
inner join trait on event.id=trait.event_id"""
@mysql_repository.mysql_try_catch_block
def list_event(self, tenant_id, event_id):
query = self._base_query + " where event.message_id=%s"
rows = self._execute_query(query, [event_id])
return rows
@mysql_repository.mysql_try_catch_block
def list_events(self, tenant_id, offset, limit):
where_clause = ""
order_by_clause = " order by event.generated asc"
event_ids = self._find_event_ids(offset, limit)
if event_ids:
ids = ",".join([str(event_id['id']) for event_id in event_ids])
where_clause = """
where trait.event_id
IN ({})""".format(ids)
query = self._base_query + where_clause + order_by_clause
rows = self._execute_query(query, [])
return rows
def _find_event_ids(self, offset, limit):
parameters = []
if offset:
parameters.append(offset.encode('utf8'))
offset_clause = """
where generated > (select generated
from event
where message_id = %s)"""
else:
offset_clause = ""
parameters.append(int(limit))
limit_clause = " limit %s"
id_query = ('select id from event ' +
offset_clause +
' order by generated ' +
limit_clause)
return self._execute_query(id_query, parameters)

View File

@ -1,31 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import peewee
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
db = peewee.MySQLDatabase(cfg.CONF.mysql.database_name,
host=cfg.CONF.mysql.hostname,
user=cfg.CONF.mysql.username,
passwd=cfg.CONF.mysql.password)
class Model(peewee.Model):
class Meta:
database = db

View File

@ -1,198 +0,0 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import MySQLdb
from oslo_utils import timeutils
from monasca.common.repositories import exceptions
from monasca.common.repositories.mysql import mysql_repository
from monasca.common.repositories import streams_repository as sdr
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class StreamsRepository(mysql_repository.MySQLRepository,
sdr.StreamsRepository):
base_query = """
select sd.id, sd.tenant_id, sd.name, sd.description,
sd.select_by, sd.group_by, sd.fire_criteria, sd.expiration,
sd.actions_enabled, sd.created_at,
sd.updated_at, sd.deleted_at,
saf.fire_actions, sae.expire_actions
from stream_definition as sd
left join (select stream_definition_id,
group_concat(action_id) as fire_actions
from stream_actions
where action_type = 'FIRE'
group by stream_definition_id) as saf
on saf.stream_definition_id = sd.id
left join (select stream_definition_id,
group_concat(action_id) as expire_actions
from stream_actions
where action_type = 'EXPIRE'
group by stream_definition_id) as sae
on sae.stream_definition_id = sd.id
"""
def __init__(self):
super(StreamsRepository, self).__init__()
@mysql_repository.mysql_try_catch_block
def get_stream_definition(self, tenant_id, stream_definition_id):
parms = [tenant_id, stream_definition_id]
where_clause = """ where sd.tenant_id = %s
and sd.id = %s
and deleted_at is NULL """
query = StreamsRepository.base_query + where_clause
rows = self._execute_query(query, parms)
if rows:
return rows[0]
else:
raise exceptions.DoesNotExistException
@mysql_repository.mysql_try_catch_block
def get_stream_definitions(self, tenant_id, name, offset, limit):
parms = [tenant_id]
select_clause = StreamsRepository.base_query
where_clause = " where sd.tenant_id = %s and deleted_at is NULL "
if name:
where_clause += " and sd.name = %s "
parms.append(name.encode('utf8'))
order_by_clause = " order by sd.id, sd.created_at "
if offset:
where_clause += " and sd.id > %s "
parms.append(offset.encode('utf8'))
limit_clause = " limit %s "
parms.append(limit + 1)
query = select_clause + where_clause + order_by_clause + limit_clause
return self._execute_query(query, parms)
@mysql_repository.mysql_try_catch_block
def delete_stream_definition(self, tenant_id, stream_definition_id):
"""Delete the stream definition.
:param tenant_id:
:param stream_definition_id:
:returns True: -- if stream definition exists and was deleted.
:returns False: -- if the stream definition does not exists.
:raises RepositoryException:
"""
cnxn, cursor = self._get_cnxn_cursor_tuple()
with cnxn:
cursor.execute("""delete from stream_definition
where tenant_id = %s and id = %s""",
[tenant_id, stream_definition_id])
if cursor.rowcount < 1:
return False
return True
@mysql_repository.mysql_try_catch_block
def create_stream_definition(self,
tenant_id,
name,
description,
select,
group_by,
fire_criteria,
expiration,
fire_actions,
expire_actions):
cnxn, cursor = self._get_cnxn_cursor_tuple()
with cnxn:
now = timeutils.utcnow()
stream_definition_id = str(uuid.uuid1())
try:
cursor.execute("""insert into stream_definition(
id,
tenant_id,
name,
description,
select_by,
group_by,
fire_criteria,
expiration,
created_at,
updated_at)
values (%s, %s, %s, %s, %s, %s, %s, %s, %s,
%s)""", (
stream_definition_id, tenant_id, name.encode('utf8'),
description.encode('utf8'), select.encode('utf8'),
group_by.encode('utf8'), fire_criteria.encode('utf8'),
expiration, now, now))
except MySQLdb.IntegrityError as e:
code, msg = e
if code == 1062:
raise exceptions.AlreadyExistsException(
'Stream Definition already '
'exists for tenant_id: {0} name: {1}'.format(
tenant_id, name.encode('utf8')))
else:
raise e
self._insert_into_stream_actions(cursor, stream_definition_id,
fire_actions, u"FIRE")
self._insert_into_stream_actions(cursor, stream_definition_id,
expire_actions,
u"EXPIRE")
return stream_definition_id
def _insert_into_stream_actions(self, cursor, stream_definition_id,
actions, action_type):
if actions is None:
return
for action in actions:
cursor.execute("select id from notification_method where id = %s",
(action.encode('utf8'),))
row = cursor.fetchone()
if not row:
raise exceptions.RepositoryException(
"Non-existent notification id {} submitted for {} "
"notification action".format(action.encode('utf8'),
action_type.encode('utf8')))
cursor.execute("""insert into stream_actions(
stream_definition_id,
action_type,
action_id)
values(%s,%s,%s)""", (
stream_definition_id, action_type.encode('utf8'),
action.encode('utf8')))

View File

@ -1,81 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import model
import peewee
from monasca.common.repositories import exceptions
from monasca.common.repositories import transforms_repository
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class Transform(model.Model):
id = peewee.TextField(36)
tenant_id = peewee.TextField(36)
name = peewee.TextField()
description = peewee.TextField()
specification = peewee.TextField()
enabled = peewee.BooleanField()
created_at = peewee.DateTimeField()
updated_at = peewee.DateTimeField()
deleted_at = peewee.DateTimeField()
class TransformsRepository(transforms_repository.TransformsRepository):
def create_transforms(self, id, tenant_id, name, description,
specification, enabled):
try:
q = Transform.create(id=id, tenant_id=tenant_id, name=name,
description=description,
specification=specification, enabled=enabled)
q.save()
except Exception as ex:
LOG.exception(str(ex))
raise exceptions.RepositoryException(str(ex))
def list_transforms(self, tenant_id):
try:
q = Transform.select().where(Transform.tenant_id == tenant_id)
results = q.execute()
transforms = []
for result in results:
transform = {'id': result.id, 'name': result.name,
'description': result.description,
'specification': result.specification,
'enabled': result.enabled}
transforms.append(transform)
return transforms
except Exception as ex:
LOG.exception(str(ex))
raise exceptions.RepositoryException(str(ex))
def delete_transform(self, tenant_id, transform_id):
num_rows_deleted = 0
try:
q = Transform.delete().where((Transform.tenant_id == tenant_id) & (
Transform.id == transform_id))
num_rows_deleted = q.execute()
except Exception as ex:
LOG.exception(str(ex))
raise exceptions.RepositoryException(str(ex))
if num_rows_deleted < 1:
raise exceptions.DoesNotExistException()
return

View File

@ -1,48 +0,0 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class StreamsRepository(object):
def __init__(self):
super(StreamsRepository, self).__init__()
@abc.abstractmethod
def create_stream_definition(self,
tenant_id,
name,
description,
select,
group_by,
fire_criteria,
expiration,
fire_actions,
expire_actions):
pass
@abc.abstractmethod
def delete_stream_definition(self, tenant_id, stream_definition_id):
pass
@abc.abstractmethod
def get_stream_definition(self, tenant_id, stream_definition_id):
pass
@abc.abstractmethod
def get_stream_definitions(self, tenant_id, name, offset, limit):
pass

View File

@ -1,33 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class TransformsRepository(object):
@abc.abstractmethod
def create_transforms(self, id, tenant_id, name, description,
specification, enabled):
return
@abc.abstractmethod
def list_transforms(self, tenant_id):
return
@abc.abstractmethod
def delete_transform(self, tenant_id, transform_id):
return

View File

@ -1,147 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from falcon import api_helpers
from stevedore import driver
from monasca.openstack.common import log
RESOURCE_METHOD_FLAG = 'fab05a04-b861-4651-bd0c-9cb3eb9a6088'
LOG = log.getLogger(__name__)
def init_driver(namespace, driver_name, drv_invoke_args=()):
"""Initialize the resource driver and returns it.
:param namespace: the resource namespace (in setup.cfg).
:param driver_name: the driver name (in monasca.conf)
:param invoke_args: args to pass to the driver (a tuple)
"""
mgr = driver.DriverManager(namespace=namespace, name=driver_name,
invoke_on_load=True,
invoke_args=drv_invoke_args)
return mgr.driver
class Restify(object):
def __init__(self, path='', method='GET'):
if not path:
raise Exception('Path has to be specified.')
if method.upper() not in falcon.HTTP_METHODS:
raise Exception('Invalid request method.')
self.path = path
self.method = method.upper()
def __call__(self, func):
setattr(func, RESOURCE_METHOD_FLAG, self)
return func
class ResourceAPI(falcon.API):
def add_route(self, uri_template, resource):
"""Associates uri patterns with resource methods.
A resource is an instance of a class that defines various methods
to handle http requests.
Use this class to create applications which serve a standard
compliant ReSTful API. For example, you may have an API which manage
monitoring data, there can be multiple implementations of the API
using different technologies. One can use Mongodb, the other can use
Cassandra. To make the configuration of the application very easy,
each implementation provides a class with set of methods decorated
by class Restify, the application can simply using single entry
configuration to load different implementations.
For example::
class ExampleResource(object):
@Restify(path='/path1/', method='post')
def func1(self, req, res):
pass
@Restify(path='/path2/{id}/key/', method='get')
def func2(self, req, res, id):
pass
def func3(self, req, res, id):
pass
With the above class, the following code will add the class method
func1, func2 to handle post and get requests respectively, method
func3 won't be added to the routes.::
app.add_route(None, ExampleResource())
Args:
uri_template (url pattern): the url pattern which a client will
post a request against. If none, ResourceAPI will
automatically look up the decorated methods.
resource (instance): Object which represents an HTTP/REST
"resource". Falcon will pass requests to various decorated
methods to handle http requests.
"""
if not resource:
raise Exception('Not a valid resource')
path_maps = {}
try:
if uri_template:
super(ResourceAPI, self).add_route(uri_template, resource)
else:
for attr in dir(resource):
method = getattr(resource, attr)
if callable(method) and hasattr(method,
RESOURCE_METHOD_FLAG):
flag = getattr(method, RESOURCE_METHOD_FLAG)
map = path_maps.get(flag.path)
if not map:
uri_fields, template = (
api_helpers.compile_uri_template(flag.path))
map = (template, {})
path_maps[flag.path] = map
new_method = api_helpers._wrap_with_hooks(
self._before, self._after, method)
map[1][flag.method] = new_method
for item in path_maps:
self._routes.insert(0, (path_maps[item][0],
path_maps[item][1]))
except Exception:
LOG.exception('Error occurred while adding the resource')
LOG.debug(self._routes)
def add_resource(self, resource_name, namespace, driver_name,
invoke_args=(), uri=None):
"""Loads the resource driver, and adds it to the routes.
:param resource_name: the name of the resource.
:param namespace: the resource namespace (in setup.cfg).
:param driver_name: the driver name (in monasca.conf)
:param invoke_args: args to pass to the driver (a tuple)
:param uri: the uri to associate with the resource
"""
resource_driver = init_driver(namespace, driver_name, invoke_args)
LOG.debug('%s dispatcher driver %s is loaded.' %
(resource_name, driver_name))
self.add_route(uri, resource_driver)
LOG.debug('%s dispatcher driver has been added to the routes!' %
(resource_name))

View File

@ -1,30 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Base(object):
def __init__(self, conf):
self.conf = conf
@abc.abstractmethod
def define_routes(self, app):
"""Post metric data interface."""

View File

@ -1,41 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca.api import monasca_api_v2
from monasca.common import kafka_conn
from monasca.common import resource_api
from monasca.openstack.common import log
LOG = log.getLogger(__name__)
class KafkaDispatcher(monasca_api_v2.V2API):
def __init__(self, global_conf):
LOG.debug('initializing KafkaDispatcher!')
super(KafkaDispatcher, self).__init__(global_conf)
self._kafka_conn = kafka_conn.KafkaConnection()
@resource_api.Restify('/v2.0/metrics/', method='post')
def do_post_metrics(self, req, res):
LOG.debug('Getting the call.')
msg = req.stream.read()
code = self._kafka_conn.send_messages(msg)
res.status = getattr(falcon, 'HTTP_' + str(code))

View File

@ -1,81 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo.config import cfg
from monasca.api import monasca_api_v2
from monasca.common import resource_api
from monasca.openstack.common import log
OPTS = [
cfg.MultiStrOpt('id',
default=['sample'],
help='Multiple String configuration.'),
cfg.StrOpt('prefix',
default='monasca_',
help='String configuration sample.'),
]
cfg.CONF.register_opts(OPTS, group='sample_dispatcher')
LOG = log.getLogger(__name__)
class SampleDispatcher(monasca_api_v2.V2API):
"""Monasca dispatcher sample class
This class shows how to develop a dispatcher and how the configuration
parameters should be defined and how these configuration parameters
should be set in monasca.conf file.
This class uses configuration parameters appear in sample_dispatcher
section such as the following:
[sample_dispatcher]
id = 101
id = 105
id = 180
prefix = sample__
If the above section appears in file monasca.conf, these values will be
loaded to cfg.CONF after the dispatcher gets loaded. The cfg.CONF should
have the following values:
cfg.CONF.sample_dispatcher.id = [101, 105, 180]
cfg.CONF.sample_dispatcher.prefix = "sample__"
"""
def __init__(self, global_conf):
LOG.debug('initializing SampleDispatcher!')
super(SampleDispatcher, self).__init__(global_conf)
LOG.debug('SampleDispatcher conf entries: prefix')
LOG.debug(global_conf.sample_dispatcher.prefix)
LOG.debug('SampleDispatcher conf entries: id')
LOG.debug(global_conf.sample_dispatcher.id)
@resource_api.Restify('/v2.0/datapoints/', method='post')
def do_post_metrics(self, req, res):
LOG.debug('Getting the call at endpoint datapoints.')
msg = req.stream.read()
LOG.debug('The msg:', msg)
res.status = getattr(falcon, 'HTTP_201')
@resource_api.Restify('/v2.0/demopoints/', method='get')
def do_get_metrics(self, req, res):
LOG.debug('Getting the call at endpoint demopoints.')
res.body = 'demo response'
res.status = getattr(falcon, 'HTTP_200')

View File

@ -1,47 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class SimpleLogin(object):
"""Example middleware that demostrates how a login middleware should work.
In this example, the middleware checks if a request path starts with
string '/datapoints/', if it does, the request gets pass this
middleware, otherwise, a 401 response is returned.
"""
def __init__(self, app, conf):
self.app = app
self.conf = conf
def __call__(self, environ, start_response):
# if request starts with /datapoints/, then let it go on.
# this login middle
if environ.get('PATH_INFO', '').startswith('/datapoints/'):
return self.app(environ, start_response)
# otherwise, send something else, request stops here.
else:
status = "401 Unauthorized"
response_headers = [("content-type", "text/plain")]
start_response(status, response_headers, "please login first")
return ['Please log in!']
def filter_factory(global_conf, **local_conf):
def login_filter(app):
return SimpleLogin(app, local_conf)
return login_filter

View File

@ -1,125 +0,0 @@
# Copyright 2013 IBM Corp
#
# Author: Tong Li <litong01@us.ibm.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import StringIO
try:
import ujson as json
except ImportError:
import json
class MetricValidator(object):
"""middleware that validate the metric input stream.
This middleware checks if the input stream actually follows metric spec
and all the messages in the request has valid metric data. If the body
is valid json and compliant with the spec, then the request will forward
the request to the next in the pipeline, otherwise, it will reject the
request with response code of 400 or 406.
"""
def __init__(self, app, conf):
self.app = app
self.conf = conf
def _is_valid_metric(self, metric):
"""Validate a message
The external message format is
{
"name":"name1",
"dimensions":{
"key1":"value1",
"key2":"value2"
},
"timestamp":1405630174,
"value":1.0
}
Once this is validated, the message needs to be transformed into
the following internal format:
The current valid message format is as follows (interna):
{
"metric": {"something": "The metric as a JSON object"},
"meta": {
"tenantId": "the tenant ID acquired",
"region": "the region that the metric was submitted under",
},
"creation_time": "the time when the API received the metric",
}
"""
if (metric.get('name') and metric.get('dimensions') and
metric.get('timestamp') and metric.get('value')):
return True
else:
return False
def __call__(self, env, start_response):
# if request starts with /datapoints/, then let it go on.
# this login middle
if (env.get('PATH_INFO', '').startswith('/v2.0/metrics') and
env.get('REQUEST_METHOD', '') == 'POST'):
# We only check the requests which are posting against metrics
# endpoint
try:
body = env['wsgi.input'].read()
metrics = json.loads(body)
# Do business logic validation here.
is_valid = True
if isinstance(metrics, list):
for metric in metrics:
if not self._is_valid_metric(metric):
is_valid = False
break
else:
is_valid = self._is_valid_metric(metrics)
if is_valid:
# If the message is valid, then wrap it into this internal
# format. The tenantId should be available from the
# request since this should have been authenticated.
# ideally this transformation should be done somewhere
# else. For the sake of simplicity, do the simple one
# here to make the life a bit easier.
# TODO(HP) Add logic to get region id from request header
# HTTP_X_SERVICE_CATALOG, then find endpoints, then region
region_id = None
msg = {'metric': metrics,
'meta': {'tenantId': env.get('HTTP_X_PROJECT_ID'),
'region': region_id},
'creation_time': datetime.datetime.now()}
env['wsgi.input'] = StringIO.StringIO(json.dumps(msg))
return self.app(env, start_response)
except Exception:
pass
# It is either invalid or exceptioned out while parsing json
# we will send the request back with 400.
start_response("400 Bad Request", [], '')
return []
else:
# not a metric post request, move on.
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
def validator_filter(app):
return MetricValidator(app, local_conf)
return validator_filter

View File

View File

@ -1,33 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import voluptuous
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
event_schema_request_body = voluptuous.Schema({
voluptuous.All(voluptuous.Any(str, unicode),
voluptuous.Length(max=255)): voluptuous.All(
voluptuous.Any(None, str, unicode, bool, int, float, dict, []))})
def validate(body):
try:
event_schema_request_body(body)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -1,53 +0,0 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import voluptuous
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
MILLISEC_PER_DAY = 86400000
MILLISEC_PER_WEEK = MILLISEC_PER_DAY * 7
stream_definition_schema = {
voluptuous.Required('name'): voluptuous.All(voluptuous.Any(str, unicode),
voluptuous.Length(max=140)),
voluptuous.Required('select'): voluptuous.All(
voluptuous.Any(list)),
voluptuous.Required('group_by'): voluptuous.All(
voluptuous.Any(list)),
voluptuous.Required('fire_criteria'): voluptuous.All(
voluptuous.Any(list)),
voluptuous.Required('expiration'): voluptuous.All(
voluptuous.Any(int), voluptuous.Range(min=0, max=MILLISEC_PER_WEEK)),
voluptuous.Optional('fire_actions'): voluptuous.All(
voluptuous.Any([str], [unicode]), voluptuous.Length(max=400)),
voluptuous.Optional('expire_actions'): voluptuous.All(
voluptuous.Any([str], [unicode]), voluptuous.Length(max=400)),
voluptuous.Optional('actions_enabled'): bool}
request_body_schema = voluptuous.Schema(stream_definition_schema,
required=True, extra=True)
def validate(msg):
try:
request_body_schema(msg)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -1,42 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import voluptuous
from monasca.openstack.common import log
from monasca.v2.common.schemas import exceptions
LOG = log.getLogger(__name__)
transform_schema = {
voluptuous.Required('name'): voluptuous.Schema(
voluptuous.All(voluptuous.Any(str, unicode),
voluptuous.Length(max=64))),
voluptuous.Required('description'): voluptuous.Schema(
voluptuous.All(voluptuous.Any(str, unicode),
voluptuous.Length(max=250))),
voluptuous.Required('specification'): voluptuous.Schema(
voluptuous.All(voluptuous.Any(str, unicode),
voluptuous.Length(max=64536))),
voluptuous.Optional('enabled'): bool}
request_body_schema = voluptuous.Schema(voluptuous.Any(transform_schema))
def validate(msg):
try:
request_body_schema(msg)
except Exception as ex:
LOG.debug(ex)
raise exceptions.ValidationException(str(ex))

View File

@ -1,160 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import falcon
import collections
from monasca.api import monasca_events_api_v2
from monasca.common.messaging import exceptions as message_queue_exceptions
from monasca.common.messaging.message_formats import events_transform_factory
from monasca.common import resource_api
from monasca.openstack.common import log
from monasca.v2.common.schemas import (
events_request_body_schema as schemas_event)
from monasca.v2.common.schemas import exceptions as schemas_exceptions
from monasca.v2.common import utils
from monasca.v2.reference import helpers
from monasca.v2.reference import resource
from oslo.config import cfg
LOG = log.getLogger(__name__)
class Events(monasca_events_api_v2.EventsV2API):
def __init__(self, global_conf):
super(Events, self).__init__(global_conf)
self._region = cfg.CONF.region
self._default_authorized_roles = (
cfg.CONF.security.default_authorized_roles)
self._delegate_authorized_roles = (
cfg.CONF.security.delegate_authorized_roles)
self._post_events_authorized_roles = (
cfg.CONF.security.default_authorized_roles +
cfg.CONF.security.agent_authorized_roles)
self._event_transform = (
events_transform_factory.create_events_transform())
self._message_queue = (
resource_api.init_driver('monasca.messaging',
cfg.CONF.messaging.driver,
['raw-events']))
self._events_repo = resource_api.init_driver(
'monasca.repositories', cfg.CONF.repositories.events_driver)
def _validate_event(self, event):
"""Validates the event
:param event: An event object.
:raises falcon.HTTPBadRequest
"""
try:
schemas_event.validate(event)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def _send_event(self, event):
"""Send the event using the message queue.
:param metrics: An event object.
:raises: falcon.HTTPServiceUnavailable
"""
try:
str_msg = json.dumps(event, default=utils.date_handler,
ensure_ascii=False).encode('utf8')
self._message_queue.send_message(str_msg)
except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex)
raise falcon.HTTPInternalServerError('Service unavailable',
ex.message)
@resource.resource_try_catch_block
def _list_events(self, tenant_id, uri, offset, limit):
rows = self._events_repo.list_events(tenant_id, offset, limit)
return helpers.paginate(self._build_events(rows), uri, limit)
@resource.resource_try_catch_block
def _list_event(self, tenant_id, event_id, uri):
rows = self._events_repo.list_event(tenant_id, event_id)
return self._build_events(rows)
def _build_events(self, rows):
result = collections.OrderedDict()
for row in rows:
event_id, event_data = self._build_event_data(row)
if event_id['id'] in result:
result[event_id['id']]['data'].update(event_data)
else:
result[event_id['id']] = {'id': event_id['id'],
'description': event_id['desc'],
'generated': event_id['generated'],
'data': event_data}
return result.values()
def _build_event_data(self, event_row):
event_data = {}
name = event_row['name']
if event_row['t_string']:
event_data[name] = event_row['t_string']
if event_row['t_int']:
event_data[name] = event_row['t_int']
if event_row['t_float']:
event_data[name] = event_row['t_float']
if event_row['t_datetime']:
event_data[name] = float(event_row['t_datetime'])
event_id = {'id': event_row['message_id'],
'desc': event_row['desc'],
'generated': float(event_row['generated'])}
return event_id, event_data
@resource_api.Restify('/v2.0/events', method='post')
def do_post_events(self, req, res):
helpers.validate_json_content_type(req)
helpers.validate_authorization(req, self._post_events_authorized_roles)
event = helpers.read_http_resource(req)
self._validate_event(event)
tenant_id = helpers.get_tenant_id(req)
transformed_event = self._event_transform(event, tenant_id,
self._region)
self._send_event(transformed_event)
res.status = falcon.HTTP_204
@resource_api.Restify('/v2.0/events', method='get')
def do_get_events(self, req, res):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
offset = helpers.get_query_param(req, 'offset')
limit = helpers.get_limit(req)
result = self._list_events(tenant_id, req.uri, offset, limit)
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_200
@resource_api.Restify('/v2.0/events/{id}', method='get')
def do_get_event(self, req, res, id):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
result = self._list_event(tenant_id, id, req.uri)
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_200

View File

@ -1,345 +0,0 @@
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import re
import falcon
from oslo.config import cfg
from monasca.api import stream_definitions_api_v2
from monasca.common.messaging import exceptions as message_queue_exceptions
from monasca.common.repositories import exceptions
from monasca.common import resource_api
from monasca.openstack.common import log
from monasca.v2.common.schemas import (stream_definition_request_body_schema
as schema_streams)
from monasca.v2.common.schemas import exceptions as schemas_exceptions
from monasca.v2.reference import helpers
from monasca.v2.reference import resource
LOG = log.getLogger(__name__)
class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
def __init__(self, global_conf):
try:
super(StreamDefinitions, self).__init__(global_conf)
self._region = cfg.CONF.region
self._default_authorized_roles = (
cfg.CONF.security.default_authorized_roles)
self._delegate_authorized_roles = (
cfg.CONF.security.delegate_authorized_roles)
self._post_authorized_roles = (
cfg.CONF.security.default_authorized_roles +
cfg.CONF.security.agent_authorized_roles)
self._stream_definitions_repo = resource_api.init_driver(
'monasca.repositories', cfg.CONF.repositories.streams_driver)
self.stream_definition_event_message_queue = (
resource_api.init_driver('monasca.messaging',
cfg.CONF.messaging.driver,
(['stream-definitions'])))
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
@resource_api.Restify('/v2.0/stream-definitions', method='post')
def do_post_stream_definitions(self, req, res):
helpers.validate_authorization(req, self._default_authorized_roles)
stream_definition = helpers.read_json_msg_body(req)
self._validate_stream_definition(stream_definition)
tenant_id = helpers.get_tenant_id(req)
name = get_query_stream_definition_name(stream_definition)
description = get_query_stream_definition_description(
stream_definition)
select = stream_definition['select']
group_by = stream_definition['group_by']
fire_criteria = stream_definition['fire_criteria']
expiration = stream_definition['expiration']
fire_actions = get_query_stream_definition_fire_actions(
stream_definition)
expire_actions = get_query_stream_definition_expire_actions(
stream_definition)
result = self._stream_definition_create(tenant_id, name, description,
select, group_by,
fire_criteria, expiration,
fire_actions, expire_actions)
helpers.add_links_to_resource(result, req.uri)
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_201
@resource_api.Restify('/v2.0/stream-definitions/{id}', method='get')
def do_get_stream_definition(self, req, res, id):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
result = self._stream_definition_show(tenant_id, id)
helpers.add_links_to_resource(result, re.sub('/' + id, '', req.uri))
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_200
@resource_api.Restify('/v2.0/stream-definitions', method='get')
def do_get_stream_definitions(self, req, res):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
name = helpers.get_query_name(req)
offset = helpers.get_query_param(req, 'offset')
limit = helpers.get_limit(req)
result = self._stream_definition_list(tenant_id, name,
req.uri, offset, limit)
res.body = helpers.dumpit_utf8(result)
res.status = falcon.HTTP_200
@resource_api.Restify(
'/v2.0/stream-definitions/{id}', method='delete')
def do_delete_stream_definitions(self, req, res, id):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
self._stream_definition_delete(tenant_id, id)
res.status = falcon.HTTP_204
@resource.resource_try_catch_block
def _stream_definition_delete(self, tenant_id, id):
stream_definition_row = (
self._stream_definitions_repo.get_stream_definition(tenant_id, id))
if not self._stream_definitions_repo.delete_stream_definition(
tenant_id, id):
raise falcon.HTTPNotFound
self._send_stream_definition_deleted_event(
id, tenant_id, stream_definition_row['name'])
def _validate_stream_definition(self, stream_definition):
try:
schema_streams.validate(stream_definition)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
@resource.resource_try_catch_block
def _stream_definition_create(self, tenant_id, name,
description, select, group_by,
fire_criteria, expiration,
fire_actions, expire_actions):
stream_definition_id = (
self._stream_definitions_repo.
create_stream_definition(tenant_id,
name,
description,
json.dumps(select),
json.dumps(group_by),
json.dumps(fire_criteria),
expiration,
fire_actions,
expire_actions))
self._send_stream_definition_created_event(tenant_id,
stream_definition_id,
name,
select,
group_by,
fire_criteria,
expiration)
result = (
{u'name': name,
u'id': stream_definition_id,
u'description': description,
u'select': select,
u'group_by': group_by,
u'fire_criteria': fire_criteria,
u'expiration': expiration,
u'fire_actions': fire_actions,
u'expire_actions': expire_actions,
u'actions_enabled': u'true'}
)
return result
def send_event(self, message_queue, event_msg):
try:
message_queue.send_message(
helpers.dumpit_utf8(event_msg))
except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex)
raise falcon.HTTPInternalServerError(
'Message queue service unavailable'.encode('utf8'),
ex.message.encode('utf8'))
@resource.resource_try_catch_block
def _stream_definition_show(self, tenant_id, id):
stream_definition_row = (
self._stream_definitions_repo.get_stream_definition(tenant_id, id))
return self._build_stream_definition_show_result(stream_definition_row)
@resource.resource_try_catch_block
def _stream_definition_list(self, tenant_id, name, req_uri,
offset, limit):
stream_definition_rows = (
self._stream_definitions_repo.get_stream_definitions(
tenant_id, name, offset, limit))
result = []
for stream_definition_row in stream_definition_rows:
sd = self._build_stream_definition_show_result(
stream_definition_row)
helpers.add_links_to_resource(sd, req_uri)
result.append(sd)
result = helpers.paginate(result, req_uri, limit)
return result
def _build_stream_definition_show_result(self, stream_definition_row):
fire_actions_list = get_comma_separated_str_as_list(
stream_definition_row['fire_actions'])
expire_actions_list = get_comma_separated_str_as_list(
stream_definition_row['expire_actions'])
result = (
{u'name': stream_definition_row['name'],
u'id': stream_definition_row['id'],
u'description': stream_definition_row['description'],
u'select': json.loads(stream_definition_row['select_by']),
u'group_by': json.loads(stream_definition_row['group_by']),
u'fire_criteria': json.loads(
stream_definition_row['fire_criteria']),
u'expiration': stream_definition_row['expiration'],
u'fire_actions': fire_actions_list,
u'expire_actions': expire_actions_list,
u'actions_enabled': stream_definition_row['actions_enabled'] == 1,
u'created_at': stream_definition_row['created_at'].isoformat(),
u'updated_at': stream_definition_row['updated_at'].isoformat()}
)
return result
def _send_stream_definition_deleted_event(self, stream_definition_id,
tenant_id, stream_name):
stream_definition_deleted_event_msg = {
u"stream-definition-deleted": {u'tenant_id': tenant_id,
u'stream_definition_id':
stream_definition_id,
u'name': stream_name}}
self.send_event(self.stream_definition_event_message_queue,
stream_definition_deleted_event_msg)
def _send_stream_definition_created_event(self, tenant_id,
stream_definition_id,
name,
select,
group_by,
fire_criteria,
expiration):
stream_definition_created_event_msg = {
u'stream-definition-created': {u'tenant_id': tenant_id,
u'stream_definition_id':
stream_definition_id,
u'name': name,
u'select': select,
u'group_by': group_by,
u'fire_criteria': fire_criteria,
u'expiration': expiration}
}
self.send_event(self.stream_definition_event_message_queue,
stream_definition_created_event_msg)
def get_query_stream_definition_name(stream_definition):
return (stream_definition['name'])
def get_query_stream_definition_description(stream_definition,
return_none=False):
if 'description' in stream_definition:
return stream_definition['description']
else:
if return_none:
return None
else:
return ''
def get_query_stream_definition_fire_actions(stream_definition,
return_none=False):
if 'fire_actions' in stream_definition:
return stream_definition['fire_actions']
else:
if return_none:
return None
else:
return []
def get_query_stream_definition_expire_actions(stream_definition,
return_none=False):
if 'expire_actions' in stream_definition:
return stream_definition['expire_actions']
else:
if return_none:
return None
else:
return []
def get_query_stream_definition_actions_enabled(stream_definition,
required=False,
return_none=False):
try:
if 'actions_enabled' in stream_definition:
return (stream_definition['actions_enabled'])
else:
if return_none:
return None
elif required:
raise Exception("Missing actions-enabled")
else:
return ''
except Exception as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def get_comma_separated_str_as_list(comma_separated_str):
if not comma_separated_str:
return []
else:
return comma_separated_str.decode('utf8').split(',')

View File

@ -1,128 +0,0 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import falcon
from oslo.config import cfg
from monasca.api import monasca_transforms_api_v2
from monasca.common.repositories import exceptions as repository_exceptions
from monasca.common import resource_api
from monasca.openstack.common import log
from monasca.openstack.common import uuidutils
from monasca.v2.common.schemas import (exceptions as schemas_exceptions)
from monasca.v2.common.schemas import (
transforms_request_body_schema as schemas_transforms)
from monasca.v2.reference import helpers
LOG = log.getLogger(__name__)
class Transforms(monasca_transforms_api_v2.TransformsV2API):
def __init__(self, global_conf):
super(Transforms, self).__init__(global_conf)
self._region = cfg.CONF.region
self._default_authorized_roles = (
cfg.CONF.security.default_authorized_roles)
self._transforms_repo = resource_api.init_driver(
'monasca.repositories', cfg.CONF.repositories.transforms_driver)
def _validate_transform(self, transform):
"""Validates the transform
:param transform: An event object.
:raises falcon.HTTPBadRequest
"""
try:
schemas_transforms.validate(transform)
except schemas_exceptions.ValidationException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request', ex.message)
def _create_transform(self, id, tenant_id, transform):
"""Store the transform using the repository.
:param transform: A transform object.
:raises: falcon.HTTPServiceUnavailable
"""
try:
name = transform['name']
description = transform['description']
specification = transform['specification']
enabled = transform['enabled']
self._transforms_repo.create_transforms(id, tenant_id, name,
description, specification,
enabled)
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable',
ex.message)
def _create_transform_response(self, id, transform):
name = transform['name']
description = transform['description']
specification = transform['specification']
enabled = transform['enabled']
response = {'id': id, 'name': name, 'description': description,
'specification': specification, 'enabled': enabled}
return json.dumps(response)
def _list_transforms(self, tenant_id):
try:
transforms = self._transforms_repo.list_transforms(tenant_id)
return json.dumps(transforms)
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable',
ex.message)
def _delete_transform(self, tenant_id, transform_id):
try:
self._transforms_repo.delete_transform(tenant_id, transform_id)
except repository_exceptions.DoesNotExistException:
raise falcon.HTTPNotFound()
except repository_exceptions.RepositoryException as ex:
LOG.error(ex)
raise falcon.HTTPInternalServerError('Service unavailable',
ex.message)
@resource_api.Restify('/v2.0/events/transforms', method='post')
def do_post_transforms(self, req, res):
helpers.validate_json_content_type(req)
helpers.validate_authorization(req, self._default_authorized_roles)
transform = helpers.read_http_resource(req)
self._validate_transform(transform)
id = uuidutils.generate_uuid()
tenant_id = helpers.get_tenant_id(req)
self._create_transform(id, tenant_id, transform)
res.body = self._create_transform_response(id, transform)
res.status = falcon.HTTP_200
@resource_api.Restify('/v2.0/events/transforms', method='get')
def do_get_transforms(self, req, res):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
res.body = self._list_transforms(tenant_id)
res.status = falcon.HTTP_200
@resource_api.Restify('/v2.0/events/transforms/{transform_id}',
method='delete')
def do_delete_transforms(self, req, res, transform_id):
helpers.validate_authorization(req, self._default_authorized_roles)
tenant_id = helpers.get_tenant_id(req)
self._delete_transform(tenant_id, transform_id)
res.status = falcon.HTTP_204

View File

@ -12,27 +12,27 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)
class TransformsV2API(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
class AlarmDefinitionsV2API(object):
def __init__(self):
super(AlarmDefinitionsV2API, self).__init__()
LOG.info('Initializing AlarmDefinitionsV2API!')
@resource_api.Restify('/v2.0/events/transforms', method='post')
def do_post_transforms(self, req, res):
def on_post(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/events/transforms', method='get')
def do_get_transforms(self, req, res):
def on_get(self, req, res, alarm_definition_id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/events/transforms/{transform_id}',
method='delete')
def do_delete_transforms(self, req, res, transform_id):
def on_put(self, req, res, alarm_definition_id):
res.status = '501 Not Implemented'
def on_patch(self, req, res, alarm_definition_id):
res.status = '501 Not Implemented'
def on_delete(self, req, res, alarm_definition_id):
res.status = '501 Not Implemented'

View File

@ -0,0 +1,44 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)
class AlarmsV2API(object):
def __init__(self):
super(AlarmsV2API, self).__init__()
LOG.info('Initializing AlarmsV2API!')
def on_put(self, req, res, alarm_id):
res.status = '501 Not Implemented'
def on_patch(self, req, res, alarm_id):
res.status = '501 Not Implemented'
def on_delete(self, req, res, alarm_id):
res.status = '501 Not Implemented'
def on_get(self, req, res, alarm_id):
res.status = '501 Not Implemented'
class AlarmsStateHistoryV2API(object):
def __init__(self):
super(AlarmsStateHistoryV2API, self).__init__()
LOG.info('Initializing AlarmsStateHistoryV2API!')
def on_get(self, req, res, alarm_id):
res.status = '501 Not Implemented'

View File

@ -0,0 +1,57 @@
# Copyright 2014 IBM Corp
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)
class MetricsV2API(object):
def __init__(self):
super(MetricsV2API, self).__init__()
LOG.info('Initializing MetricsV2API!')
def on_get(self, req, res):
res.status = '501 Not Implemented'
def on_post(self, req, res):
res.status = '501 Not Implemented'
class MetricsMeasurementsV2API(object):
def __init__(self):
super(MetricsMeasurementsV2API, self).__init__()
LOG.info('Initializing MetricsMeasurementsV2API!')
def on_get(self, req, res):
res.status = '501 Not Implemented'
class MetricsStatisticsV2API(object):
def __init__(self):
super(MetricsStatisticsV2API, self).__init__()
LOG.info('Initializing MetricsStatisticsV2API!')
def on_get(self, req, res):
res.status = '501 Not Implemented'
class MetricsNamesV2API(object):
def __init__(self):
super(MetricsNamesV2API, self).__init__()
LOG.info('Initializing MetricsNamesV2API!')
def on_get(self, req, res):
res.status = '501 Not Implemented'

View File

@ -12,26 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common import resource_api
from monasca.openstack.common import log
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)
class EventsV2API(object):
def __init__(self, global_conf):
LOG.debug('initializing V2API!')
self.global_conf = global_conf
class NotificationsV2API(object):
def __init__(self):
super(NotificationsV2API, self).__init__()
LOG.info('Initializing NotificationsV2API!')
@resource_api.Restify('/v2.0/events', method='post')
def do_post_events(self, req, res):
def on_post(self, req, res):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/events', method='get')
def do_get_events(self, req, res):
def on_delete(self, req, res, notification_method_id):
res.status = '501 Not Implemented'
@resource_api.Restify('/v2.0/events/{id}', method='get')
def do_get_event(self, req, res, id):
def on_get(self, req, res, notification_method_id):
res.status = '501 Not Implemented'
def on_put(self, req, res, notification_method_id):
res.status = '501 Not Implemented'

109
monasca_api/api/server.py Executable file
View File

@ -0,0 +1,109 @@
# Copyright 2014 IBM Corp
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from wsgiref import simple_server
import falcon
from oslo.config import cfg
import paste.deploy
import simport
from monasca_api.openstack.common import log
dispatcher_opts = [cfg.StrOpt('versions', default=None,
help='Versions'),
cfg.StrOpt('metrics', default=None,
help='Metrics'),
cfg.StrOpt('metrics_measurements', default=None,
help='Metrics measurements'),
cfg.StrOpt('metrics_statistics', default=None,
help='Metrics statistics'),
cfg.StrOpt('metrics_names', default=None,
help='Metrics names'),
cfg.StrOpt('alarm_definitions', default=None,
help='Alarm definitions'),
cfg.StrOpt('alarms', default=None,
help='Alarms'),
cfg.StrOpt('alarms_state_history', default=None,
help='Alarms state history'),
cfg.StrOpt('notification_methods', default=None,
help='Notification methods')]
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
cfg.CONF.register_group(dispatcher_group)
cfg.CONF.register_opts(dispatcher_opts, dispatcher_group)
LOG = log.getLogger(__name__)
def launch(conf, config_file="/etc/monasca/api-config.conf"):
cfg.CONF(args=[],
project='monasca_api',
default_config_files=[config_file])
log_levels = (cfg.CONF.default_log_levels)
cfg.set_defaults(log.log_opts, default_log_levels=log_levels)
log.setup('monasca_api')
app = falcon.API()
versions = simport.load(cfg.CONF.dispatcher.versions)()
app.add_route("/", versions)
app.add_route("/{version_id}", versions)
metrics = simport.load(cfg.CONF.dispatcher.metrics)()
app.add_route("/v2.0/metrics", metrics)
metrics_measurements = simport.load(
cfg.CONF.dispatcher.metrics_measurements)()
app.add_route("/v2.0/metrics/measurements", metrics_measurements)
metrics_statistics = simport.load(cfg.CONF.dispatcher.metrics_statistics)()
app.add_route("/v2.0/metrics/statistics", metrics_statistics)
metrics_names = simport.load(cfg.CONF.dispatcher.metrics_names)()
app.add_route("/v2.0/metrics/names", metrics_names)
alarm_definitions = simport.load(cfg.CONF.dispatcher.alarm_definitions)()
app.add_route("/v2.0/alarm-definitions/", alarm_definitions)
app.add_route("/v2.0/alarm-definitions/{alarm_definition_id}",
alarm_definitions)
alarms = simport.load(cfg.CONF.dispatcher.alarms)()
app.add_route("/v2.0/alarms", alarms)
app.add_route("/v2.0/alarms/{alarm_id}", alarms)
alarms_state_history = simport.load(
cfg.CONF.dispatcher.alarms_state_history)()
app.add_route("/v2.0/alarms/state-history", alarms_state_history)
app.add_route("/v2.0/alarms/{alarm_id}/state-history",
alarms_state_history)
notification_methods = simport.load(
cfg.CONF.dispatcher.notification_methods)()
app.add_route("/v2.0/notification-methods", notification_methods)
app.add_route("/v2.0/notification-methods/{notification_method_id}",
notification_methods)
LOG.debug('Dispatcher drivers have been added to the routes!')
return app
if __name__ == '__main__':
wsgi_app = (
paste.deploy.loadapp('config:etc/api-config.ini',
relative_to=os.getcwd()))
httpd = simple_server.make_server('127.0.0.1', 8080, wsgi_app)
httpd.serve_forever()

View File

@ -1,4 +1,4 @@
# Copyright 2014 Hewlett-Packard
# Copyright 2015 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,16 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import events_repository
from monasca.openstack.common import log
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)
class EventsRepository(events_repository.EventsRepository):
class VersionsAPI(object):
def __init__(self):
return
super(VersionsAPI, self).__init__()
LOG.info('Initializing Versions!')
def list_events(self, tenant_id, name, dimensions):
return {}
def on_get(self, req, res, id):
res.status = '501 Not Implemented'

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.messaging import publisher
from monasca_api.common.messaging import publisher
class FakePublisher(publisher.Publisher):

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from kafka import client
@ -18,10 +19,9 @@ from kafka import common
from kafka import producer
from oslo.config import cfg
from monasca.common.messaging import exceptions
from monasca.common.messaging import publisher
from monasca.openstack.common import log
from monasca_api.common.messaging import exceptions
from monasca_api.common.messaging import publisher
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)

View File

@ -21,8 +21,4 @@ import six
class Publisher(object):
@abc.abstractmethod
def send_message(self, message):
"""Sends the message using the message queue.
:param message: Message to send.
"""
return

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six

View File

@ -12,8 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import metrics_repository
from monasca.openstack.common import log
from monasca_api.common.repositories import metrics_repository
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)

View File

@ -17,9 +17,9 @@ import json
from influxdb import client
from oslo.config import cfg
from monasca.common.repositories import exceptions
from monasca.common.repositories import metrics_repository
from monasca.openstack.common import log
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)
@ -265,9 +265,16 @@ class MetricsRepository(metrics_repository.MetricsRepository):
measurements_list = []
for point in serie['values']:
value_meta = {}
try:
value_meta = json.loads(point[2])
except Exception as ex:
pass
measurements_list.append([point[0],
point[1],
json.loads(point[2])])
value_meta])
measurement = {u'name': serie['name'],
u'id': measurements_list[-1][0],

View File

@ -11,14 +11,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from monasca.common.repositories import alarm_definitions_repository as adr
from monasca.common.repositories import exceptions
from monasca.common.repositories.model import sub_alarm_definition
from monasca.common.repositories.mysql import mysql_repository
from monasca.openstack.common import log
from monasca.openstack.common import uuidutils
from monasca_api.common.repositories import alarm_definitions_repository as adr
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories.model import sub_alarm_definition
from monasca_api.common.repositories.mysql import mysql_repository
from monasca_api.openstack.common import log
from monasca_api.openstack.common import uuidutils
LOG = log.getLogger(__name__)

View File

@ -12,10 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca.common.repositories import alarms_repository
from monasca.common.repositories import exceptions
from monasca.common.repositories.mysql import mysql_repository
from monasca.openstack.common import log
from monasca_api.common.repositories import alarms_repository
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories.mysql import mysql_repository
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)

View File

@ -15,8 +15,8 @@
import MySQLdb as mdb
from oslo.config import cfg
from monasca.common.repositories import exceptions
from monasca.openstack.common import log
from monasca_api.common.repositories import exceptions
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)

View File

@ -14,11 +14,11 @@
import datetime
from monasca.common.repositories import exceptions
from monasca.common.repositories.mysql import mysql_repository
from monasca.common.repositories import notifications_repository as nr
from monasca.openstack.common import log
from monasca.openstack.common import uuidutils
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories.mysql import mysql_repository
from monasca_api.common.repositories import notifications_repository as nr
from monasca_api.openstack.common import log
from monasca_api.openstack.common import uuidutils
LOG = log.getLogger(__name__)

View File

@ -257,9 +257,9 @@ class AlarmExprParser(object):
def sub_expr_list(self):
# Remove all spaces before parsing. Simple, quick fix for whitespace
# issue with dimension list not allowing whitespace after comma.
parseResult = (expression + pyparsing.stringEnd).parseString(
parse_result = (expression + pyparsing.stringEnd).parseString(
self._expr.replace(' ', ''))
sub_expr_list = parseResult[0].operands_list
sub_expr_list = parse_result[0].operands_list
return sub_expr_list
@ -280,8 +280,8 @@ def main():
for expr in (expr0, expr1, expr2):
print ('orig expr: {}'.format(expr.encode('utf8')))
alarmExprParser = AlarmExprParser(expr)
sub_expr = alarmExprParser.sub_expr_list
alarm_expr_parser = AlarmExprParser(expr)
sub_expr = alarm_expr_parser.sub_expr_list
for sub_expression in sub_expr:
print ('sub expr: {}'.format(
sub_expression.sub_expr_str.encode('utf8')))

View File

@ -18,7 +18,7 @@ import uuid
from oslo.utils import timeutils
from monasca.openstack.common import log
from monasca_api.openstack.common import log
LOG = log.getLogger(__name__)

View File

@ -14,8 +14,8 @@
import falcon
from monasca.middleware import context
from monasca.openstack.common import log
from monasca_api.middleware import context
from monasca_api.openstack.common import log
from oslo.middleware import request_id
from oslo.serialization import jsonutils

View File

@ -29,8 +29,8 @@ import eventlet.backdoor
import greenlet
from oslo.config import cfg
from monasca.openstack.common.gettextutils import _LI
from monasca.openstack.common import log as logging
from monasca_api.openstack.common.gettextutils import _LI
from monasca_api.openstack.common import log as logging
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "

View File

@ -24,7 +24,7 @@ import traceback
import six
from monasca.openstack.common.gettextutils import _LE
from monasca_api.openstack.common.gettextutils import _LE
class save_and_reraise_exception(object):

View File

@ -18,8 +18,8 @@ import errno
import os
import tempfile
from monasca.openstack.common import excutils
from monasca.openstack.common import log as logging
from monasca_api.openstack.common import excutils
from monasca_api.openstack.common import log as logging
LOG = logging.getLogger(__name__)

View File

@ -15,7 +15,7 @@
import fixtures
from monasca.openstack.common import lockutils
from monasca_api.openstack.common import lockutils
class LockFixture(fixtures.Fixture):

View File

@ -59,10 +59,10 @@ else:
import six
import six.moves.xmlrpc_client as xmlrpclib
from monasca.openstack.common import gettextutils
from monasca.openstack.common import importutils
from monasca.openstack.common import strutils
from monasca.openstack.common import timeutils
from monasca_api.openstack.common import gettextutils
from monasca_api.openstack.common import importutils
from monasca_api.openstack.common import strutils
from monasca_api.openstack.common import timeutils
netaddr = importutils.try_import("netaddr")

View File

@ -28,8 +28,8 @@ import weakref
from oslo.config import cfg
from monasca.openstack.common import fileutils
from monasca.openstack.common.gettextutils import _, _LE, _LI
from monasca_api.openstack.common import fileutils
from monasca_api.openstack.common.gettextutils import _, _LE, _LI
LOG = logging.getLogger(__name__)

View File

@ -43,13 +43,13 @@ from six import moves
_PY26 = sys.version_info[0:2] == (2, 6)
from monasca.openstack.common.gettextutils import _
from monasca.openstack.common import importutils
from monasca.openstack.common import jsonutils
from monasca.openstack.common import local
from monasca_api.openstack.common.gettextutils import _
from monasca_api.openstack.common import importutils
from monasca_api.openstack.common import jsonutils
from monasca_api.openstack.common import local
# NOTE(flaper87): Pls, remove when graduating this module
# from the incubator.
from monasca.openstack.common.strutils import mask_password # noqa
from monasca_api.openstack.common.strutils import mask_password # noqa
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

Some files were not shown because too many files have changed in this diff Show More