Add events endpoint

Provide basic endpoint:
/v1.0/events/
/healthcheck
/version
Endpoint reads request and returns HTTP code 200.
Provide code to run application in gunicorn mode.

Story: 2001112
Task: 4863

Change-Id: Ic7c344360b5acec5af7751a825e2dff8346cf1f7
Depends-On: I18d9f4ec543c76bfe1311ed1ee940827d4162298
This commit is contained in:
Artur Basiak 2017-08-16 16:17:37 +02:00 committed by Adrian Czarnecki
parent c3a3b4b765
commit 620a477df0
50 changed files with 1805 additions and 64 deletions

3
.gitignore vendored
View File

@ -7,6 +7,7 @@ cover
.coverage
*.egg
*.egg-info
.stestr
.testrepository
.tox
AUTHORS
@ -14,7 +15,7 @@ ChangeLog
MANIFEST
monasca.log
*.log
*.swp
*.iml
.DS_Store

3
.stestr.conf Normal file
View File

@ -0,0 +1,3 @@
[DEFAULT]
test_path=$LISTOPT
group_regex=monasca_events_api\.tests\.unit(?:\.|_)([^_]+)

View File

@ -34,6 +34,10 @@ function install_events_api {
fi
}
function create_monasca_events_cache_dir {
sudo install -m 700 -d -o $STACK_USER $MONASCA_EVENTS_API_CACHE_DIR
}
function configure_events_api {
if is_events_api_enabled; then
echo_summary "Configuring Events Api"
@ -41,6 +45,8 @@ function configure_events_api {
# Put config files in ``$MONASCA_EVENTS_API_CONF_DIR`` for everyone to find
sudo install -d -o $STACK_USER $MONASCA_EVENTS_API_CONF_DIR
create_monasca_events_cache_dir
# ensure fresh installation of configuration files
rm -rf $MONASCA_EVENTS_API_CONF $MONASCA_EVENTS_API_PASTE $MONASCA_EVENTS_API_LOGGING_CONF

View File

@ -0,0 +1,28 @@
#
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Sleep some time until all services are started
sleep 6
function load_devstack_utilities {
source $BASE/new/devstack/stackrc
source $BASE/new/devstack/functions
source $BASE/new/devstack/openrc admin admin
# print OS_ variables
env | grep OS_
}

View File

@ -74,7 +74,7 @@ MONASCA_EVENTS_API_PASTE=${MONASCA_EVENTS_API_PASTE:-$MONASCA_EVENTS_API_CONF_DI
MONASCA_EVENTS_API_LOGGING_CONF=${MONASCA_EVENTS_API_LOGGING_CONF:-$MONASCA_EVENTS_API_CONF_DIR/events-api-logging.conf}
MONASCA_EVENTS_API_CACHE_DIR=${MONASCA_EVENTS_API_CACHE_DIR:-/var/cache/monasca-events-api}
MONASCA_EVENTS_API_SERVICE_HOST=${MONASCA_EVENTS_API_SERVICE_HOST:-${SERVICE_HOST}}
MONASCA_EVENTS_API_SERVICE_PORT=${MONASCA_EVENTS_API_SERVICE_PORT:-5670}
MONASCA_EVENTS_API_SERVICE_PORT=${MONASCA_EVENTS_API_SERVICE_PORT:-5656}
MONASCA_EVENTS_API_SERVICE_PROTOCOL=${MONASCA_EVENTS_API_SERVICE_PROTOCOL:-${SERVICE_PROTOCOL}}
MONASCA_EVENTS_PERSISTER_CONF_DIR=${MONASCA_EVENTS_PERSISTER_CONF_DIR:-/etc/monasca}

View File

@ -28,7 +28,7 @@ class = logging.handlers.RotatingFileHandler
level = DEBUG
formatter = context
# store up to 5*100MB of logs
args = ('monasca-events-api.log', 'a', 104857600, 5)
args = ('/var/log/monasca/monasca-events-api.log', 'a', 104857600, 5)
[formatter_context]
class = oslo_log.formatters.ContextFormatter

View File

@ -18,34 +18,47 @@ name = main
[composite:main]
use = egg:Paste#urlmap
/: events_version
/v1.0: events_api_v1
/healthcheck: events_healthcheck
[pipeline:events_api_v1]
pipeline = error_trap request_id auth sizelimit middleware api_v1_app
[pipeline:events_version]
pipeline = error_trap versionapp
pipeline = error_trap versionapp
[pipeline:events_healthcheck]
pipeline = error_trap healthcheckapp
[app:api_v1_app]
paste.app_factory = monasca_events_api.app.api:create_api_app
[app:versionapp]
paste.app_factory = monasca_events_api.app.api:create_version_app
[app:healthcheckapp]
paste.app_factory= monasca_events_api.app.api:create_healthcheck_app
[filter:auth]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
[filter:roles]
paste.filter_factory = monasca_events_api.middleware.role_middleware:RoleMiddleware.factory
[filter:request_id]
paste.filter_factory = oslo_middleware.request_id:RequestId.factory
# NOTE(trebskit) this is optional
# insert this into either pipeline to get some WSGI environment debug output
[filter:debug]
paste.filter_factory = oslo_middleware.debug:Debug.factory
[filter:error_trap]
paste.filter_factory = oslo_middleware.catch_errors:CatchErrors.factory
[filter:request_id]
paste.filter_factory = oslo_middleware.request_id:RequestId.factory
[filter:middleware]
paste.filter_factory = monasca_events_api.middleware.validation_middleware:ValidationMiddleware.factory
[filter:sizelimit]
use = egg:oslo.middleware#sizelimit
[server:main]
chdir = /opt/stack/monasca-events-api
use = egg:gunicorn#main
bind = 127.0.0.1:5670
workers = 9
bind = 127.0.0.1:5656
workers = 2
worker-connections = 2000
worker-class = eventlet
timeout = 30

View File

@ -12,47 +12,112 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Module initializes various applications of monasca-events-api."""
"""
Module contains factories to initializes various applications
of monasca-events-api.
"""
import falcon
from oslo_config import cfg
from oslo_log import log
import six
from monasca_events_api.app.controller import healthchecks
from monasca_events_api.app.controller.v1 import events as v1_events
from monasca_events_api.app.controller import versions
from monasca_events_api.app.core import error_handlers
from monasca_events_api.app.core import request
from monasca_events_api import config
LOG = log.getLogger(__name__)
CONF = cfg.CONF
_CONF_LOADED = False
class Versions(object):
"""Versions API.
Versions returns information about API itself.
def error_trap(app_name):
"""Decorator trapping any error during application boot time.
:param app_name: Application name
:type app_name: str
:return: _wrapper function
"""
@six.wraps(error_trap)
def _wrapper(func):
def __init__(self):
"""Init the Version App."""
LOG.info('Initializing VersionsAPI!')
def on_get(self, req, res):
"""On get method."""
res.status = falcon.HTTP_200
res.body = '{"version": "v1.0"}'
@six.wraps(_wrapper)
def _inner_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
logger = log.getLogger(__name__)
logger.exception(
'Failed to load application: \'{}\''.format(app_name))
raise
return _inner_wrapper
return _wrapper
def singleton_config(func):
"""Decorator ensuring that configuration is loaded only once.
:param func: Function to execute
:return: _wrapper
"""
@six.wraps(singleton_config)
def _wrapper(global_conf, **local_conf):
config.parse_args()
return func(global_conf, **local_conf)
return _wrapper
@error_trap('version')
def create_version_app(global_conf, **local_conf):
"""Create Version application."""
ctrl = Versions()
"""Creates Version application"""
ctrl = versions.Versions()
controllers = {
'/': ctrl, # redirect http://host:port/ down to Version app
# avoid conflicts with actual pipelines and 404 error
'/version': ctrl, # list all the versions
'/version/{version_id}': ctrl # display details of the version
}
wsgi_app = falcon.API()
for route, ctrl in controllers.items():
wsgi_app.add_route(route, ctrl)
return wsgi_app
@error_trap('healthcheck')
def create_healthcheck_app(global_conf, **local_conf):
"""Create Healthcheck application"""
controllers = {
'/': healthchecks.HealthChecks(),
}
wsgi_app = falcon.API()
for route, ctrl in controllers.items():
wsgi_app.add_route(route, ctrl)
return wsgi_app
@error_trap('api')
@singleton_config
def create_api_app(global_conf, **local_conf):
"""Create Main Events Api application.
:param global_conf: Global config
:param local_conf: Local config
:return: falcon.API
"""
controllers = {}
controllers.update({
'/events': v1_events.Events()
})
wsgi_app = falcon.API(
request_type=request.Request
)
for route, ctrl in controllers.items():
wsgi_app.add_route(route, ctrl)
error_handlers.register_error_handler(wsgi_app)
return wsgi_app

View File

@ -0,0 +1,179 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_common.kafka import producer
from monasca_common.rest import utils as rest_utils
from oslo_log import log
from monasca_events_api import conf
LOG = log.getLogger(__name__)
CONF = conf.CONF
_RETRY_AFTER = 60
_KAFKA_META_DATA_SIZE = 32
_TRUNCATION_SAFE_OFFSET = 1
class InvalidMessageException(Exception):
pass
class EventPublisher(object):
"""Publishes events data to Kafka
EventPublisher is able to send single message to multiple configured topic.
It uses following configuration written in conf file ::
[event_publisher]
topics = 'monevents'
kafka_url = 'localhost:8900'
Note:
Uses :py:class:`monasca_common.kafka.producer.KafkaProducer`
to ship events to kafka. For more details
see `monasca-common`_ github repository.
.. _monasca-common: https://github.com/openstack/monasca-common
"""
def __init__(self):
self._topics = CONF.events_publisher.topics
self._kafka_publisher = producer.KafkaProducer(
url=CONF.events_publisher.kafka_url
)
LOG.info('Initializing EventPublisher <%s>', self)
def send_message(self, messages):
"""Sends message to each configured topic.
Note:
Empty content is not shipped to kafka
:param dict| list messages:
"""
if not messages:
return
if not isinstance(messages, list):
messages = [messages]
sent_counter = 0
num_of_msgs = len(messages)
LOG.debug('About to publish %d messages to %s topics',
num_of_msgs, self._topics)
send_messages = []
for message in messages:
try:
msg = self._transform_message_to_json(message)
send_messages.append(msg)
except Exception as ex:
LOG.exception(
'Failed to transform message, '
'this massage is dropped {} '
'Exception: {}'.format(message, str(ex)))
try:
self._publish(send_messages)
sent_counter = len(send_messages)
except Exception as ex:
LOG.exception('Failure in publishing messages to kafka')
raise ex
finally:
self._check_if_all_messages_was_publish(sent_counter, num_of_msgs)
def _transform_message_to_json(self, message):
"""Transforms message into JSON.
Method transforms message to JSON and
encode to utf8
:param str message: instance of message
:return: serialized message
:rtype: str
"""
msg_json = rest_utils.as_json(message)
return msg_json.encode('utf-8')
def _create_message_for_persister_from_request_body(self, body):
"""Create message for persister from request body
Method take original request body and them
transform the request to proper message format
acceptable by event-prsister
:param body: original request body
:return: transformed message
"""
timestamp = body['timestamp']
final_body = []
for events in body['events']:
ev = events['event'].copy()
ev.update({'timestamp': timestamp})
final_body.append(ev)
return final_body
def _ensure_type_bytes(self, message):
"""Ensures that message will have proper type.
:param str message: instance of message
"""
return message.encode('utf-8')
def _publish(self, messages):
"""Publishes messages to kafka.
:param list messages: list of messages
"""
num_of_msg = len(messages)
LOG.debug('Publishing %d messages', num_of_msg)
try:
for topic in self._topics:
self._kafka_publisher.publish(
topic,
messages
)
LOG.debug('Sent %d messages to topic %s', num_of_msg, topic)
except Exception as ex:
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
"""Executed after publishing to sent metrics.
:param int send_count: how many messages have been sent
:param int to_send_count: how many messages should be sent
"""
failed_to_send = to_send_count - send_count
if failed_to_send == 0:
LOG.debug('Successfully published all [%d] messages',
send_count)
else:
error_str = ('Failed to send all messages, %d '
'messages out of %d have not been published')
LOG.error(error_str, failed_to_send, to_send_count)

View File

@ -0,0 +1,44 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
from monasca_common.rest import exceptions
from monasca_common.rest import utils as rest_utils
LOG = log.getLogger(__name__)
def read_json_msg_body(req):
"""Read the json_msg from the http request body and return as JSON.
:param req: HTTP request object.
:return: Returns the metrics as a JSON object.
:raises falcon.HTTPBadRequest:
"""
try:
msg = req.stream.read()
json_msg = rest_utils.from_json(msg)
return json_msg
except exceptions.DataConversionException as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request',
'Request body is not valid JSON')
except ValueError as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request',
'Request body is not valid JSON')

View File

@ -0,0 +1,62 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import falcon
from monasca_common.rest import utils as rest_utils
from monasca_events_api.app.healthcheck import kafka_check
HealthCheckResult = collections.namedtuple('HealthCheckResult',
['status', 'details'])
class HealthChecks(object):
# response configuration
CACHE_CONTROL = ['must-revalidate', 'no-cache', 'no-store']
# response codes
HEALTHY_CODE_GET = falcon.HTTP_OK
HEALTHY_CODE_HEAD = falcon.HTTP_NO_CONTENT
NOT_HEALTHY_CODE = falcon.HTTP_SERVICE_UNAVAILABLE
def __init__(self):
self._kafka_check = kafka_check.KafkaHealthCheck()
super(HealthChecks, self).__init__()
def on_head(self, req, res):
res.status = self.HEALTHY_CODE_HEAD
res.cache_control = self.CACHE_CONTROL
def on_get(self, req, res):
# at this point we know API is alive, so
# keep up good work and verify kafka status
kafka_result = self._kafka_check.healthcheck()
# in case it'd be unhealthy,
# message will contain error string
status_data = {
'kafka': kafka_result.message
}
# Really simple approach, ideally that should be
# part of monasca-common with some sort of registration of
# healthchecks concept
res.status = (self.HEALTHY_CODE_GET
if kafka_result.healthy else self.NOT_HEALTHY_CODE)
res.cache_control = self.CACHE_CONTROL
res.body = rest_utils.as_json(status_data)

View File

@ -0,0 +1,40 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from oslo_log import log
from voluptuous import Any
from voluptuous import Required
from voluptuous import Schema
LOG = log.getLogger(__name__)
default_schema = Schema({Required("events"): Any(list, dict),
Required("timestamp"):
Any(str, unicode) if six.PY2 else str})
def validate_body(request_body):
"""Validate body.
Method validate if body contain all required fields,
and check if all value have correct type.
:param request_body: body
"""
default_schema(request_body)

View File

@ -0,0 +1,64 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from monasca_events_api.app.common import events_publisher
from monasca_events_api import conf
LOG = log.getLogger(__name__)
CONF = conf.CONF
class EventsBulkProcessor(events_publisher.EventPublisher):
"""BulkProcessor for effective events processing and publishing.
BulkProcessor is customized version of
:py:class:`monasca_events_api.app.base.event_publisher.EventPublisher`
that utilizes processing of bulk request inside single loop.
"""
def send_message(self, events):
"""Sends bulk package to kafka
:param list events: received events
"""
num_of_msgs = len(events) if events else 0
to_send_msgs = []
LOG.debug('Bulk package <events=%d>',
num_of_msgs)
for ev_el in events:
try:
t_el = self._transform_message_to_json(ev_el)
if t_el:
to_send_msgs.append(t_el)
except Exception as ex:
LOG.error('Failed to transform message to json. '
'message: {} Exception {}'.format(ev_el, str(ex)))
sent_count = len(to_send_msgs)
try:
self._publish(to_send_msgs)
except Exception as ex:
LOG.error('Failed to send bulk package <events=%d, dimensions=%s>',
num_of_msgs)
LOG.exception(ex)
raise ex
finally:
self._check_if_all_messages_was_publish(num_of_msgs, sent_count)

View File

@ -0,0 +1,71 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
from voluptuous import MultipleInvalid
from monasca_events_api.app.common import helpers
from monasca_events_api.app.controller.v1 import body_validation
from monasca_events_api.app.controller.v1 import bulk_processor
from monasca_events_api.app.core.model import prepare_message_to_sent
LOG = log.getLogger(__name__)
class Events(object):
"""Events.
Events acts as a RESTful endpoint accepting messages contains
collected events from the OpenStack message bus.
Works as getaway for any further processing for accepted data.
"""
VERSION = 'v1.0'
SUPPORTED_CONTENT_TYPES = {'application/json'}
def __init__(self):
super(Events, self).__init__()
self._processor = bulk_processor.EventsBulkProcessor()
def on_post(self, req, res):
"""Accepts sent events as json.
Accepts events sent to resource which should be sent
to Kafka queue.
:param req: current request
:param res: current response
"""
policy_action = 'events_api:agent_required'
try:
request_body = helpers.read_json_msg_body(req)
req.can(policy_action)
body_validation.validate_body(request_body)
messages = prepare_message_to_sent(request_body)
self._processor.send_message(messages)
res.status = falcon.HTTP_200
except MultipleInvalid as ex:
LOG.error('Entire bulk package was rejected, unsupported body')
LOG.exception(ex)
res.status = falcon.HTTP_422
except Exception as ex:
LOG.error('Entire bulk package was rejected')
LOG.exception(ex)
res.status = falcon.HTTP_400
@property
def version(self):
return getattr(self, 'VERSION')

View File

@ -0,0 +1,114 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import six
from monasca_common.rest import utils as rest_utils
_VERSIONS_TPL_DICT = {
'v1.0': {
'id': 'v1.0',
'links': [
{
'rel': 'event',
'href': '/events'
}
],
'status': 'CURRENT',
'updated': "2017-09-01T00:00:00Z"
},
}
class Versions(object):
"""Versions Api"""
@staticmethod
def handle_none_version_id(req, res, result):
for version in _VERSIONS_TPL_DICT:
selected_version = _parse_version(version, req)
result['elements'].append(selected_version)
res.body = rest_utils.as_json(result, sort_keys=True)
res.status = falcon.HTTP_200
@staticmethod
def handle_version_id(req, res, result, version_id):
if version_id in _VERSIONS_TPL_DICT:
result['elements'].append(_parse_version(version_id, req))
res.body = rest_utils.as_json(result, sort_keys=True)
res.status = falcon.HTTP_200
else:
error_body = {'message': '%s is not valid version' % version_id}
res.body = rest_utils.as_json(error_body)
res.status = falcon.HTTP_400
def on_get(self, req, res, version_id=None):
result = {
'links': _get_common_links(req),
'elements': []
}
if version_id is None:
self.handle_none_version_id(req, res, result)
else:
self.handle_version_id(req, res, result, version_id)
def _get_common_links(req):
self_uri = req.uri
if six.PY2:
self_uri = self_uri.decode(rest_utils.ENCODING)
base_uri = self_uri.replace(req.path, '')
return [
{
'rel': 'self',
'href': self_uri
},
{
'rel': 'version',
'href': '%s/version' % base_uri
},
{
'rel': 'healthcheck',
'href': '%s/healthcheck' % base_uri
}
]
def _parse_version(version_id, req):
self_uri = req.uri
if six.PY2:
self_uri = self_uri.decode(rest_utils.ENCODING)
base_uri = self_uri.replace(req.path, '')
# need to get template dict, consecutive calls
# needs to operate on unmodified instance
selected_version = _VERSIONS_TPL_DICT[version_id].copy()
raw_links = selected_version['links']
links = []
for link in raw_links:
raw_link_href = link.get('href')
raw_link_rel = link.get('rel')
link_href = base_uri + '/' + version_id + raw_link_href
links.append({
'href': link_href,
'rel': raw_link_rel
})
selected_version['links'] = links
return selected_version

View File

@ -0,0 +1,29 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_events_api.app.model import envelope
def events_envelope_exception_handlet(ex, req, resp, params):
raise falcon.HTTPUnprocessableEntity(
title='Failed to create Envelope',
description=ex.message
)
def register_error_handler(app):
app.add_error_handler(envelope.EventsEnvelopeException,
events_envelope_exception_handlet)

View File

@ -0,0 +1,28 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def prepare_message_to_sent(body):
"""prepare_message_to_sent convert message to proper format,
:param dict body: original request body
:return dict: prepared message for publish to kafka
"""
timestamp = body['timestamp']
final_body = []
for events in body['events']:
ev = events['event'].copy()
ev.update({'timestamp': timestamp})
final_body.append(ev)
return final_body

View File

@ -13,29 +13,27 @@
# under the License.
import falcon
from oslo_log import log
from oslo_context import context
from monasca_events_api.app.core import request_contex
from monasca_events_api import policy
LOG = log.getLogger(__name__)
class Request(falcon.Request):
"""Variation of falcon. Request with context.
Following class enhances :py:class:`falcon.Request` with
:py:class:`context.RequestContext`
:py:class:`context.CustomRequestContext`
"""
def __init__(self, env, options=None):
"""Init an Request class."""
super(Request, self).__init__(env, options)
self.is_admin = None
self.context = context.RequestContext.from_environ(self.env)
self.context = \
request_contex.RequestContext.from_environ(self.env)
self.is_admin = policy.check_is_admin(self.context)
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
def to_policy_values(self):
policy = self.context.to_policy_values()
policy['is_admin'] = self.is_admin
return policy
def can(self, action, target=None):
return self.context.can(action, target)

View File

@ -0,0 +1,38 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context
from monasca_events_api import policy
class RequestContext(context.RequestContext):
"""RequestContext.
RequestContext is customized version of
:py:class:oslo_context.context.RequestContext.
"""
def to_policy_values(self):
pl = super(RequestContext, self).to_policy_values()
pl['is_admin'] = self.is_admin
return pl
def can(self, action, target=None):
if target is None:
target = {'project_id': self.project_id,
'user_id': self.user_id}
return policy.authorize(self, action=action, target=target)

View File

@ -0,0 +1,94 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from monasca_common.kafka_lib import client
from oslo_log import log
from monasca_events_api import conf
LOG = log.getLogger(__name__)
CONF = conf.CONF
CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message'])
"""Result from the healthcheck, contains healthy(boolean) and message"""
class KafkaHealthCheck(object):
"""Evaluates kafka health
Healthcheck verifies if:
* kafka server is up and running
* there is a configured topic in kafka
If following conditions are met healthcheck returns healthy status.
Otherwise unhealthy status is returned with explanation.
Example of middleware configuration:
.. code-block:: ini
[events_publisher]
kafka_url = localhost:8900
kafka_topics = events
Note:
It is possible to specify multiple topics if necessary.
Just separate them with ,
"""
def healthcheck(self):
url = CONF.events_publisher.kafka_url
try:
kafka_client = client.KafkaClient(hosts=url)
except client.KafkaUnavailableError as ex:
LOG.error(repr(ex))
error_str = 'Could not connect to kafka at %s' % url
return CheckResult(healthy=False, message=error_str)
result = self._verify_topics(kafka_client)
self._disconnect_gracefully(kafka_client)
return result
# noinspection PyMethodMayBeStatic
def _verify_topics(self, kafka_client):
topics = CONF.events_publisher.topics
for t in topics:
# kafka client loads metadata for topics as fast
# as possible (happens in __init__), therefore this
# topic_partitions is sure to be filled
for_topic = t in kafka_client.topic_partitions
if not for_topic:
error_str = 'Kafka: Topic %s not found' % t
LOG.error(error_str)
return CheckResult(healthy=False, message=error_str)
return CheckResult(healthy=True, message='OK')
# noinspection PyMethodMayBeStatic
def _disconnect_gracefully(self, kafka_client):
# at this point, client is connected so it must be closed
# regardless of topic existence
try:
kafka_client.close()
except Exception as ex:
# log that something went wrong and move on
LOG.error(repr(ex))

View File

View File

@ -0,0 +1,17 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class EventsEnvelopeException(Exception):
pass

View File

@ -0,0 +1,23 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Use this file for deploying the API under mod_wsgi.
"""
from paste import deploy
base_dir = '/etc/monasca/'
conf = '{0}event-api-paste.ini'.format(base_dir)
application = deploy.loadapp('config:{0}'.format(conf))

View File

@ -15,10 +15,12 @@
import os
import pkgutil
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
def load_conf_modules():

View File

@ -0,0 +1,39 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
_MAX_MESSAGE_SIZE = 1048576
events_publisher_opts = [
cfg.StrOpt('kafka_url',
required=True,
help='Url to kafka server',
default="127.0.0.1:9092"),
cfg.MultiStrOpt('topics',
help='Consumer topics',
default=['monevents'],)
]
events_publisher_group = cfg.OptGroup(name='events_publisher',
title='events_publisher')
def register_opts(conf):
conf.register_group(events_publisher_group)
conf.register_opts(events_publisher_opts, events_publisher_group)
def list_opts():
return events_publisher_group, events_publisher_opts

View File

@ -42,13 +42,13 @@ def parse_args():
log.register_options(CONF)
CONF(args=[],
prog='events-app',
prog='events-api',
project='monasca',
version=version.version_str,
description='RESTful API to collect events from cloud')
log.setup(CONF,
product_name='monasca-events-app',
product_name='monasca-events-api',
version=version.version_str)
conf.register_opts(CONF)

View File

@ -0,0 +1,67 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
from oslo_middleware import base
from monasca_events_api import config
CONF = config.CONF
LOG = log.getLogger(__name__)
SUPPORTED_CONTENT_TYPES = ('application/json',)
def _validate_content_type(req):
"""Validate content type.
Function validates request against correct content type.
If Content-Type cannot be established (i.e. header is missing),
:py:class:`falcon.HTTPMissingHeader` is thrown.
If Content-Type is not **application/json**(supported contents
types are define in SUPPORTED_CONTENT_TYPES variable),
:py:class:`falcon.HTTPUnsupportedMediaType` is thrown.
:param falcon.Request req: current request
:exception: :py:class:`falcon.HTTPMissingHeader`
:exception: :py:class:`falcon.HTTPUnsupportedMediaType`
"""
content_type = req.content_type
LOG.debug('Content-type is {0}'.format(content_type))
if content_type is None or len(content_type) == 0:
raise falcon.HTTPMissingHeader('Content-Type')
if content_type not in SUPPORTED_CONTENT_TYPES:
types = ','.join(SUPPORTED_CONTENT_TYPES)
details = ('Only [{0}] are accepted as events representation'.
format(types))
raise falcon.HTTPUnsupportedMediaType(description=details)
class ValidationMiddleware(base.ConfigurableMiddleware):
"""Middleware that validates request content.
"""
@staticmethod
def process_request(req):
_validate_content_type(req)
return

View File

@ -28,8 +28,7 @@ def load_policy_modules():
Method iterates over modules of :py:mod:`monasca_events_api.policies`
and imports only those that contain following methods:
- list_opts (required by oslo_config.genconfig)
- register_opts (required by :py:currentmodule:)
- list_rules
"""
for modname in _list_module_names():

View File

@ -18,7 +18,7 @@ from oslo_policy import policy
agent_policies = [
policy.DocumentedRuleDefault(
name='events_api:agent_required',
check_str='role:monasca_events_agent',
check_str='role:monasca or role:admin',
description='Send events to api',
operations=[{'path': '/v1.0/events', 'method': 'POST'}]
)

View File

@ -14,6 +14,8 @@
import os
import falcon
from falcon import testing
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
@ -22,6 +24,7 @@ from oslo_log.fixture import logging_error as log_fixture
from oslo_serialization import jsonutils
from oslotest import base
from monasca_events_api.app.core import request
from monasca_events_api import config
from monasca_events_api import policies
from monasca_events_api import policy
@ -93,3 +96,20 @@ class PolicyFixture(fixtures.Fixture):
for rule in policies.list_rules():
if rule.name not in rules:
rules[rule.name] = rule.check_str
class MockedApi(falcon.API):
"""Mocked API.
Subclasses :py:class:`falcon.API` in order to overwrite
request_type property with custom :py:class:`request.Request`
"""
def __init__(self):
super(MockedApi, self).__init__(
media_type=falcon.DEFAULT_MEDIA_TYPE,
request_type=request.Request
)
class BaseApiTestCase(BaseTestCase, testing.TestBase):
api_class = MockedApi

View File

@ -0,0 +1,187 @@
{
"timestamp": "2012-10-29T13:42:11Z+0200",
"events": [
{
"dimensions": {
"service": "compute",
"topic": "notification.sample",
"hostname": "nova-compute:compute"
},
"project_id": "6f70656e737461636b20342065766572",
"event": {
"event_type": "instance.reboot.end",
"payload": {
"nova_object.data": {
"architecture": "x86_64",
"availability_zone": "nova",
"created_at": "2012-10-29T13:42:11Z",
"deleted_at": null,
"display_name": "some-server",
"display_description": "some-server",
"fault": null,
"host": "compute",
"host_name": "some-server",
"ip_addresses": [
{
"nova_object.name": "IpPayload",
"nova_object.namespace": "nova",
"nova_object.version": "1.0",
"nova_object.data": {
"mac": "fa:16:3e:4c:2c:30",
"address": "192.168.1.3",
"port_uuid": "ce531f90-199f-48c0-816c-13e38010b442",
"meta": {},
"version": 4,
"label": "private-network",
"device_name": "tapce531f90-19"
}
}
],
"key_name": "my-key",
"auto_disk_config": "MANUAL",
"kernel_id": "",
"launched_at": "2012-10-29T13:42:11Z",
"image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6",
"metadata": {},
"locked": false,
"node": "fake-mini",
"os_type": null,
"progress": 0,
"ramdisk_id": "",
"reservation_id": "r-npxv0e40",
"state": "active",
"task_state": null,
"power_state": "running",
"tenant_id": "6f70656e737461636b20342065766572",
"terminated_at": null,
"flavor": {
"nova_object.name": "FlavorPayload",
"nova_object.data": {
"flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3",
"name": "test_flavor",
"projects": null,
"root_gb": 1,
"vcpus": 1,
"ephemeral_gb": 0,
"memory_mb": 512,
"disabled": false,
"rxtx_factor": 1.0,
"extra_specs": {
"hw:watchdog_action": "disabled"
},
"swap": 0,
"is_public": true,
"vcpu_weight": 0
},
"nova_object.version": "1.3",
"nova_object.namespace": "nova"
},
"user_id": "fake",
"uuid": "178b0921-8f85-4257-88b6-2e743b5a975c"
},
"nova_object.name": "InstanceActionPayload",
"nova_object.namespace": "nova",
"nova_object.version": "1.3"
},
"priority": "INFO",
"publisher_id": "nova-compute:compute"
}
},
{
"dimensions": {
"service": "compute",
"topic": "notification.error",
"hostname": "nova-compute:compute"
},
"project_id": "6f70656e737461636b20342065766572",
"event": {
"priority": "ERROR",
"payload": {
"nova_object.name": "InstanceActionPayload",
"nova_object.data": {
"state": "active",
"availability_zone": "nova",
"key_name": "my-key",
"kernel_id": "",
"host_name": "some-server",
"progress": 0,
"task_state": "rebuilding",
"deleted_at": null,
"architecture": null,
"auto_disk_config": "MANUAL",
"ramdisk_id": "",
"locked": false,
"created_at": "2012-10-29T13:42:11Z",
"host": "compute",
"display_name": "some-server",
"os_type": null,
"metadata": {},
"ip_addresses": [
{
"nova_object.name": "IpPayload",
"nova_object.data": {
"device_name": "tapce531f90-19",
"port_uuid": "ce531f90-199f-48c0-816c-13e38010b442",
"address": "192.168.1.3",
"version": 4,
"meta": {},
"label": "private-network",
"mac": "fa:16:3e:4c:2c:30"
},
"nova_object.version": "1.0",
"nova_object.namespace": "nova"
}
],
"power_state": "running",
"display_description": "some-server",
"uuid": "5fafd989-4043-44b4-8acc-907e847f4b70",
"flavor": {
"nova_object.name": "FlavorPayload",
"nova_object.data": {
"disabled": false,
"ephemeral_gb": 0,
"extra_specs": {
"hw:watchdog_action": "disabled"
},
"flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3",
"is_public": true,
"memory_mb": 512,
"name": "test_flavor",
"projects": null,
"root_gb": 1,
"rxtx_factor": 1.0,
"swap": 0,
"vcpu_weight": 0,
"vcpus": 1
},
"nova_object.version": "1.3",
"nova_object.namespace": "nova"
},
"reservation_id": "r-pfiic52h",
"terminated_at": null,
"tenant_id": "6f70656e737461636b20342065766572",
"node": "fake-mini",
"launched_at": "2012-10-29T13:42:11Z",
"user_id": "fake",
"image_uuid": "a2459075-d96c-40d5-893e-577ff92e721c",
"fault": {
"nova_object.name": "ExceptionPayload",
"nova_object.data": {
"module_name": "nova.tests.functional.notification_sample_tests.test_instance",
"exception_message": "Insufficient compute resources: fake-resource.",
"function_name": "_compute_resources_unavailable",
"exception": "ComputeResourcesUnavailable"
},
"nova_object.version": "1.0",
"nova_object.namespace": "nova"
}
},
"nova_object.version": "1.3",
"nova_object.namespace": "nova"
},
"publisher_id": "nova-compute:compute",
"event_type": "instance.rebuild.error"
}
}
]
}

View File

@ -0,0 +1,91 @@
{
"timestamp": "2012-10-29T13:42:11Z+0200",
"events": [
{
"dimensions": {
"service": "compute",
"topic": "notification.sample",
"hostname": "nova-compute:compute"
},
"project_id": "6f70656e737461636b20342065766572",
"event": {
"event_type": "instance.reboot.end",
"payload": {
"nova_object.data": {
"architecture": "x86_64",
"availability_zone": "nova",
"created_at": "2012-10-29T13:42:11Z",
"deleted_at": null,
"display_name": "some-server",
"display_description": "some-server",
"fault": null,
"host": "compute",
"host_name": "some-server",
"ip_addresses": [
{
"nova_object.name": "IpPayload",
"nova_object.namespace": "nova",
"nova_object.version": "1.0",
"nova_object.data": {
"mac": "fa:16:3e:4c:2c:30",
"address": "192.168.1.3",
"port_uuid": "ce531f90-199f-48c0-816c-13e38010b442",
"meta": {},
"version": 4,
"label": "private-network",
"device_name": "tapce531f90-19"
}
}
],
"key_name": "my-key",
"auto_disk_config": "MANUAL",
"kernel_id": "",
"launched_at": "2012-10-29T13:42:11Z",
"image_uuid": "155d900f-4e14-4e4c-a73d-069cbf4541e6",
"metadata": {},
"locked": false,
"node": "fake-mini",
"os_type": null,
"progress": 0,
"ramdisk_id": "",
"reservation_id": "r-npxv0e40",
"state": "active",
"task_state": null,
"power_state": "running",
"tenant_id": "6f70656e737461636b20342065766572",
"terminated_at": null,
"flavor": {
"nova_object.name": "FlavorPayload",
"nova_object.data": {
"flavorid": "a22d5517-147c-4147-a0d1-e698df5cd4e3",
"name": "test_flavor",
"projects": null,
"root_gb": 1,
"vcpus": 1,
"ephemeral_gb": 0,
"memory_mb": 512,
"disabled": false,
"rxtx_factor": 1.0,
"extra_specs": {
"hw:watchdog_action": "disabled"
},
"swap": 0,
"is_public": true,
"vcpu_weight": 0
},
"nova_object.version": "1.3",
"nova_object.namespace": "nova"
},
"user_id": "fake",
"uuid": "178b0921-8f85-4257-88b6-2e743b5a975c"
},
"nova_object.name": "InstanceActionPayload",
"nova_object.namespace": "nova",
"nova_object.version": "1.3"
},
"priority": "INFO",
"publisher_id": "nova-compute:compute"
}
}
]
}

View File

@ -0,0 +1,47 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from voluptuous import MultipleInvalid
from monasca_events_api.app.controller.v1.body_validation import validate_body
from monasca_events_api.tests.unit import base
class TestBodyValidation(base.BaseTestCase):
def test_missing_events_filed(self):
body = {'timestamp': '2012-10-29T13:42:11Z+0200'}
self.assertRaises(MultipleInvalid, validate_body, body)
def test_missing_timestamp_field(self):
body = {'events': []}
self.assertRaises(MultipleInvalid, validate_body, body)
def test_empty_body(self):
body = {}
self.assertRaises(MultipleInvalid, validate_body, body)
def test_incorrect_timestamp_type(self):
body = {'events': [], 'timestamp': 9000}
self.assertRaises(MultipleInvalid, validate_body, body)
def test_incorrect_events_type(self):
body = {'events': 'over9000', 'timestamp': '2012-10-29T13:42:11Z+0200'}
self.assertRaises(MultipleInvalid, validate_body, body)
def test_correct_body(self):
body = [{'events': [], 'timestamp': '2012-10-29T13:42:11Z+0200'},
{'events': {}, 'timestamp': u'2012-10-29T13:42:11Z+0200'}]
for b in body:
validate_body(b)

View File

@ -0,0 +1,134 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import falcon
import mock
import ujson as json
from monasca_events_api.app.controller.v1 import events
from monasca_events_api.tests.unit import base
ENDPOINT = '/events'
def _init_resource(test):
resource = events.Events()
test.api.add_route(ENDPOINT, resource)
return resource
@mock.patch('monasca_events_api.app.controller.v1.'
'bulk_processor.EventsBulkProcessor')
class TestEventsApi(base.BaseApiTestCase):
def test_should_pass_simple_event(self, bulk_processor):
events_resource = _init_resource(self)
events_resource._processor = bulk_processor
unit_test_patch = os.path.dirname(__file__)
json_file_path = 'event_template_json/req_simple_event.json'
patch_to_req_simple_event_file = os.path.join(unit_test_patch,
json_file_path)
with open(patch_to_req_simple_event_file, 'r') as fi:
body = fi.read()
self.simulate_request(
path=ENDPOINT,
method='POST',
headers={
'Content-Type': 'application/json',
'X_ROLES': 'monasca'
},
body=body
)
self.assertEqual(falcon.HTTP_200, self.srmock.status)
def test_should_multiple_events(self, bulk_processor):
events_resource = _init_resource(self)
events_resource._processor = bulk_processor
unit_test_patch = os.path.dirname(__file__)
json_file_path = 'event_template_json/req_multiple_events.json'
req_multiple_events_json = os.path.join(unit_test_patch,
json_file_path)
with open(req_multiple_events_json, 'r') as fi:
body = fi.read()
self.simulate_request(
path=ENDPOINT,
method='POST',
headers={
'Content-Type': 'application/json',
'X_ROLES': 'monasca'
},
body=body
)
self.assertEqual(falcon.HTTP_200, self.srmock.status)
def test_should_fail_empty_body(self, bulk_processor):
events_resource = _init_resource(self)
events_resource._processor = bulk_processor
self.simulate_request(
path=ENDPOINT,
method='POST',
headers={
'Content-Type': 'application/json',
'X_ROLES': 'monasca'
},
body=''
)
self.assertEqual(falcon.HTTP_400, self.srmock.status)
def test_should_fail_missing_timestamp_in_body(self, bulk_processor):
events_resource = _init_resource(self)
events_resource._processor = bulk_processor
unit_test_patch = os.path.dirname(__file__)
json_file_path = 'event_template_json/req_simple_event.json'
patch_to_req_simple_event_file = os.path.join(unit_test_patch,
json_file_path)
with open(patch_to_req_simple_event_file, 'r') as fi:
events = json.load(fi)['events']
body = {'events': [events]}
self.simulate_request(
path=ENDPOINT,
method='POST',
headers={
'Content-Type': 'application/json',
'X_ROLES': 'monasca'
},
body=json.dumps(body)
)
self.assertEqual(falcon.HTTP_422, self.srmock.status)
def test_should_fail_missing_events_in_body(self, bulk_processor):
events_resource = _init_resource(self)
events_resource._processor = bulk_processor
body = {'timestamp': '2012-10-29T13:42:11Z+0200'}
self.simulate_request(
path=ENDPOINT,
method='POST',
headers={
'Content-Type': 'application/json',
'X_ROLES': 'monasca'
},
body=json.dumps(body)
)
self.assertEqual(falcon.HTTP_422, self.srmock.status)
class TestApiEventsVersion(base.BaseApiTestCase):
@mock.patch('monasca_events_api.app.controller.v1.'
'bulk_processor.EventsBulkProcessor')
def test_should_return_v1_as_version(self, _):
resource = events.Events()
self.assertEqual('v1.0', resource.version)

View File

@ -0,0 +1,77 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import mock
import ujson as json
from monasca_events_api.app.controller import healthchecks
from monasca_events_api.app.healthcheck import kafka_check as healthcheck
from monasca_events_api.tests.unit import base
ENDPOINT = '/healthcheck'
class TestApiHealthChecks(base.BaseApiTestCase):
def before(self):
self.resource = healthchecks.HealthChecks()
self.api.add_route(
ENDPOINT,
self.resource
)
def test_should_return_200_for_head(self):
self.simulate_request(ENDPOINT, method='HEAD')
self.assertEqual(falcon.HTTP_NO_CONTENT, self.srmock.status)
@mock.patch('monasca_events_api.app.healthcheck.'
'kafka_check.KafkaHealthCheck')
def test_should_report_healthy_if_kafka_healthy(self, kafka_check):
kafka_check.healthcheck.return_value = healthcheck.CheckResult(True,
'OK')
self.resource._kafka_check = kafka_check
ret = self.simulate_request(ENDPOINT,
headers={
'Content-Type': 'application/json'
},
decode='utf8',
method='GET')
self.assertEqual(falcon.HTTP_OK, self.srmock.status)
ret = json.loads(ret)
self.assertIn('kafka', ret)
self.assertEqual('OK', ret.get('kafka'))
@mock.patch('monasca_events_api.app.healthcheck.'
'kafka_check.KafkaHealthCheck')
def test_should_report_unhealthy_if_kafka_unhealthy(self, kafka_check):
url = 'localhost:8200'
err_str = 'Could not connect to kafka at %s' % url
kafka_check.healthcheck.return_value = healthcheck.CheckResult(False,
err_str)
self.resource._kafka_check = kafka_check
ret = self.simulate_request(ENDPOINT,
headers={
'Content-Type': 'application/json'
},
decode='utf8',
method='GET')
self.assertEqual(falcon.HTTP_SERVICE_UNAVAILABLE, self.srmock.status)
ret = json.loads(ret)
self.assertIn('kafka', ret)
self.assertEqual(err_str, ret.get('kafka'))

View File

@ -82,7 +82,7 @@ class TestPolicyCase(base.BaseTestCase):
)
)
self.assertRaises(os_policy.PolicyNotRegistered, policy.authorize,
ctx, action, {})
ctx.context, action, {})
def test_authorize_bad_action_throws(self):
action = "example:denied"
@ -97,7 +97,7 @@ class TestPolicyCase(base.BaseTestCase):
)
)
self.assertRaises(os_policy.PolicyNotAuthorized, policy.authorize,
ctx, action, {})
ctx.context, action, {})
def test_authorize_bad_action_no_exception(self):
action = "example:denied"
@ -111,7 +111,7 @@ class TestPolicyCase(base.BaseTestCase):
}
)
)
result = policy.authorize(ctx, action, {}, False)
result = policy.authorize(ctx.context, action, {}, False)
self.assertFalse(result)
def test_authorize_good_action(self):
@ -126,7 +126,7 @@ class TestPolicyCase(base.BaseTestCase):
}
)
)
result = policy.authorize(ctx, action, False)
result = policy.authorize(ctx.context, action, False)
self.assertTrue(result)
def test_ignore_case_role_check(self):
@ -143,7 +143,9 @@ class TestPolicyCase(base.BaseTestCase):
}
)
)
self.assertTrue(policy.authorize(admin_context, lowercase_action,
self.assertTrue(policy.authorize(admin_context.context,
lowercase_action,
{}))
self.assertTrue(policy.authorize(admin_context, uppercase_action,
self.assertTrue(policy.authorize(admin_context.context,
uppercase_action,
{}))

View File

@ -0,0 +1,47 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_events_api.middleware import validation_middleware as vm
from monasca_events_api.tests.unit import base
class FakeRequest(object):
def __init__(self, content=None, length=0):
self.content_type = content if content else None
self.content_length = (length if length is not None and length > 0
else None)
class TestValidation(base.BaseTestCase):
def setUp(self):
super(TestValidation, self).setUp()
def test_should_validate_right_content_type(self):
req = FakeRequest('application/json')
vm._validate_content_type(req)
def test_should_fail_missing_content_type(self):
req = FakeRequest()
self.assertRaises(falcon.HTTPMissingHeader,
vm._validate_content_type,
req)
def test_should_fail_unsupported_content_type(self):
req = FakeRequest('test/plain')
self.assertRaises(falcon.HTTPUnsupportedMediaType,
vm._validate_content_type,
req)

View File

@ -0,0 +1,95 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves.urllib.parse import urlparse as urlparse
import falcon
import ujson as json
from monasca_events_api.app.controller import versions
from monasca_events_api.tests.unit import base
def _get_versioned_url(version_id):
return '/version/%s' % version_id
class TestVersionApi(base.BaseApiTestCase):
def before(self):
self.versions = versions.Versions()
self.api.add_route("/version/", self.versions)
self.api.add_route("/version/{version_id}", self.versions)
def test_request_for_incorrect_version(self):
incorrect_version = 'v2.0'
uri = _get_versioned_url(incorrect_version)
self.simulate_request(
uri,
method='GET',
headers={
'Content-Type': 'application/json'
}
)
self.assertEqual(falcon.HTTP_400, self.srmock.status)
def test_should_return_supported_event_api_version(self):
def _check_global_links(current_endpoint, links):
expected_links = {'self': current_endpoint,
'version': '/version',
'healthcheck': '/healthcheck'}
_check_links(links, expected_links)
def _check_links(links, expected_links):
for link in links:
self.assertIn('rel', link)
self.assertIn('href', link)
key = link.get('rel')
href_path = urlparse(link.get('href')).path
self.assertIn(key, expected_links.keys())
self.assertEqual(expected_links[key], href_path)
def _check_elements(elements, expected_versions):
self.assertIsInstance(elements, list)
for el in elements:
self.assertItemsEqual([
u'id',
u'links',
u'status',
u'updated'
], el.keys())
id_v = el.get('id')
self.assertEqual(expected_versions, id_v)
supported_versions = ['v1.0']
version_endpoint = '/version'
for version in supported_versions:
endpoint = '%s/%s' % (version_endpoint, version)
res = self.simulate_request(
endpoint,
method='GET',
headers={
'Content-Type': 'application/json'
},
decode='utf-8'
)
self.assertEqual(falcon.HTTP_200, self.srmock.status)
response = json.loads(res)
self.assertIn('links', response)
_check_global_links(endpoint, response['links'])
self.assertIn('elements', response)
_check_elements(response['elements'], version)

View File

@ -14,5 +14,5 @@
import pbr.version
version_info = pbr.version.VersionInfo('monasca-events-app')
version_info = pbr.version.VersionInfo('monasca-events-api')
version_str = version_info.version_string()

7
policy-sample.yaml Normal file
View File

@ -0,0 +1,7 @@
# Admin role
# POST /
#"admin_required": "role:admin or is_admin:1"
# Send events to api
# POST /v1.0/events
#"events_api:agent_required": "role:monasca_events_agent"

View File

@ -15,3 +15,6 @@ oslo.serialization>=1.10.0,!=2.19.1 # Apache-2.0
oslo.utils>=3.20.0 # Apache-2.0
PasteDeploy>=1.5.0 # MIT
eventlet!=0.18.3,!=0.20.1,<0.21.0,>=0.18.2 # MIT
monasca-common>=1.4.0 # Apache-2.0
voluptuous>=0.8.9 # BSD License
six>=1.10.0 # MIT

View File

@ -31,6 +31,9 @@ data_files =
etc/monasca/events-api-paste.ini
etc/monasca/events-api-logging.conf
wsgi_scripts =
monasca-events-api-wsgi = monasca_events_api.app.wsgi:main
[entry_points]
oslo.config.opts =
@ -66,5 +69,6 @@ universal = 1
[pbr]
autodoc_index_modules = True
autodoc_exclude_modules =
monasca_events_api.app.wsgi*
monasca_events_api.tests.*
api_doc_dir = contributor/api

View File

@ -14,6 +14,7 @@ mock>=2.0 # BSD
oslotest>=1.10.0 # Apache-2.0
os-testr>=0.8.0 # Apache-2.0
simplejson>=2.2.0 # MIT
voluptuous>=0.8.9 # BSD License
# documentation
doc8 # Apache-2.0

View File

@ -96,7 +96,9 @@ commands =
[testenv:devdocs]
description = Builds developer documentation
commands =
{[testenv]commands}
rm -rf doc/build
rm -rf doc/source/contributor/api
{[testenv:checkjson]commands}
python setup.py build_sphinx