Fixing issues spotted in dev environment
- removed own kafka abstraction in favour of monasca-common - removed monasca keystone context filter, not actually used - changed URI of logs endpoints to /v1.0/log/single Change-Id: Iaceabdce2b2862451cfe63d2a612577d7710022b
This commit is contained in:
parent
9d8c24d1f7
commit
f07a38e388
|
@ -12,6 +12,7 @@ cover
|
|||
.tox
|
||||
ChangeLog
|
||||
MANIFEST
|
||||
AUTHORS
|
||||
monasca.log
|
||||
|
||||
|
||||
|
|
|
@ -16,19 +16,9 @@ driver = v1_reference
|
|||
[service]
|
||||
region = 'pl'
|
||||
|
||||
[kafka]
|
||||
client_id = 'monasca-log-api'
|
||||
timeout = 60
|
||||
host = 'localhost:8900'
|
||||
|
||||
[kafka_producer]
|
||||
batch_send_every_n = 10
|
||||
async = True
|
||||
ack_timeout = 1000
|
||||
req_acks = 'Local'
|
||||
|
||||
[log_publisher]
|
||||
topics = 'logs'
|
||||
kafka_url = 'localhost:8900'
|
||||
|
||||
[keystone_authtoken]
|
||||
identity_uri = http://192.168.10.5:35357
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
name = monasca_log_api
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = auth keystonecontext api
|
||||
pipeline = auth api
|
||||
|
||||
[app:api]
|
||||
paste.app_factory = monasca_log_api.server:launch
|
||||
|
@ -10,9 +10,6 @@ paste.app_factory = monasca_log_api.server:launch
|
|||
[filter:auth]
|
||||
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
|
||||
|
||||
[filter:keystonecontext]
|
||||
paste.filter_factory = monasca_log_api.middleware.keystone_context_filter:filter_factory
|
||||
|
||||
[server:main]
|
||||
use = egg:gunicorn#main
|
||||
host = 127.0.0.1
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import falcon
|
||||
import simplejson as json
|
||||
|
||||
ENCODING = 'utf8'
|
||||
|
||||
|
||||
def read_body(payload, content_type='application/json'):
|
||||
try:
|
||||
content = payload.read()
|
||||
if not content:
|
||||
return False
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPBadRequest(title='Failed to read body',
|
||||
description=ex.message)
|
||||
|
||||
if content_type == 'application/json':
|
||||
try:
|
||||
content = from_json(content)
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPBadRequest(title='Failed to read body as json',
|
||||
description=ex.message)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
def as_json(data):
|
||||
return json.dumps(data,
|
||||
encoding=ENCODING,
|
||||
sort_keys=False,
|
||||
ensure_ascii=False)
|
||||
|
||||
|
||||
def from_json(data):
|
||||
return json.loads(data, encoding=ENCODING)
|
|
@ -1,16 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
ENCODING = 'utf8'
|
|
@ -1,83 +0,0 @@
|
|||
# Copyright (c) 2015 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""RequestContext: context for requests that persist through monasca."""
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class RequestContext(object):
|
||||
"""Security context and request information.
|
||||
|
||||
Represents the user taking a given action within the system.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, user_id, project_id, domain_id=None, domain_name=None,
|
||||
roles=None, timestamp=None, request_id=None,
|
||||
auth_token=None, user_name=None, project_name=None,
|
||||
service_catalog=None, user_auth_plugin=None, **kwargs):
|
||||
"""Creates the Keystone Context. Supports additional parameters:
|
||||
|
||||
:param user_auth_plugin:
|
||||
The auth plugin for the current request's authentication data.
|
||||
:param kwargs:
|
||||
Extra arguments that might be present
|
||||
"""
|
||||
if kwargs:
|
||||
LOG.warning(
|
||||
'Arguments dropped when creating context: %s') % str(kwargs)
|
||||
|
||||
self._roles = roles or []
|
||||
self.timestamp = timeutils.utcnow()
|
||||
|
||||
if not request_id:
|
||||
request_id = self.generate_request_id()
|
||||
self._request_id = request_id
|
||||
self._auth_token = auth_token
|
||||
|
||||
self._service_catalog = service_catalog
|
||||
|
||||
self._domain_id = domain_id
|
||||
self._domain_name = domain_name
|
||||
|
||||
self._user_id = user_id
|
||||
self._user_name = user_name
|
||||
|
||||
self._project_id = project_id
|
||||
self._project_name = project_name
|
||||
|
||||
self._user_auth_plugin = user_auth_plugin
|
||||
|
||||
def to_dict(self):
|
||||
return {'user_id': self._user_id,
|
||||
'project_id': self._project_id,
|
||||
'domain_id': self._domain_id,
|
||||
'domain_name': self._domain_name,
|
||||
'roles': self._roles,
|
||||
'timestamp': timeutils.strtime(self._timestamp),
|
||||
'request_id': self._request_id,
|
||||
'auth_token': self._auth_token,
|
||||
'user_name': self._user_name,
|
||||
'service_catalog': self._service_catalog,
|
||||
'project_name': self._project_name,
|
||||
'user': self._user}
|
||||
|
||||
def generate_request_id(self):
|
||||
return b'req-' + str(uuid.uuid4()).encode('ascii')
|
|
@ -1,109 +0,0 @@
|
|||
# Copyright (c) 2015 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import falcon
|
||||
from oslo_log import log
|
||||
from oslo_middleware import request_id
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from monasca_log_api.middleware import context
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
def validator_filter(app):
|
||||
return KeystoneContextFilter(app, local_conf)
|
||||
|
||||
return validator_filter
|
||||
|
||||
|
||||
class KeystoneContextFilter(object):
|
||||
"""Make a request context from keystone headers."""
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self._app = app
|
||||
self._conf = conf
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
|
||||
LOG.debug("Creating Keystone Context Object.")
|
||||
|
||||
user_id = env.get('HTTP_X_USER_ID', env.get('HTTP_X_USER'))
|
||||
if user_id is None:
|
||||
msg = "Neither X_USER_ID nor X_USER found in request"
|
||||
LOG.error(msg)
|
||||
raise falcon.HTTPUnauthorized(title='Forbidden', description=msg)
|
||||
|
||||
roles = self._get_roles(env)
|
||||
|
||||
project_id = env.get('HTTP_X_PROJECT_ID')
|
||||
project_name = env.get('HTTP_X_PROJECT_NAME')
|
||||
|
||||
domain_id = env.get('HTTP_X_DOMAIN_ID')
|
||||
domain_name = env.get('HTTP_X_DOMAIN_NAME')
|
||||
|
||||
user_name = env.get('HTTP_X_USER_NAME')
|
||||
|
||||
req_id = env.get(request_id.ENV_REQUEST_ID)
|
||||
|
||||
# Get the auth token
|
||||
auth_token = env.get('HTTP_X_AUTH_TOKEN',
|
||||
env.get('HTTP_X_STORAGE_TOKEN'))
|
||||
|
||||
service_catalog = None
|
||||
if env.get('HTTP_X_SERVICE_CATALOG') is not None:
|
||||
try:
|
||||
catalog_header = env.get('HTTP_X_SERVICE_CATALOG')
|
||||
service_catalog = jsonutils.loads(catalog_header)
|
||||
except ValueError:
|
||||
msg = "Invalid service catalog json."
|
||||
LOG.error(msg)
|
||||
raise falcon.HTTPInternalServerError(msg)
|
||||
|
||||
# NOTE(jamielennox): This is a full auth plugin set by auth_token
|
||||
# middleware in newer versions.
|
||||
user_auth_plugin = env.get('keystone.token_auth')
|
||||
|
||||
# Build a context
|
||||
ctx = context.RequestContext(user_id,
|
||||
project_id,
|
||||
user_name=user_name,
|
||||
project_name=project_name,
|
||||
domain_id=domain_id,
|
||||
domain_name=domain_name,
|
||||
roles=roles,
|
||||
auth_token=auth_token,
|
||||
service_catalog=service_catalog,
|
||||
request_id=req_id,
|
||||
user_auth_plugin=user_auth_plugin)
|
||||
|
||||
env['monasca.context'] = ctx
|
||||
|
||||
LOG.debug("Keystone Context successfully created.")
|
||||
|
||||
return self._app(env, start_response)
|
||||
|
||||
def _get_roles(self, env):
|
||||
"""Get the list of roles."""
|
||||
|
||||
if 'HTTP_X_ROLES' in env:
|
||||
roles = env.get('HTTP_X_ROLES', '')
|
||||
else:
|
||||
# Fallback to deprecated role header:
|
||||
roles = env.get('HTTP_X_ROLE', '')
|
||||
if roles:
|
||||
LOG.warning(
|
||||
'Sourcing roles from deprecated X-Role HTTP header')
|
||||
return [r.strip() for r in roles.split(',')]
|
|
@ -1,50 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class ExceptionWrapper(Exception):
|
||||
"""Wrapper around exception with custom message.
|
||||
|
||||
ExceptionWrapper provides a way to keep old
|
||||
exceptions but providing to describe the context of error
|
||||
occurrence.
|
||||
|
||||
"""
|
||||
def __init__(self, message, caught=None):
|
||||
self._message = message
|
||||
self._caught = caught
|
||||
|
||||
@property
|
||||
def caught(self):
|
||||
return self._caught
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return self._message
|
||||
|
||||
def __str__(self):
|
||||
return '%s <message=%s, caught=%s>' % (
|
||||
self.__class__.__name__,
|
||||
repr(self.message),
|
||||
repr(self.caught)
|
||||
)
|
||||
|
||||
|
||||
class PublisherInitException(ExceptionWrapper):
|
||||
pass
|
||||
|
||||
|
||||
class MessageQueueException(ExceptionWrapper):
|
||||
pass
|
|
@ -1,220 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from kafka import client
|
||||
from kafka import common
|
||||
from kafka import conn
|
||||
from kafka import producer
|
||||
from kafka import protocol
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import simport
|
||||
|
||||
from monasca_log_api.publisher import exceptions
|
||||
from monasca_log_api.publisher import publisher
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
ACK_MAP = {
|
||||
'None': producer.KeyedProducer.ACK_NOT_REQUIRED,
|
||||
'Local': producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
|
||||
'Server': producer.KeyedProducer.ACK_AFTER_CLUSTER_COMMIT
|
||||
}
|
||||
|
||||
kafka_opts = [
|
||||
cfg.StrOpt('client_id',
|
||||
default=None,
|
||||
help='Client Id',
|
||||
required=True),
|
||||
cfg.IntOpt('timeout',
|
||||
default=conn.DEFAULT_SOCKET_TIMEOUT_SECONDS,
|
||||
help='Socket timeout'),
|
||||
cfg.StrOpt('host',
|
||||
default=None,
|
||||
help='List of hosts, comma delimited',
|
||||
required=True)
|
||||
]
|
||||
kafka_group = cfg.OptGroup(name='kafka', title='kafka')
|
||||
|
||||
kafka_producer_opts = [
|
||||
cfg.IntOpt('batch_send_every_n',
|
||||
default=None,
|
||||
help='Send every n items'),
|
||||
cfg.IntOpt('batch_send_every_t',
|
||||
default=None,
|
||||
help='Send every n seconds'),
|
||||
cfg.BoolOpt('async',
|
||||
default=False,
|
||||
help='Async communication'),
|
||||
cfg.StrOpt('reg_acks',
|
||||
default='None',
|
||||
help='Acknowledge options'),
|
||||
cfg.StrOpt('partitioner',
|
||||
default=None,
|
||||
help='Partitioner algorithm')
|
||||
]
|
||||
kafka_producer_group = cfg.OptGroup(name='kafka_producer',
|
||||
title='kafka_producer')
|
||||
|
||||
CONF.register_group(kafka_group)
|
||||
CONF.register_opts(kafka_opts, kafka_group)
|
||||
|
||||
CONF.register_group(kafka_producer_group)
|
||||
CONF.register_opts(kafka_producer_opts, kafka_producer_group)
|
||||
|
||||
|
||||
class KafkaPublisher(publisher.Publisher):
|
||||
def __init__(self, max_retry=3, wait_time=None):
|
||||
self._producer_conf = None
|
||||
self._client_conf = None
|
||||
|
||||
self._producer = None
|
||||
self._client = None
|
||||
|
||||
self._max_retry = max_retry
|
||||
self._wait_time = wait_time
|
||||
|
||||
LOG.info('Initializing KafkaPublisher <%s>' % self)
|
||||
|
||||
def _get_client_conf(self):
|
||||
if self._client_conf:
|
||||
return self._client_conf
|
||||
|
||||
client_conf = {
|
||||
'hosts': CONF.kafka.host,
|
||||
'client_id': CONF.kafka.client_id,
|
||||
'timeout': CONF.kafka.timeout
|
||||
}
|
||||
|
||||
self._client_conf = client_conf
|
||||
|
||||
return client_conf
|
||||
|
||||
def _get_producer_conf(self):
|
||||
if self._producer_conf:
|
||||
return self._producer_conf
|
||||
|
||||
batch_send_every_n = CONF.kafka_producer.batch_send_every_n
|
||||
batch_send_every_t = CONF.kafka_producer.batch_send_every_t
|
||||
partitioner = CONF.kafka_producer.partitioner
|
||||
|
||||
producer_conf = {
|
||||
'codec': protocol.CODEC_GZIP,
|
||||
'batch_send': batch_send_every_n or batch_send_every_t,
|
||||
'async': CONF.kafka_producer.async,
|
||||
'reg_acks': ACK_MAP[CONF.kafka_producer.reg_acks]
|
||||
}
|
||||
if batch_send_every_t:
|
||||
producer_conf.update(
|
||||
{'batch_send_every_t': batch_send_every_t})
|
||||
if batch_send_every_n:
|
||||
producer_conf.update(
|
||||
{'batch_send_every_n': batch_send_every_n})
|
||||
if partitioner:
|
||||
partitioner = simport.load(partitioner)
|
||||
producer_conf.update({'partitioner': partitioner})
|
||||
|
||||
self._producer_conf = producer_conf
|
||||
|
||||
return producer_conf
|
||||
|
||||
def _init_client(self):
|
||||
if self._client:
|
||||
self._client.close()
|
||||
self._producer = None
|
||||
self._client = None
|
||||
|
||||
client_opts = self._get_client_conf()
|
||||
|
||||
for i in range(self._max_retry):
|
||||
kafka_host = client_opts['hosts']
|
||||
try:
|
||||
self._client = client.KafkaClient(
|
||||
client_opts['hosts'],
|
||||
client_opts['client_id'],
|
||||
client_opts['timeout']
|
||||
)
|
||||
if self._wait_time:
|
||||
time.sleep(self._wait_time)
|
||||
break
|
||||
except common.KafkaUnavailableError as ex:
|
||||
LOG.error('Server is down at <host="%s">' % kafka_host)
|
||||
err = ex
|
||||
except common.LeaderNotAvailableError as ex:
|
||||
LOG.error('No leader at <host="%s">.' % kafka_host)
|
||||
err = ex
|
||||
except Exception as ex:
|
||||
LOG.error('Initialization failed at <host="%s">.' % kafka_host)
|
||||
err = ex
|
||||
|
||||
if err:
|
||||
raise err
|
||||
|
||||
def _init_producer(self):
|
||||
try:
|
||||
if not self._client:
|
||||
self._init_client()
|
||||
|
||||
producer_opts = self._get_producer_conf()
|
||||
producer_opts.update({'client': self._client})
|
||||
|
||||
self._producer = producer.KeyedProducer(*producer_opts)
|
||||
if self._wait_time:
|
||||
time.sleep(self._wait_time)
|
||||
except Exception as ex:
|
||||
self._producer = None
|
||||
LOG.exception(ex.message, exc_info=1)
|
||||
raise exceptions.PublisherInitException(
|
||||
message='KeyedProducer can not be created at <host="%s>"'
|
||||
% self._client_conf['host'],
|
||||
caught=ex)
|
||||
|
||||
def send_message(self, topic, key, message):
|
||||
if not message or not key or not topic:
|
||||
return
|
||||
try:
|
||||
if not self._producer:
|
||||
self._init_producer()
|
||||
|
||||
return self._producer.send(topic, key, message)
|
||||
except (common.KafkaUnavailableError,
|
||||
common.LeaderNotAvailableError) as ex:
|
||||
self._client = None
|
||||
LOG.error(ex.message, exc_info=1)
|
||||
raise exceptions.MessageQueueException(
|
||||
message='Failed to post message to kafka',
|
||||
caught=ex
|
||||
)
|
||||
except Exception as ex:
|
||||
LOG.error(ex.message, exc_info=1)
|
||||
raise exceptions.MessageQueueException(
|
||||
message='Unknown error while sending message to kafka',
|
||||
caught=ex
|
||||
)
|
||||
|
||||
# TODO(question) How to ensure that connection will be closed when program
|
||||
# stops ?
|
||||
def close(self):
|
||||
if self._client:
|
||||
self._producer = None
|
||||
self._client.close()
|
||||
|
||||
def __repr__(self):
|
||||
return 'KafkaPublisher <host=%s>' % (
|
||||
self._client_conf['hosts'] if self._client_conf else None
|
||||
)
|
|
@ -1,29 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Publisher(object):
|
||||
@abc.abstractmethod
|
||||
def send_message(self, *args, **kwargs):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def close(self):
|
||||
return
|
|
@ -61,7 +61,7 @@ def launch(conf, config_file='/etc/monasca/log-api-config.conf'):
|
|||
|
||||
def load_logs_resource(app):
|
||||
logs = simport.load(CONF.dispatcher.logs)()
|
||||
app.add_route('/v1.0/logs/single', logs)
|
||||
app.add_route('/v1.0/log/single', logs)
|
||||
|
||||
|
||||
def load_versions_resource(app):
|
||||
|
|
|
@ -1,254 +0,0 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
from falcon import testing
|
||||
from kafka import common
|
||||
import mock
|
||||
|
||||
from monasca_log_api.publisher import exceptions
|
||||
from monasca_log_api.publisher import kafka_publisher as publisher
|
||||
from monasca_log_api.tests import base as base_test
|
||||
|
||||
TOPIC = 'test'
|
||||
|
||||
|
||||
class MockKafkaClient(object):
|
||||
pass
|
||||
|
||||
|
||||
class TestConfiguration(testing.TestBase):
|
||||
def setUp(self):
|
||||
self.conf = base_test.mock_config(self)
|
||||
self.instance = publisher.KafkaPublisher()
|
||||
super(TestConfiguration, self).setUp()
|
||||
|
||||
def test_should_not_have_client_conf_at_begin(self):
|
||||
self.assertIsNone(self.instance._client_conf)
|
||||
|
||||
def test_should_not_have_producer_conf_at_begin(self):
|
||||
self.assertIsNone(self.instance._producer_conf)
|
||||
|
||||
def test_should_init_producer_conf(self):
|
||||
self.instance._get_producer_conf()
|
||||
self.assertIsNotNone(self.instance._producer_conf)
|
||||
|
||||
def test_should_init_client_conf(self):
|
||||
self.instance._get_client_conf()
|
||||
self.assertIsNotNone(self.instance._client_conf)
|
||||
|
||||
|
||||
class TestInitialization(testing.TestBase):
|
||||
def setUp(self):
|
||||
self.conf = base_test.mock_config(self)
|
||||
super(TestInitialization, self).setUp()
|
||||
|
||||
def test_should_have_init_client_not_set(self):
|
||||
instance = publisher.KafkaPublisher()
|
||||
self.assertIsNone(instance._client)
|
||||
|
||||
def test_should_have_init_producer_not_set(self):
|
||||
instance = publisher.KafkaPublisher()
|
||||
self.assertIsNone(instance._producer)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
|
||||
side_effect=[common.KafkaUnavailableError])
|
||||
def test_client_should_fail_kafka_unavailable(self, kafka_client):
|
||||
instance = publisher.KafkaPublisher()
|
||||
self.assertRaises(
|
||||
common.KafkaUnavailableError,
|
||||
instance._init_client
|
||||
)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
|
||||
side_effect=[common.LeaderNotAvailableError])
|
||||
def test_client_should_fail_leader_unavailable(self, kafka_client):
|
||||
instance = publisher.KafkaPublisher()
|
||||
self.assertRaises(
|
||||
common.LeaderNotAvailableError,
|
||||
instance._init_client
|
||||
)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
|
||||
side_effect=[ValueError])
|
||||
def test_client_should_fail_other_error(self, kafka_client):
|
||||
instance = publisher.KafkaPublisher()
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
instance._init_client
|
||||
)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
|
||||
autospec=True)
|
||||
def test_client_should_initialize(self, kafka_client):
|
||||
client_id = 'mock_client'
|
||||
timeout = 3600
|
||||
hosts = 'localhost:666'
|
||||
|
||||
self.conf.config(
|
||||
client_id=client_id,
|
||||
timeout=timeout,
|
||||
host=hosts,
|
||||
group='kafka'
|
||||
)
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._init_client()
|
||||
|
||||
self.assertIsNotNone(instance._client)
|
||||
kafka_client.assert_called_with(hosts, client_id, timeout)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
side_effect=[ValueError])
|
||||
def test_producer_should_fail_any_error(self, producer):
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._client = MockKafkaClient()
|
||||
instance._client_conf = {
|
||||
'host': 'localhost'
|
||||
}
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.PublisherInitException,
|
||||
instance._init_producer
|
||||
)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_producer_should_initialize(self, producer):
|
||||
instance = publisher.KafkaPublisher()
|
||||
client = MockKafkaClient()
|
||||
|
||||
instance._client = client
|
||||
instance._get_producer_conf = mock.Mock(return_value={})
|
||||
|
||||
instance._init_producer()
|
||||
|
||||
self.assertIsNotNone(instance._producer)
|
||||
self.assertTrue(producer.called)
|
||||
|
||||
|
||||
class TestLogic(unittest.TestCase):
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_not_call_producer_for_empty_key(self, producer):
|
||||
producer.send.return_value = None
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
|
||||
instance.send_message(TOPIC, None, 'msg')
|
||||
|
||||
self.assertFalse(producer.send.called)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_not_call_producer_for_empty_message(self, producer):
|
||||
producer.send.return_value = None
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
|
||||
instance.send_message(TOPIC, 'key', None)
|
||||
|
||||
self.assertFalse(producer.send.called)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_not_call_producer_for_empty_topic(self, producer):
|
||||
producer.send.return_value = None
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
|
||||
instance.send_message(None, 'key', 'msg')
|
||||
|
||||
self.assertFalse(producer.send.called)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_fail_kafka_not_available(self, producer):
|
||||
producer.send.side_effect = [common.KafkaUnavailableError]
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
instance._client = mock.Mock('client')
|
||||
|
||||
with self.assertRaises(exceptions.MessageQueueException) as context:
|
||||
instance.send_message('a', 'b', 'c')
|
||||
|
||||
self.assertEqual('Failed to post message to kafka',
|
||||
context.exception.message)
|
||||
self.assertIsInstance(context.exception.caught,
|
||||
common.KafkaUnavailableError)
|
||||
self.assertIsNone(instance._client)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_fail_leader_not_available(self, producer):
|
||||
producer.send.side_effect = [common.LeaderNotAvailableError]
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
instance._client = mock.Mock('client')
|
||||
|
||||
with self.assertRaises(exceptions.MessageQueueException) as context:
|
||||
instance.send_message('a', 'b', 'c')
|
||||
|
||||
self.assertEqual('Failed to post message to kafka',
|
||||
context.exception.message)
|
||||
self.assertIsInstance(context.exception.caught,
|
||||
common.LeaderNotAvailableError)
|
||||
self.assertIsNone(instance._client)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_fail_any_error(self, producer):
|
||||
producer.send.side_effect = [Exception]
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
|
||||
with self.assertRaises(exceptions.MessageQueueException) as context:
|
||||
instance.send_message('a', 'b', 'c')
|
||||
|
||||
self.assertEqual('Unknown error while sending message to kafka',
|
||||
context.exception.message)
|
||||
self.assertIsInstance(context.exception.caught,
|
||||
Exception)
|
||||
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher'
|
||||
'.producer.KeyedProducer',
|
||||
autospec=True)
|
||||
def test_should_send_message(self, producer):
|
||||
producer.send.return_value = None
|
||||
|
||||
instance = publisher.KafkaPublisher()
|
||||
instance._producer = producer
|
||||
|
||||
msg = 'msg'
|
||||
key = 'key'
|
||||
|
||||
instance.send_message(TOPIC, key, msg)
|
||||
|
||||
producer.send.assert_called_once_with(TOPIC, key, msg)
|
|
@ -13,16 +13,19 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import random
|
||||
import unittest
|
||||
|
||||
from falcon import testing
|
||||
import mock
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.publisher import exceptions
|
||||
from monasca_log_api.tests import base
|
||||
from monasca_log_api.v1.common import log_publisher
|
||||
from monasca_log_api.v1.common import service
|
||||
|
||||
|
||||
EPOCH_START = datetime.datetime(1970, 1, 1)
|
||||
|
||||
|
||||
class TestBuildKey(unittest.TestCase):
|
||||
|
@ -60,13 +63,16 @@ class TestBuildKey(unittest.TestCase):
|
|||
# application_type and single dimension
|
||||
tenant_id = 'monasca'
|
||||
application_type = 'monasca-log-api'
|
||||
dimension = service.Dimension('cpu_time', 50)
|
||||
dimension_name = 'cpu_time'
|
||||
dimension_value = '50'
|
||||
log_object = {
|
||||
'application_type': application_type,
|
||||
'dimensions': [dimension]
|
||||
'dimensions': {
|
||||
dimension_name: dimension_value
|
||||
}
|
||||
}
|
||||
expected_key = tenant_id + application_type + dimension.name + str(
|
||||
dimension.value)
|
||||
expected_key = tenant_id + application_type + dimension_name + str(
|
||||
dimension_value)
|
||||
|
||||
self.assertEqual(expected_key,
|
||||
log_publisher.LogPublisher._build_key(tenant_id,
|
||||
|
@ -77,15 +83,20 @@ class TestBuildKey(unittest.TestCase):
|
|||
# application_type and two dimensions dimensions given unsorted
|
||||
tenant_id = 'monasca'
|
||||
application_type = 'monasca-log-api'
|
||||
dimension_1 = service.Dimension('disk_usage', 50)
|
||||
dimension_2 = service.Dimension('cpu_time', 50)
|
||||
dimension_1_name = 'disk_usage'
|
||||
dimension_1_value = '50'
|
||||
dimension_2_name = 'cpu_time'
|
||||
dimension_2_value = '60'
|
||||
log_object = {
|
||||
'application_type': application_type,
|
||||
'dimensions': [dimension_1, dimension_2]
|
||||
'dimensions': {
|
||||
dimension_1_name: dimension_1_value,
|
||||
dimension_2_name: dimension_2_value
|
||||
}
|
||||
}
|
||||
expected_key = ''.join([tenant_id, application_type, dimension_2.name,
|
||||
str(dimension_2.value), dimension_1.name,
|
||||
str(dimension_1.value)])
|
||||
expected_key = ''.join([tenant_id, application_type,
|
||||
dimension_2_name, dimension_2_value,
|
||||
dimension_1_name, dimension_1_value])
|
||||
|
||||
self.assertEqual(expected_key,
|
||||
log_publisher.LogPublisher._build_key(tenant_id,
|
||||
|
@ -97,44 +108,92 @@ class TestSendMessage(testing.TestBase):
|
|||
self.conf = base.mock_config(self)
|
||||
return super(TestSendMessage, self).setUp()
|
||||
|
||||
def test_should_not_send_empty_message(self):
|
||||
@mock.patch('monasca_log_api.v1.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
def test_should_not_send_empty_message(self, _):
|
||||
instance = log_publisher.LogPublisher()
|
||||
instance._kafka_publisher.send_message = mock.Mock()
|
||||
|
||||
instance._kafka_publisher = mock.Mock()
|
||||
instance.send_message({})
|
||||
|
||||
self.assertFalse(instance._kafka_publisher.send_message.called)
|
||||
self.assertFalse(instance._kafka_publisher.publish.called)
|
||||
|
||||
def test_should_raise_exception(self):
|
||||
@unittest.expectedFailure
|
||||
def test_should_not_send_message_not_dict(self):
|
||||
instance = log_publisher.LogPublisher()
|
||||
instance._kafka_publisher.send_message = mock.Mock(
|
||||
side_effect=[exceptions.MessageQueueException(1, 1)]
|
||||
)
|
||||
not_dict_value = 123
|
||||
instance.send_message(not_dict_value)
|
||||
|
||||
msg = {
|
||||
def test_should_not_send_message_missing_keys(self):
|
||||
# checks every combination of missing keys
|
||||
# test does not rely on those keys having a value or not,
|
||||
# it simply assumes that values are set but important
|
||||
# message (i.e. envelope) properties are missing entirely
|
||||
# that's why there are two loops instead of three
|
||||
|
||||
instance = log_publisher.LogPublisher()
|
||||
keys = ['log', 'creation_time', 'meta']
|
||||
|
||||
for key_1 in keys:
|
||||
diff = keys[:]
|
||||
diff.remove(key_1)
|
||||
for key_2 in diff:
|
||||
message = {
|
||||
key_1: random.randint(10, 20),
|
||||
key_2: random.randint(30, 50)
|
||||
}
|
||||
self.assertRaises(log_publisher.InvalidMessageException,
|
||||
instance.send_message,
|
||||
message)
|
||||
|
||||
def test_should_not_send_message_missing_values(self):
|
||||
# original message assumes that every property has value
|
||||
# test modify each property one by one by removing that value
|
||||
# (i.e. creating false-like value)
|
||||
instance = log_publisher.LogPublisher()
|
||||
message = {
|
||||
'log': {
|
||||
'message': 1
|
||||
'message': '11'
|
||||
},
|
||||
'creation_time': 123456,
|
||||
'meta': {
|
||||
'tenantId': 1
|
||||
'region': 'pl'
|
||||
}
|
||||
}
|
||||
self.assertRaises(exceptions.MessageQueueException,
|
||||
instance.send_message, msg)
|
||||
|
||||
def test_should_send_message(self):
|
||||
for key in message:
|
||||
tmp_message = message
|
||||
tmp_message[key] = None
|
||||
self.assertRaises(log_publisher.InvalidMessageException,
|
||||
instance.send_message,
|
||||
tmp_message)
|
||||
|
||||
@mock.patch('monasca_log_api.v1.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
def test_should_send_message(self, _):
|
||||
instance = log_publisher.LogPublisher()
|
||||
instance._kafka_publisher.send_message = mock.Mock(name='send_message',
|
||||
return_value={})
|
||||
instance._kafka_publisher = mock.Mock()
|
||||
instance.send_message({})
|
||||
instance._build_key = mock.Mock(name='_build_key',
|
||||
return_value='some_key')
|
||||
|
||||
creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
|
||||
.total_seconds())
|
||||
application_type = 'monasca-log-api'
|
||||
dimension_1_name = 'disk_usage'
|
||||
dimension_1_value = '50'
|
||||
dimension_2_name = 'cpu_time'
|
||||
dimension_2_value = '60'
|
||||
msg = {
|
||||
'log': {
|
||||
'message': 1,
|
||||
'application_type': 'monasca_log_api',
|
||||
'dimensions': [service.Dimension('disk_usage', 50),
|
||||
service.Dimension('cpu_time', 50)]
|
||||
'application_type': application_type,
|
||||
'dimensions': {
|
||||
dimension_1_name: dimension_1_value,
|
||||
dimension_2_name: dimension_2_value
|
||||
}
|
||||
},
|
||||
'creation_time': creation_time,
|
||||
'meta': {
|
||||
'tenantId': 1
|
||||
}
|
||||
|
@ -142,27 +201,40 @@ class TestSendMessage(testing.TestBase):
|
|||
|
||||
instance.send_message(msg)
|
||||
|
||||
instance._kafka_publisher.send_message.assert_called_once_with(
|
||||
instance._kafka_publisher.publish.assert_called_once_with(
|
||||
self.conf.conf.log_publisher.topics[0],
|
||||
'some_key',
|
||||
# 'some_key', # TODO(feature) next version of monasca-common
|
||||
simplejson.dumps(msg))
|
||||
|
||||
def test_should_send_message_multiple_topics(self):
|
||||
@mock.patch('monasca_log_api.v1.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
def test_should_send_message_multiple_topics(self, _):
|
||||
topics = ['logs', 'analyzer', 'tester']
|
||||
self.conf.config(topics=topics, group='log_publisher')
|
||||
|
||||
instance = log_publisher.LogPublisher()
|
||||
instance._kafka_publisher.send_message = mock.Mock(name='send_message',
|
||||
return_value={})
|
||||
instance._kafka_publisher = mock.Mock()
|
||||
instance.send_message({})
|
||||
instance._build_key = mock.Mock(name='_build_key',
|
||||
return_value='some_key')
|
||||
|
||||
creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
|
||||
.total_seconds())
|
||||
dimension_1_name = 'disk_usage'
|
||||
dimension_1_value = '50'
|
||||
dimension_2_name = 'cpu_time'
|
||||
dimension_2_value = '60'
|
||||
application_type = 'monasca-log-api'
|
||||
msg = {
|
||||
'log': {
|
||||
'message': 1,
|
||||
'application_type': 'monasca_log_api',
|
||||
'dimensions': [service.Dimension('disk_usage', 50),
|
||||
service.Dimension('cpu_time', 50)]
|
||||
'application_type': application_type,
|
||||
'dimensions': {
|
||||
dimension_1_name: dimension_1_value,
|
||||
dimension_2_name: dimension_2_value
|
||||
}
|
||||
},
|
||||
'creation_time': creation_time,
|
||||
'meta': {
|
||||
'tenantId': 1
|
||||
}
|
||||
|
@ -172,9 +244,9 @@ class TestSendMessage(testing.TestBase):
|
|||
instance.send_message(msg)
|
||||
|
||||
self.assertEqual(len(topics),
|
||||
instance._kafka_publisher.send_message.call_count)
|
||||
instance._kafka_publisher.publish.call_count)
|
||||
for topic in topics:
|
||||
instance._kafka_publisher.send_message.assert_any_call(
|
||||
instance._kafka_publisher.publish.assert_any_call(
|
||||
topic,
|
||||
'some_key',
|
||||
# 'some_key', # TODO(feature) next version of monasca-common
|
||||
json_msg)
|
||||
|
|
|
@ -29,13 +29,13 @@ class TestLogs(testing.TestBase):
|
|||
self.conf = base.mock_config(self)
|
||||
self.logs_resource = logs.Logs()
|
||||
self.api.add_route(
|
||||
'/logs/single',
|
||||
'/log/single',
|
||||
self.logs_resource
|
||||
)
|
||||
|
||||
def test_should_fail_not_delegate_ok_cross_tenant_id(self):
|
||||
self.simulate_request(
|
||||
'/logs/single',
|
||||
'/log/single',
|
||||
method='POST',
|
||||
query_string='tenant_id=1',
|
||||
headers={
|
||||
|
@ -45,19 +45,15 @@ class TestLogs(testing.TestBase):
|
|||
self.assertEqual(falcon.HTTP_403, self.srmock.status)
|
||||
|
||||
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
|
||||
@mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher')
|
||||
def test_should_pass_empty_cross_tenant_id_wrong_role(self,
|
||||
log_creator,
|
||||
kafka_publisher):
|
||||
log_creator.configure_mock(**{'new_log.return_value': None,
|
||||
'new_log_envelope.return_value': None})
|
||||
kafka_publisher.configure_mock(**{'send_message.return_value': None})
|
||||
|
||||
self.logs_resource._log_creator = log_creator
|
||||
self.logs_resource._kafka_publisher = kafka_publisher
|
||||
|
||||
self.simulate_request(
|
||||
'/logs/single',
|
||||
'/log/single',
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: 'some_role',
|
||||
|
@ -72,19 +68,15 @@ class TestLogs(testing.TestBase):
|
|||
self.assertEqual(1, log_creator.new_log_envelope.call_count)
|
||||
|
||||
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
|
||||
@mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher')
|
||||
def test_should_pass_empty_cross_tenant_id_ok_role(self,
|
||||
log_creator,
|
||||
kafka_publisher):
|
||||
log_creator.configure_mock(**{'new_log.return_value': None,
|
||||
'new_log_envelope.return_value': None})
|
||||
kafka_publisher.configure_mock(**{'send_message.return_value': None})
|
||||
|
||||
self.logs_resource._log_creator = log_creator
|
||||
self.logs_resource._kafka_publisher = kafka_publisher
|
||||
|
||||
self.simulate_request(
|
||||
'/logs/single',
|
||||
'/log/single',
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
|
@ -99,19 +91,15 @@ class TestLogs(testing.TestBase):
|
|||
self.assertEqual(1, log_creator.new_log_envelope.call_count)
|
||||
|
||||
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
|
||||
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
|
||||
@mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher')
|
||||
def test_should_pass_delegate_cross_tenant_id_ok_role(self,
|
||||
log_creator,
|
||||
kafka_publisher):
|
||||
log_creator.configure_mock(**{'new_log.return_value': None,
|
||||
'new_log_envelope.return_value': None})
|
||||
kafka_publisher.configure_mock(**{'send_message.return_value': None})
|
||||
|
||||
log_publisher):
|
||||
self.logs_resource._log_creator = log_creator
|
||||
self.logs_resource._kafka_publisher = kafka_publisher
|
||||
self.logs_resource._kafka_publisher = log_publisher
|
||||
|
||||
self.simulate_request(
|
||||
'/logs/single',
|
||||
'/log/single',
|
||||
method='POST',
|
||||
query_string='tenant_id=1',
|
||||
headers={
|
||||
|
@ -122,28 +110,28 @@ class TestLogs(testing.TestBase):
|
|||
)
|
||||
self.assertEqual(falcon.HTTP_204, self.srmock.status)
|
||||
|
||||
self.assertEqual(1, kafka_publisher.send_message.call_count)
|
||||
self.assertEqual(1, log_publisher.send_message.call_count)
|
||||
self.assertEqual(1, log_creator.new_log.call_count)
|
||||
self.assertEqual(1, log_creator.new_log_envelope.call_count)
|
||||
|
||||
def test_should_fail_empty_dimensions_delegate(self):
|
||||
with mock.patch.object(self.logs_resource._log_creator,
|
||||
'_read_payload',
|
||||
return_value=True):
|
||||
self.simulate_request(
|
||||
'/logs/single',
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
headers.X_DIMENSIONS.name: '',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
)
|
||||
@mock.patch('monasca_log_api.v1.common.service.rest_utils')
|
||||
def test_should_fail_empty_dimensions_delegate(self, rest_utils):
|
||||
rest_utils.read_body.return_value = True
|
||||
|
||||
self.simulate_request(
|
||||
'/log/single',
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
headers.X_DIMENSIONS.name: '',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
)
|
||||
self.assertEqual(log_api_exceptions.HTTP_422, self.srmock.status)
|
||||
|
||||
def test_should_fail_for_invalid_content_type(self):
|
||||
self.simulate_request(
|
||||
'/logs/single',
|
||||
'/log/single',
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
# Copyright 2015 kornicameister@gmail.com
|
||||
# Copyright 2015 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
import falcon
|
||||
import mock
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.api import rest_utils as ru
|
||||
|
||||
|
||||
class RestUtilsTest(unittest.TestCase):
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_read_text_for_plain_text(self, payload):
|
||||
raw_msg = 'Hello World'
|
||||
msg = u''.join(raw_msg)
|
||||
|
||||
payload.read.return_value = raw_msg
|
||||
|
||||
self.assertEqual(msg, ru.read_body(payload, 'text/plain'))
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_read_json_for_application_json(self, payload):
|
||||
raw_msg = u'{"path":"/var/log/messages","message":"This is message"}'
|
||||
json_msg = simplejson.loads(raw_msg, encoding='utf-8')
|
||||
|
||||
payload.read.return_value = raw_msg
|
||||
|
||||
self.assertEqual(json_msg, ru.read_body(payload, 'application/json'))
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_fail_read_text_for_application_json(self, payload):
|
||||
with self.assertRaises(falcon.HTTPBadRequest) as context:
|
||||
raw_msg = 'Hello World'
|
||||
payload.read.return_value = raw_msg
|
||||
ru.read_body(payload, 'application/json')
|
||||
|
||||
self.assertEqual(context.exception.title,
|
||||
'Failed to read body as json')
|
|
@ -16,10 +16,8 @@
|
|||
import datetime
|
||||
import unittest
|
||||
|
||||
import falcon
|
||||
from falcon import testing
|
||||
import mock
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.api import exceptions
|
||||
from monasca_log_api.api import logs_api
|
||||
|
@ -64,9 +62,12 @@ class ParseDimensions(unittest.TestCase):
|
|||
|
||||
def test_should_pass_for_valid_dimensions(self):
|
||||
dimensions = 'a:1,b:2'
|
||||
expected = [('a', '1'), ('b', '2')]
|
||||
expected = {
|
||||
'a': '1',
|
||||
'b': '2'
|
||||
}
|
||||
|
||||
self.assertListEqual(expected,
|
||||
self.assertDictEqual(expected,
|
||||
common_service.parse_dimensions(dimensions))
|
||||
|
||||
|
||||
|
@ -138,10 +139,10 @@ class DimensionsValidations(unittest.TestCase):
|
|||
common_service.Validations.validate_dimensions(1)
|
||||
|
||||
def test_should_pass_for_empty_dimensions_array(self):
|
||||
common_service.Validations.validate_dimensions([])
|
||||
common_service.Validations.validate_dimensions({})
|
||||
|
||||
def test_should_fail_too_empty_name(self):
|
||||
dimensions = [('', 1)]
|
||||
dimensions = {'': 1}
|
||||
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
@ -150,7 +151,7 @@ class DimensionsValidations(unittest.TestCase):
|
|||
|
||||
def test_should_fail_too_long_name(self):
|
||||
name = testing.rand_string(256, 260)
|
||||
dimensions = [(name, 1)]
|
||||
dimensions = {name: 1}
|
||||
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
@ -159,7 +160,7 @@ class DimensionsValidations(unittest.TestCase):
|
|||
|
||||
def test_should_fail_underscore_at_begin(self):
|
||||
name = '_aDim'
|
||||
dimensions = [(name, 1)]
|
||||
dimensions = {name: 1}
|
||||
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
@ -168,7 +169,7 @@ class DimensionsValidations(unittest.TestCase):
|
|||
|
||||
def test_should_fail_invalid_chars(self):
|
||||
name = '<>'
|
||||
dimensions = [(name, 1)]
|
||||
dimensions = {name: 1}
|
||||
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
@ -178,7 +179,7 @@ class DimensionsValidations(unittest.TestCase):
|
|||
|
||||
def test_should_fail_ok_name_empty_value(self):
|
||||
name = 'monasca'
|
||||
dimensions = [(name, '')]
|
||||
dimensions = {name: ''}
|
||||
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
@ -188,7 +189,7 @@ class DimensionsValidations(unittest.TestCase):
|
|||
def test_should_fail_ok_name_too_long_value(self):
|
||||
name = 'monasca'
|
||||
value = testing.rand_string(256, 300)
|
||||
dimensions = [(name, value)]
|
||||
dimensions = {name: value}
|
||||
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
@ -198,55 +199,17 @@ class DimensionsValidations(unittest.TestCase):
|
|||
def test_should_pass_ok_name_ok_value_empty_service(self):
|
||||
name = 'monasca'
|
||||
value = '1'
|
||||
dimensions = [(name, value)]
|
||||
dimensions = {name: value}
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
def test_should_pass_ok_name_ok_value_service_SERVICE_DIMENSIONS_as_name(
|
||||
self):
|
||||
name = 'some_name'
|
||||
value = '1'
|
||||
dimensions = [(name, value)]
|
||||
dimensions = {name: value}
|
||||
common_service.Validations.validate_dimensions(dimensions)
|
||||
|
||||
|
||||
class LogsCreatorPayload(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.instance = common_service.LogCreator()
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_read_text_for_plain_text(self, payload):
|
||||
msg = u'Hello World'
|
||||
payload.configure_mock(
|
||||
**{'readable.return_value': True, 'read.return_value': msg})
|
||||
|
||||
self.assertEqual(msg,
|
||||
self.instance._read_payload(payload, 'text/plain'))
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_read_json_for_application_json(self, payload):
|
||||
msg = u'{"path":"/var/log/messages","message":"This is message"}'
|
||||
payload.configure_mock(
|
||||
**{'readable.return_value': True, 'read.return_value': msg})
|
||||
|
||||
json_msg = simplejson.loads(msg, encoding='utf-8')
|
||||
|
||||
self.assertEqual(json_msg,
|
||||
self.instance._read_payload(payload,
|
||||
'application/json'))
|
||||
|
||||
@mock.patch('io.IOBase')
|
||||
def test_should_fail_read_text_for_application_json(self, payload):
|
||||
with self.assertRaises(falcon.HTTPBadRequest) as context:
|
||||
msg = u'Hello World'
|
||||
payload.configure_mock(
|
||||
**{'readable.return_value': True, 'read.return_value': msg})
|
||||
self.instance._read_payload(payload,
|
||||
'application/json')
|
||||
|
||||
self.assertEqual(context.exception.title,
|
||||
'Failed to read body as json')
|
||||
|
||||
|
||||
class LogsCreatorNewLog(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.instance = common_service.LogCreator()
|
||||
|
@ -258,13 +221,14 @@ class LogsCreatorNewLog(unittest.TestCase):
|
|||
json_msg = u'{"path":"%s","message":"%s"}' % (path, msg)
|
||||
app_type = 'monasca'
|
||||
dimensions = 'cpu_time:30'
|
||||
payload.configure_mock(
|
||||
**{'readable.return_value': True, 'read.return_value': json_msg})
|
||||
payload.read.return_value = json_msg
|
||||
|
||||
expected_log = {
|
||||
'message': msg,
|
||||
'application_type': app_type,
|
||||
'dimensions': [('cpu_time', '30')],
|
||||
'dimensions': {
|
||||
'cpu_time': '30'
|
||||
},
|
||||
'path': path
|
||||
}
|
||||
|
||||
|
@ -278,14 +242,17 @@ class LogsCreatorNewLog(unittest.TestCase):
|
|||
def test_should_create_log_from_text(self, payload):
|
||||
msg = u'Hello World'
|
||||
app_type = 'monasca'
|
||||
dimensions = 'cpu_time:30'
|
||||
payload.configure_mock(
|
||||
**{'readable.return_value': True, 'read.return_value': msg})
|
||||
dimension_name = 'cpu_time'
|
||||
dimension_value = 30
|
||||
dimensions = '%s:%s' % (dimension_name, str(dimension_value))
|
||||
payload.read.return_value = msg
|
||||
|
||||
expected_log = {
|
||||
'message': msg,
|
||||
'application_type': app_type,
|
||||
'dimensions': [('cpu_time', '30')]
|
||||
'dimensions': {
|
||||
dimension_name: str(dimension_value)
|
||||
}
|
||||
}
|
||||
|
||||
self.assertEqual(expected_log, self.instance.new_log(
|
||||
|
@ -304,10 +271,14 @@ class LogCreatorNewEnvelope(unittest.TestCase):
|
|||
msg = u'Hello World'
|
||||
path = u'/var/log/messages'
|
||||
app_type = 'monasca'
|
||||
dimension_name = 'cpu_time'
|
||||
dimension_value = 30
|
||||
expected_log = {
|
||||
'message': msg,
|
||||
'application_type': app_type,
|
||||
'dimensions': [('cpu_time', '30')],
|
||||
'dimensions': {
|
||||
dimension_name: str(dimension_value)
|
||||
},
|
||||
'path': path
|
||||
}
|
||||
tenant_id = 'a_tenant'
|
||||
|
@ -330,3 +301,38 @@ class LogCreatorNewEnvelope(unittest.TestCase):
|
|||
actual_envelope.get('log'))
|
||||
self.assertEqual(expected_envelope.get('meta'),
|
||||
actual_envelope.get('meta'))
|
||||
self.assertDictEqual(
|
||||
expected_envelope.get('log').get('dimensions'),
|
||||
actual_envelope.get('log').get('dimensions'))
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_should_not_create_log_none(self):
|
||||
log_object = None
|
||||
tenant_id = 'a_tenant'
|
||||
|
||||
self.instance.new_log_envelope(log_object, tenant_id)
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_should_not_create_log_empty(self):
|
||||
log_object = {}
|
||||
tenant_id = 'a_tenant'
|
||||
|
||||
self.instance.new_log_envelope(log_object, tenant_id)
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_should_not_create_tenant_none(self):
|
||||
log_object = {
|
||||
'message': ''
|
||||
}
|
||||
tenant_id = None
|
||||
|
||||
self.instance.new_log_envelope(log_object, tenant_id)
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_should_not_create_tenant_empty(self):
|
||||
log_object = {
|
||||
'message': ''
|
||||
}
|
||||
tenant_id = ''
|
||||
|
||||
self.instance.new_log_envelope(log_object, tenant_id)
|
||||
|
|
|
@ -13,30 +13,39 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from monasca_common.kafka import producer
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.publisher import kafka_publisher
|
||||
import simplejson as json
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
log_publisher_opts = [
|
||||
cfg.StrOpt('kafka_url',
|
||||
required=True,
|
||||
help='Url to kafka server'),
|
||||
cfg.MultiStrOpt('topics',
|
||||
default=['logs'],
|
||||
help='Target topic in kafka')
|
||||
help='Consumer topics')
|
||||
]
|
||||
log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher')
|
||||
|
||||
cfg.CONF.register_group(log_publisher_group)
|
||||
cfg.CONF.register_opts(log_publisher_opts, log_publisher_group)
|
||||
|
||||
ENVELOPE_SCHEMA = ['log', 'meta', 'creation_time']
|
||||
"""Log envelope (i.e.) message keys"""
|
||||
|
||||
|
||||
class InvalidMessageException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class LogPublisher(object):
|
||||
def __init__(self):
|
||||
self._topics = CONF.log_publisher.topics
|
||||
self._kafka_publisher = kafka_publisher.KafkaPublisher()
|
||||
self._kafka_publisher = None
|
||||
LOG.info('Initializing LogPublisher <%s>', self)
|
||||
|
||||
@staticmethod
|
||||
|
@ -51,16 +60,6 @@ class LogPublisher(object):
|
|||
:param obj: log instance
|
||||
:return: key
|
||||
"""
|
||||
def comparator(a, b):
|
||||
"""Comparator for dimensions
|
||||
|
||||
:param a: dimension_a, tuple with properties name,value
|
||||
:param b: dimension_b, tuple with properties name,value
|
||||
:return: sorting result
|
||||
"""
|
||||
if a.name == b.name:
|
||||
return (a.value > b.value) - (a.value < b.value)
|
||||
return (a.name > b.name) - (a.name < b.name)
|
||||
|
||||
str_list = []
|
||||
|
||||
|
@ -72,29 +71,68 @@ class LogPublisher(object):
|
|||
str_list += obj['application_type']
|
||||
|
||||
if 'dimensions' in obj and obj['dimensions']:
|
||||
dimensions = sorted(obj['dimensions'], cmp=comparator)
|
||||
for name, value in dimensions:
|
||||
dims = obj['dimensions']
|
||||
sorted_dims = sorted(dims)
|
||||
for name in sorted_dims:
|
||||
str_list += name
|
||||
str_list += str(value)
|
||||
str_list += str(dims[name])
|
||||
|
||||
return ''.join(str_list)
|
||||
|
||||
@staticmethod
|
||||
def _is_message_valid(message):
|
||||
"""Validates message before sending.
|
||||
|
||||
Methods checks if message is :py:class:`dict`.
|
||||
If so dictionary is verified against having following keys:
|
||||
|
||||
* meta
|
||||
* log
|
||||
* creation_time
|
||||
|
||||
If keys are found, each key must have a value.
|
||||
|
||||
If at least none of the conditions is met
|
||||
:py:class:`.InvalidMessageException` is raised
|
||||
|
||||
:raises InvalidMessageException: if message does not comply to schema
|
||||
|
||||
"""
|
||||
if not isinstance(message, dict):
|
||||
return False
|
||||
|
||||
for key in ENVELOPE_SCHEMA:
|
||||
if not (key in message and message.get(key)):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _publisher(self):
|
||||
if not self._kafka_publisher:
|
||||
self._kafka_publisher = producer.KafkaProducer(
|
||||
url=CONF.log_publisher.kafka_url
|
||||
)
|
||||
return self._kafka_publisher
|
||||
|
||||
def send_message(self, message):
|
||||
if not message:
|
||||
return
|
||||
if not self._is_message_valid(message):
|
||||
raise InvalidMessageException()
|
||||
|
||||
key = self._build_key(message['meta']['tenantId'], message['log'])
|
||||
msg = simplejson.dumps(message,
|
||||
sort_keys=False,
|
||||
ensure_ascii=False).encode('utf8')
|
||||
msg = json.dumps(message,
|
||||
sort_keys=False,
|
||||
ensure_ascii=False).encode('utf8')
|
||||
|
||||
# TODO(feature) next version of monasca-common
|
||||
LOG.debug('Build key [%s] for message' % key)
|
||||
LOG.debug('Sending message {topics=%s,key=%s,message=%s}' %
|
||||
(self._topics, key, msg))
|
||||
|
||||
try:
|
||||
for topic in self._topics:
|
||||
self._kafka_publisher.send_message(topic, key, msg)
|
||||
self._publisher().publish(topic, msg)
|
||||
except Exception as ex:
|
||||
LOG.error(ex.message)
|
||||
raise ex
|
||||
|
|
|
@ -13,17 +13,15 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import datetime
|
||||
import re
|
||||
|
||||
import falcon
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.api import exceptions
|
||||
from monasca_log_api.api import logs_api
|
||||
from monasca_log_api.api import rest_utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
@ -49,8 +47,11 @@ DIMENSION_NAME_CONSTRAINTS = {
|
|||
DIMENSION_VALUE_CONSTRAINTS = {
|
||||
'MAX_LENGTH': 255
|
||||
}
|
||||
EPOCH_START = datetime.datetime(1970, 1, 1)
|
||||
|
||||
Dimension = collections.namedtuple('Dimensions', ['name', 'value'])
|
||||
|
||||
class LogEnvelopeException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Validations(object):
|
||||
|
@ -118,16 +119,16 @@ class Validations(object):
|
|||
value
|
||||
)
|
||||
|
||||
if (isinstance(dimensions, (list, tuple)) and not
|
||||
if (isinstance(dimensions, dict) and not
|
||||
isinstance(dimensions, basestring)):
|
||||
|
||||
for dim_name, dim_value in dimensions:
|
||||
for dim_name in dimensions:
|
||||
validate_name(dim_name)
|
||||
validate_value(dim_value)
|
||||
validate_value(dimensions[dim_name])
|
||||
|
||||
else:
|
||||
raise exceptions.HTTPUnprocessableEntity(
|
||||
'Dimensions %s must be a collections' % dimensions)
|
||||
'Dimensions %s must be a dictionary (map)' % dimensions)
|
||||
|
||||
|
||||
class LogCreator(object):
|
||||
|
@ -135,31 +136,13 @@ class LogCreator(object):
|
|||
self._log = log.getLogger('service.LogCreator')
|
||||
self._log.info('Initializing LogCreator')
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
def _create_meta_info(self, tenant_id):
|
||||
@staticmethod
|
||||
def _create_meta_info(tenant_id):
|
||||
return {
|
||||
'tenantId': tenant_id,
|
||||
'region': cfg.CONF.service.region
|
||||
}
|
||||
|
||||
def _read_payload(self, payload, content_type):
|
||||
|
||||
try:
|
||||
content = payload.read()
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPBadRequest(title='Failed to read body',
|
||||
description=ex.message)
|
||||
|
||||
if content and content_type == 'application/json':
|
||||
try:
|
||||
content = simplejson.loads(content, encoding='utf-8')
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPBadRequest(title='Failed to read body as '
|
||||
'json',
|
||||
description=ex.message)
|
||||
|
||||
return content
|
||||
|
||||
def new_log(self,
|
||||
application_type,
|
||||
dimensions,
|
||||
|
@ -167,7 +150,7 @@ class LogCreator(object):
|
|||
content_type='application/json',
|
||||
validate=True):
|
||||
|
||||
payload = self._read_payload(payload, content_type)
|
||||
payload = rest_utils.read_body(payload, content_type)
|
||||
if not payload:
|
||||
return None
|
||||
|
||||
|
@ -199,8 +182,15 @@ class LogCreator(object):
|
|||
return log_object
|
||||
|
||||
def new_log_envelope(self, log_object, tenant_id):
|
||||
timestamp = (datetime.datetime.utcnow() -
|
||||
datetime.datetime(1970, 1, 1)).total_seconds()
|
||||
if not log_object:
|
||||
raise LogEnvelopeException('Envelope cannot be '
|
||||
'created without log')
|
||||
if not tenant_id:
|
||||
raise LogEnvelopeException('Envelope cannot be '
|
||||
'created without tenant')
|
||||
|
||||
timestamp = (datetime.datetime.utcnow() - EPOCH_START).total_seconds()
|
||||
|
||||
return {
|
||||
'log': log_object,
|
||||
'creation_time': timestamp,
|
||||
|
@ -212,7 +202,6 @@ def is_delegate(roles):
|
|||
if roles:
|
||||
roles = roles.split(',')
|
||||
return logs_api.MONITORING_DELEGATE_ROLE in roles
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
|
@ -226,7 +215,7 @@ def parse_dimensions(dimensions):
|
|||
if not dimensions:
|
||||
raise exceptions.HTTPUnprocessableEntity('Dimension are required')
|
||||
|
||||
new_dimensions = []
|
||||
new_dimensions = {}
|
||||
dimensions = map(str.strip, dimensions.split(','))
|
||||
|
||||
for dim in dimensions:
|
||||
|
@ -241,6 +230,6 @@ def parse_dimensions(dimensions):
|
|||
name = str(dim[0].strip()) if dim[0] else None
|
||||
value = str(dim[1].strip()) if dim[1] else None
|
||||
if name and value:
|
||||
new_dimensions.append(Dimension(name, value))
|
||||
new_dimensions.update({name: value})
|
||||
|
||||
return new_dimensions
|
||||
|
|
|
@ -15,10 +15,9 @@
|
|||
|
||||
import falcon
|
||||
from oslo_log import log
|
||||
import simplejson
|
||||
|
||||
from monasca_log_api.api import rest_utils
|
||||
from monasca_log_api.api import versions_api
|
||||
from monasca_log_api import constants
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
VERSIONS = {
|
||||
|
@ -50,14 +49,14 @@ class Versions(versions_api.VersionsAPI):
|
|||
VERSIONS[version]['links'][0]['href'] = (
|
||||
req.uri.decode('utf8') + version)
|
||||
result['elements'].append(VERSIONS[version])
|
||||
res.body = simplejson.dumps(result)
|
||||
res.body = rest_utils.as_json(result)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
@staticmethod
|
||||
def handle_version_id(req, res, version_id):
|
||||
if version_id in VERSIONS:
|
||||
VERSIONS[version_id]['links'][0]['href'] = (
|
||||
req.uri.decode(constants.ENCODING)
|
||||
req.uri.decode(rest_utils.ENCODING)
|
||||
)
|
||||
for version in VERSIONS:
|
||||
VERSIONS[version]['links'][0]['href'] = (
|
||||
|
@ -67,7 +66,7 @@ class Versions(versions_api.VersionsAPI):
|
|||
req.uri.decode('utf8') +
|
||||
VERSIONS[version_id]['links'][1]['href']
|
||||
)
|
||||
res.body = simplejson.dumps(VERSIONS[version_id])
|
||||
res.body = rest_utils.as_json(VERSIONS[version_id])
|
||||
res.status = falcon.HTTP_200
|
||||
else:
|
||||
res.body = 'Invalid Version ID'
|
||||
|
@ -77,7 +76,7 @@ class Versions(versions_api.VersionsAPI):
|
|||
result = {
|
||||
'links': [{
|
||||
'rel': 'self',
|
||||
'href': req.uri.decode(constants.ENCODING)
|
||||
'href': req.uri.decode(rest_utils.ENCODING)
|
||||
}],
|
||||
'elements': []
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ oslo.utils
|
|||
pastedeploy>=1.3.3
|
||||
pbr>=1.6.0,<2.0
|
||||
six>=1.9.0
|
||||
kafka-python>=0.9.3,<0.9.4
|
||||
simplejson>=3.8.0
|
||||
simport
|
||||
monasca-common>=0.0.2
|
||||
eventlet>=0.9.7
|
||||
|
|
Loading…
Reference in New Issue