Add implementation of /v3.0/logs API

Implemented specification proposal for batch support at,
https://review.openstack.org/#/c/273058/

Note, if you want to use this in the monasca-vagrant environment
it should work. I've modified the value of the field kafka_url in
etc/monasca/monasca_log_api.conf to use the kafka server in the
mini-mon vm.

After you've deployed monasca-vagrant using "vagrant up"
ssh into mini-mon "vagrant ssh mini-mon" and then create a new
topic for logs which can be done running the command:

/opt/kafka/bin/kafka-topics.sh --create -zookeeper localhost:2181
--replication-factor 1  --partitions 128 --topic logs

To watch the log messages at the console in the mini-mon VM

/opt/kafka/bin/kafka-console-consumer.sh
--zookeeper localhost:2181 --topic logs

Change-Id: I4247d1824a237ecbe4db878e72485938f40a31c3
This commit is contained in:
Roland Hochmuth 2016-01-30 07:59:06 -07:00
parent 412892aed2
commit 9b93fc8864
11 changed files with 225 additions and 16 deletions

3
.gitignore vendored
View File

@ -13,7 +13,7 @@ cover
ChangeLog
MANIFEST
AUTHORS
monasca.log
monasca-log-api.log
*.swp
@ -31,3 +31,4 @@ test-output/
logs/
*config*.yml
db/config.yml
.venv

View File

@ -3,32 +3,48 @@
# to write to the directory.
log_file = monasca-log-api.log
log_dir = .
debug=True
debug=False
# Dispatchers to be loaded to serve restful APIs
[dispatcher]
logs = monasca_log_api.v2.reference.logs:Logs
logs_v3 = monasca_log_api.v3.reference.logs:Logs
versions = monasca_log_api.v2.reference.versions:Versions
healthchecks = monasca_log_api.v2.reference.healthchecks:HealthChecks
[service]
max_log_size = 1048576
region = 'pl'
region = 'region-one'
[log_publisher]
topics = 'logs'
kafka_url = 'localhost:8900'
# Uncomment for DevStack environment
kafka_url = '192.168.10.6:9092'
# Uncomment for monasca-vagrant environment
#kafka_url = '192.168.10.4:9092'
[keystone_authtoken]
identity_uri = http://192.168.10.5:35357
auth_uri = http://192.168.10.5:5000
admin_password = admin
# Uncomment for DevStack environment
identity_uri = http://192.168.10.6:35357
auth_uri = http://192.168.10.6:5000
admin_password = secretadmin
# Uncomment for monaasca-vagrant environment
# identity_uri = http://192.168.10.5:35357
# auth_uri = http://192.168.10.5:5000
# admin_password = admin
admin_user = admin
admin_tenant_name = admin
cafile =
certfile =
keyfile =
insecure = false
# memcached_servers = 127.0.0.1:11211
# token_cache_time=300
[kafka_healthcheck]
kafka_url = localhost:8900
@ -36,5 +52,6 @@ kafka_topics = log
[roles_middleware]
path = /v2.0/log
default_roles = monasca-user
agent_roles = monasca-log-agent
path = /v3.0/logs
default_roles = user, domainuser, domainadmin, monasca-user
agent_roles = monasca-agent, admin

View File

@ -16,6 +16,6 @@ paste.filter_factory = monasca_log_api.middleware.role_middleware:RoleMiddleware
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 8082
port = 8074
workers = 1
proc_name = monasca_log_api

View File

@ -0,0 +1,28 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
LOG = log.getLogger(__name__)
class LogsV3Api(object):
def __init__(self):
super(LogsV3Api, self).__init__()
LOG.info('Initializing LogsV3Api!')
def on_post(self, req, res):
res.status = falcon.HTTP_501 # pragma: no cover

View File

@ -37,14 +37,17 @@ dispatcher_opts = [
cfg.StrOpt('healthchecks',
default=None,
required=True,
help='Healthchecks endpoint')
help='Healthchecks endpoint'),
cfg.StrOpt('logs_v3',
default=None,
help='Logs')
]
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
CONF.register_group(dispatcher_group)
CONF.register_opts(dispatcher_opts, dispatcher_group)
def launch(conf, config_file='/etc/monasca/log-api-config.conf'):
def launch(conf, config_file='etc/monasca/log-api-config.conf'):
if conf and 'config_file' in conf:
config_file = conf.get('config_file')
@ -75,6 +78,9 @@ def load_logs_resource(app):
logs = simport.load(CONF.dispatcher.logs)()
app.add_route('/v2.0/log/single', logs)
logs_v3 = simport.load(CONF.dispatcher.logs_v3)()
app.add_route('/v3.0/logs', logs_v3)
def load_versions_resource(app):
versions = simport.load(CONF.dispatcher.versions)()
@ -86,7 +92,7 @@ if __name__ == '__main__':
base_path = '%s/..' % os.getcwd()
global_conf = {'config_file': (
'%s/%s' % (base_path, '/etc/monasca/log-api-config.conf'))}
'%s/%s' % (base_path, 'etc/monasca/log-api-config.conf'))}
wsgi_app = (
paste.deploy.loadapp(
@ -96,5 +102,5 @@ if __name__ == '__main__':
)
)
httpd = simple_server.make_server('127.0.0.1', 8080, wsgi_app)
httpd = simple_server.make_server('127.0.0.1', 8074, wsgi_app)
httpd.serve_forever()

View File

View File

View File

@ -0,0 +1,44 @@
# Copyright 2014 Hewlett-Packard
# Copyright 2015 Cray Inc. All Rights Reserved.
# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_common.rest import utils as rest_utils
from oslo_log import log
LOG = log.getLogger(__name__)
def read_json_msg_body(req):
"""Read the json_msg from the http request body and return them as JSON.
:param req: HTTP request object.
:return: Returns the metrics as a JSON object.
:raises falcon.HTTPBadRequest:
"""
try:
msg = req.stream.read()
json_msg = rest_utils.from_json(msg)
return json_msg
except ValueError as ex:
LOG.debug(ex)
raise falcon.HTTPBadRequest('Bad request',
'Request body is not valid JSON')
def validate_json_content_type(req):
if req.content_type not in ['application/json']:
raise falcon.HTTPBadRequest('Bad request', 'Bad content type. Must be '
'application/json')

View File

@ -0,0 +1,113 @@
# Copyright 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_common.kafka import producer
from monasca_common.rest import utils as rest_utils
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from monasca_log_api.api import exceptions
from monasca_log_api.api import headers
from monasca_log_api.api import logs_v3_api
from monasca_log_api.v2.common import service
from monasca_log_api.v3.reference import helpers
LOG = log.getLogger(__name__)
class Logs(logs_v3_api.LogsV3Api):
def __init__(self):
super(Logs, self).__init__()
self._service_config = cfg.CONF.service
self._log_publisher_config = cfg.CONF.log_publisher
self._kafka_producer = producer.KafkaProducer(
self._log_publisher_config.kafka_url)
def on_post(self, req, res):
helpers.validate_json_content_type(req)
service.Validations.validate_payload_size(req)
cross_tenant_id = req.get_param('tenant_id')
tenant_id = req.get_header(*headers.X_TENANT_ID)
self._validate_cross_tenant_id(tenant_id, cross_tenant_id)
request_body = helpers.read_json_msg_body(req)
log_list = self._get_logs(request_body)
envelopes = []
for log_element in log_list:
dimensions = self._get_dimensions(log_element)
service.Validations.validate_dimensions(dimensions)
log_message = self._get_log_message(log_element)
envelope = self._create_log_envelope(tenant_id, cross_tenant_id,
self._service_config.region,
dimensions, log_message)
service.Validations.validate_envelope_size(envelope)
envelopes.append(envelope)
self._send_logs(envelopes)
res.status = falcon.HTTP_204
def _validate_cross_tenant_id(self, tenant_id, cross_tenant_id):
if not service.is_delegate(tenant_id):
if cross_tenant_id:
raise falcon.HTTPForbidden(
'Permission denied',
'Projects %s cannot POST cross tenant logs' % tenant_id
)
def _get_dimensions(self, log_element):
'''Get the dimensions in the log element.'''
if 'dimensions' not in log_element:
raise exceptions.HTTPUnprocessableEntityError(
'Unprocessable Entity Dimensions not found')
return log_element['dimensions']
def _get_log_message(self, log_element):
'''Get the message in the log element.'''
if 'message' not in log_element:
raise exceptions.HTTPUnprocessableEntityError(
'Unprocessable Entity Log message not found')
return log_element['message']
def _get_logs(self, request_body):
'''Get the logs in the HTTP request body.'''
if 'logs' not in request_body:
raise exceptions.HTTPUnprocessableEntityError(
'Unprocessable Entity Logs not found')
return request_body['logs']
def _create_log_envelope(self, tenant_id, cross_tenant_id, region='',
dimensions={}, logs={}):
'''Create a log envelope and return it as a json string.'''
envelope = {
'creation_time': timeutils.utcnow_ts(),
'meta': {
'tenantId': tenant_id if tenant_id else cross_tenant_id,
'region': region
},
'dimensions': dimensions,
'logs': logs
}
return rest_utils.as_json(envelope)
def _send_logs(self, logs):
'''Send the logs to Kafka.'''
try:
self._kafka_producer.publish(self._log_publisher_config.topics[0],
logs,
key=None)
except Exception as ex:
LOG.exception(ex)
raise falcon.HTTPServiceUnavailable('Service unavailable',
ex.message, 60)

View File

@ -10,7 +10,6 @@ oslo.utils>=3.4.0
pastedeploy>=1.5.0
pbr>=1.6.0
six>=1.9.0
simplejson>=2.2.0
simport
monasca-common>=0.0.6
eventlet>=0.17.4,!=0.18.0

View File

@ -3,4 +3,5 @@ flake8>=2.2.4,<=2.4.1
coverage>=3.6
hacking>=0.10.2,<0.11
mock>=1.2
nose
nose
simplejson