Fix issues spotted in the dev environment

- removed own kafka abstraction in favour of monasca-common
- removed monasca keystone context filter, which was not actually used
- changed URI of the logs endpoint to /v1.0/log/single (smoke-test sketch below)
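
For reference, a smoke-test sketch of the renamed endpoint (hypothetical host, port, token and header values; not part of this commit):

import requests

resp = requests.post(
    'http://localhost:8080/v1.0/log/single',
    headers={
        'X-Auth-Token': 'some-token',              # keystone token (assumed)
        'X-Application-Type': 'monasca-log-api',
        'X-Dimensions': 'hostname:devstack',
        'Content-Type': 'application/json',
    },
    data='{"message": "hello world"}',
)
assert resp.status_code == 204  # the tests in this commit expect 204 No Content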

Change-Id: Iaceabdce2b2862451cfe63d2a612577d7710022b
Tomasz Trębski 2015-11-12 12:00:13 +01:00
parent 9d8c24d1f7
commit f07a38e388
22 changed files with 398 additions and 977 deletions

.gitignore vendored (1 change)
View File

@@ -12,6 +12,7 @@ cover
.tox
ChangeLog
MANIFEST
AUTHORS
monasca.log

View File

@@ -16,19 +16,9 @@ driver = v1_reference
[service]
region = 'pl'
[kafka]
client_id = 'monasca-log-api'
timeout = 60
host = 'localhost:8900'
[kafka_producer]
batch_send_every_n = 10
async = True
ack_timeout = 1000
req_acks = 'Local'
[log_publisher]
topics = 'logs'
kafka_url = 'localhost:8900'
[keystone_authtoken]
identity_uri = http://192.168.10.5:35357
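
Reconstructed from the hunk above, the kafka-related part of the config collapses to a single log_publisher section (values as shown in the diff; surrounding sections unchanged):

[service]
region = 'pl'

[log_publisher]
topics = 'logs'
kafka_url = 'localhost:8900'

[keystone_authtoken]
identity_uri = http://192.168.10.5:35357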

View File

@@ -2,7 +2,7 @@
name = monasca_log_api
[pipeline:main]
pipeline = auth keystonecontext api
pipeline = auth api
[app:api]
paste.app_factory = monasca_log_api.server:launch
@@ -10,9 +10,6 @@ paste.app_factory = monasca_log_api.server:launch
[filter:auth]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
[filter:keystonecontext]
paste.filter_factory = monasca_log_api.middleware.keystone_context_filter:filter_factory
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1

View File

@@ -0,0 +1,49 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
import simplejson as json
ENCODING = 'utf8'
def read_body(payload, content_type='application/json'):
try:
content = payload.read()
if not content:
return False
except Exception as ex:
raise falcon.HTTPBadRequest(title='Failed to read body',
description=ex.message)
if content_type == 'application/json':
try:
content = from_json(content)
except Exception as ex:
raise falcon.HTTPBadRequest(title='Failed to read body as json',
description=ex.message)
return content
def as_json(data):
return json.dumps(data,
encoding=ENCODING,
sort_keys=False,
ensure_ascii=False)
def from_json(data):
return json.loads(data, encoding=ENCODING)
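
A minimal usage sketch of the new helpers (assumed, not part of the diff; io.BytesIO stands in for the WSGI request stream):

import io

from monasca_log_api.api import rest_utils

payload = io.BytesIO(b'{"path": "/var/log/messages", "message": "hi"}')
content = rest_utils.read_body(payload, 'application/json')  # parsed dict
print(rest_utils.as_json(content))  # dict serialized back to a JSON string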

View File

@@ -1,16 +0,0 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
ENCODING = 'utf8'

View File

@@ -1,83 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""RequestContext: context for requests that persist through monasca."""
import uuid
from oslo_log import log
from oslo_utils import timeutils
LOG = log.getLogger(__name__)
class RequestContext(object):
"""Security context and request information.
Represents the user taking a given action within the system.
"""
def __init__(self, user_id, project_id, domain_id=None, domain_name=None,
roles=None, timestamp=None, request_id=None,
auth_token=None, user_name=None, project_name=None,
service_catalog=None, user_auth_plugin=None, **kwargs):
"""Creates the Keystone Context. Supports additional parameters:
:param user_auth_plugin:
The auth plugin for the current request's authentication data.
:param kwargs:
Extra arguments that might be present
"""
if kwargs:
LOG.warning(
'Arguments dropped when creating context: %s' % str(kwargs))
self._roles = roles or []
self.timestamp = timeutils.utcnow()
if not request_id:
request_id = self.generate_request_id()
self._request_id = request_id
self._auth_token = auth_token
self._service_catalog = service_catalog
self._domain_id = domain_id
self._domain_name = domain_name
self._user_id = user_id
self._user_name = user_name
self._project_id = project_id
self._project_name = project_name
self._user_auth_plugin = user_auth_plugin
def to_dict(self):
return {'user_id': self._user_id,
'project_id': self._project_id,
'domain_id': self._domain_id,
'domain_name': self._domain_name,
'roles': self._roles,
'timestamp': timeutils.strtime(self.timestamp),
'request_id': self._request_id,
'auth_token': self._auth_token,
'user_name': self._user_name,
'service_catalog': self._service_catalog,
'project_name': self._project_name,
'user': self._user_id}
def generate_request_id(self):
return b'req-' + str(uuid.uuid4()).encode('ascii')

View File

@@ -1,109 +0,0 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
from oslo_middleware import request_id
from oslo_serialization import jsonutils
from monasca_log_api.middleware import context
LOG = log.getLogger(__name__)
def filter_factory(global_conf, **local_conf):
def validator_filter(app):
return KeystoneContextFilter(app, local_conf)
return validator_filter
class KeystoneContextFilter(object):
"""Make a request context from keystone headers."""
def __init__(self, app, conf):
self._app = app
self._conf = conf
def __call__(self, env, start_response):
LOG.debug("Creating Keystone Context Object.")
user_id = env.get('HTTP_X_USER_ID', env.get('HTTP_X_USER'))
if user_id is None:
msg = "Neither X_USER_ID nor X_USER found in request"
LOG.error(msg)
raise falcon.HTTPUnauthorized(title='Forbidden', description=msg)
roles = self._get_roles(env)
project_id = env.get('HTTP_X_PROJECT_ID')
project_name = env.get('HTTP_X_PROJECT_NAME')
domain_id = env.get('HTTP_X_DOMAIN_ID')
domain_name = env.get('HTTP_X_DOMAIN_NAME')
user_name = env.get('HTTP_X_USER_NAME')
req_id = env.get(request_id.ENV_REQUEST_ID)
# Get the auth token
auth_token = env.get('HTTP_X_AUTH_TOKEN',
env.get('HTTP_X_STORAGE_TOKEN'))
service_catalog = None
if env.get('HTTP_X_SERVICE_CATALOG') is not None:
try:
catalog_header = env.get('HTTP_X_SERVICE_CATALOG')
service_catalog = jsonutils.loads(catalog_header)
except ValueError:
msg = "Invalid service catalog json."
LOG.error(msg)
raise falcon.HTTPInternalServerError(title='Internal server error',
description=msg)
# NOTE(jamielennox): This is a full auth plugin set by auth_token
# middleware in newer versions.
user_auth_plugin = env.get('keystone.token_auth')
# Build a context
ctx = context.RequestContext(user_id,
project_id,
user_name=user_name,
project_name=project_name,
domain_id=domain_id,
domain_name=domain_name,
roles=roles,
auth_token=auth_token,
service_catalog=service_catalog,
request_id=req_id,
user_auth_plugin=user_auth_plugin)
env['monasca.context'] = ctx
LOG.debug("Keystone Context successfully created.")
return self._app(env, start_response)
def _get_roles(self, env):
"""Get the list of roles."""
if 'HTTP_X_ROLES' in env:
roles = env.get('HTTP_X_ROLES', '')
else:
# Fallback to deprecated role header:
roles = env.get('HTTP_X_ROLE', '')
if roles:
LOG.warning(
'Sourcing roles from deprecated X-Role HTTP header')
return [r.strip() for r in roles.split(',')]

View File

@@ -1,50 +0,0 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ExceptionWrapper(Exception):
"""Wrapper around exception with custom message.
ExceptionWrapper provides a way to keep the original
exception while describing the context in which the
error occurred.
"""
def __init__(self, message, caught=None):
self._message = message
self._caught = caught
@property
def caught(self):
return self._caught
@property
def message(self):
return self._message
def __str__(self):
return '%s <message=%s, caught=%s>' % (
self.__class__.__name__,
repr(self.message),
repr(self.caught)
)
class PublisherInitException(ExceptionWrapper):
pass
class MessageQueueException(ExceptionWrapper):
pass
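
Although this wrapper is being removed, for context its usage looked roughly like this (illustrative values only, based on the __init__ and __str__ shown above):

try:
    raise ValueError('boom')
except ValueError as ex:
    err = MessageQueueException(message='Failed to post message', caught=ex)
    print(str(err))
    # MessageQueueException <message='Failed to post message', caught=ValueError('boom',)>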

View File

@@ -1,220 +0,0 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from kafka import client
from kafka import common
from kafka import conn
from kafka import producer
from kafka import protocol
from oslo_config import cfg
from oslo_log import log
import simport
from monasca_log_api.publisher import exceptions
from monasca_log_api.publisher import publisher
LOG = log.getLogger(__name__)
CONF = cfg.CONF
ACK_MAP = {
'None': producer.KeyedProducer.ACK_NOT_REQUIRED,
'Local': producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
'Server': producer.KeyedProducer.ACK_AFTER_CLUSTER_COMMIT
}
kafka_opts = [
cfg.StrOpt('client_id',
default=None,
help='Client Id',
required=True),
cfg.IntOpt('timeout',
default=conn.DEFAULT_SOCKET_TIMEOUT_SECONDS,
help='Socket timeout'),
cfg.StrOpt('host',
default=None,
help='List of hosts, comma delimited',
required=True)
]
kafka_group = cfg.OptGroup(name='kafka', title='kafka')
kafka_producer_opts = [
cfg.IntOpt('batch_send_every_n',
default=None,
help='Send every n items'),
cfg.IntOpt('batch_send_every_t',
default=None,
help='Send every n seconds'),
cfg.BoolOpt('async',
default=False,
help='Async communication'),
cfg.StrOpt('req_acks',
default='None',
help='Acknowledge options'),
cfg.StrOpt('partitioner',
default=None,
help='Partitioner algorithm')
]
kafka_producer_group = cfg.OptGroup(name='kafka_producer',
title='kafka_producer')
CONF.register_group(kafka_group)
CONF.register_opts(kafka_opts, kafka_group)
CONF.register_group(kafka_producer_group)
CONF.register_opts(kafka_producer_opts, kafka_producer_group)
class KafkaPublisher(publisher.Publisher):
def __init__(self, max_retry=3, wait_time=None):
self._producer_conf = None
self._client_conf = None
self._producer = None
self._client = None
self._max_retry = max_retry
self._wait_time = wait_time
LOG.info('Initializing KafkaPublisher <%s>' % self)
def _get_client_conf(self):
if self._client_conf:
return self._client_conf
client_conf = {
'hosts': CONF.kafka.host,
'client_id': CONF.kafka.client_id,
'timeout': CONF.kafka.timeout
}
self._client_conf = client_conf
return client_conf
def _get_producer_conf(self):
if self._producer_conf:
return self._producer_conf
batch_send_every_n = CONF.kafka_producer.batch_send_every_n
batch_send_every_t = CONF.kafka_producer.batch_send_every_t
partitioner = CONF.kafka_producer.partitioner
producer_conf = {
'codec': protocol.CODEC_GZIP,
'batch_send': batch_send_every_n or batch_send_every_t,
'async': CONF.kafka_producer.async,
'req_acks': ACK_MAP[CONF.kafka_producer.req_acks]
}
if batch_send_every_t:
producer_conf.update(
{'batch_send_every_t': batch_send_every_t})
if batch_send_every_n:
producer_conf.update(
{'batch_send_every_n': batch_send_every_n})
if partitioner:
partitioner = simport.load(partitioner)
producer_conf.update({'partitioner': partitioner})
self._producer_conf = producer_conf
return producer_conf
def _init_client(self):
if self._client:
self._client.close()
self._producer = None
self._client = None
client_opts = self._get_client_conf()
err = None
for i in range(self._max_retry):
kafka_host = client_opts['hosts']
try:
self._client = client.KafkaClient(
client_opts['hosts'],
client_opts['client_id'],
client_opts['timeout']
)
if self._wait_time:
time.sleep(self._wait_time)
err = None  # connected, discard errors kept from earlier attempts
break
except common.KafkaUnavailableError as ex:
LOG.error('Server is down at <host="%s">' % kafka_host)
err = ex
except common.LeaderNotAvailableError as ex:
LOG.error('No leader at <host="%s">.' % kafka_host)
err = ex
except Exception as ex:
LOG.error('Initialization failed at <host="%s">.' % kafka_host)
err = ex
if err:
raise err
def _init_producer(self):
try:
if not self._client:
self._init_client()
producer_opts = self._get_producer_conf()
producer_opts.update({'client': self._client})
self._producer = producer.KeyedProducer(*producer_opts)
if self._wait_time:
time.sleep(self._wait_time)
except Exception as ex:
self._producer = None
LOG.exception(ex.message, exc_info=1)
raise exceptions.PublisherInitException(
message='KeyedProducer can not be created at <host="%s">'
% self._client_conf['hosts'],
caught=ex)
def send_message(self, topic, key, message):
if not message or not key or not topic:
return
try:
if not self._producer:
self._init_producer()
return self._producer.send(topic, key, message)
except (common.KafkaUnavailableError,
common.LeaderNotAvailableError) as ex:
self._client = None
LOG.error(ex.message, exc_info=1)
raise exceptions.MessageQueueException(
message='Failed to post message to kafka',
caught=ex
)
except Exception as ex:
LOG.error(ex.message, exc_info=1)
raise exceptions.MessageQueueException(
message='Unknown error while sending message to kafka',
caught=ex
)
# TODO(question) How to ensure that connection will be closed when program
# stops ?
def close(self):
if self._client:
self._producer = None
self._client.close()
def __repr__(self):
return 'KafkaPublisher <host=%s>' % (
self._client_conf['hosts'] if self._client_conf else None
)
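
The file above is removed entirely; its role is taken over by monasca-common. A rough sketch of the replacement path, based on the calls introduced later in this diff (URL and topic are example values):

from monasca_common.kafka import producer

pub = producer.KafkaProducer(url='localhost:8900')  # [log_publisher] kafka_url
pub.publish('logs', '{"log": {"message": "hello"}}')  # topic, serialized message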

View File

@@ -1,29 +0,0 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Publisher(object):
@abc.abstractmethod
def send_message(self, *args, **kwargs):
return
@abc.abstractmethod
def close(self):
return

View File

@@ -61,7 +61,7 @@ def launch(conf, config_file='/etc/monasca/log-api-config.conf'):
def load_logs_resource(app):
logs = simport.load(CONF.dispatcher.logs)()
app.add_route('/v1.0/logs/single', logs)
app.add_route('/v1.0/log/single', logs)
def load_versions_resource(app):

View File

@@ -1,254 +0,0 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from falcon import testing
from kafka import common
import mock
from monasca_log_api.publisher import exceptions
from monasca_log_api.publisher import kafka_publisher as publisher
from monasca_log_api.tests import base as base_test
TOPIC = 'test'
class MockKafkaClient(object):
pass
class TestConfiguration(testing.TestBase):
def setUp(self):
self.conf = base_test.mock_config(self)
self.instance = publisher.KafkaPublisher()
super(TestConfiguration, self).setUp()
def test_should_not_have_client_conf_at_begin(self):
self.assertIsNone(self.instance._client_conf)
def test_should_not_have_producer_conf_at_begin(self):
self.assertIsNone(self.instance._producer_conf)
def test_should_init_producer_conf(self):
self.instance._get_producer_conf()
self.assertIsNotNone(self.instance._producer_conf)
def test_should_init_client_conf(self):
self.instance._get_client_conf()
self.assertIsNotNone(self.instance._client_conf)
class TestInitialization(testing.TestBase):
def setUp(self):
self.conf = base_test.mock_config(self)
super(TestInitialization, self).setUp()
def test_should_have_init_client_not_set(self):
instance = publisher.KafkaPublisher()
self.assertIsNone(instance._client)
def test_should_have_init_producer_not_set(self):
instance = publisher.KafkaPublisher()
self.assertIsNone(instance._producer)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
side_effect=[common.KafkaUnavailableError])
def test_client_should_fail_kafka_unavailable(self, kafka_client):
instance = publisher.KafkaPublisher()
self.assertRaises(
common.KafkaUnavailableError,
instance._init_client
)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
side_effect=[common.LeaderNotAvailableError])
def test_client_should_fail_leader_unavailable(self, kafka_client):
instance = publisher.KafkaPublisher()
self.assertRaises(
common.LeaderNotAvailableError,
instance._init_client
)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
side_effect=[ValueError])
def test_client_should_fail_other_error(self, kafka_client):
instance = publisher.KafkaPublisher()
self.assertRaises(
ValueError,
instance._init_client
)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
autospec=True)
def test_client_should_initialize(self, kafka_client):
client_id = 'mock_client'
timeout = 3600
hosts = 'localhost:666'
self.conf.config(
client_id=client_id,
timeout=timeout,
host=hosts,
group='kafka'
)
instance = publisher.KafkaPublisher()
instance._init_client()
self.assertIsNotNone(instance._client)
kafka_client.assert_called_with(hosts, client_id, timeout)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
side_effect=[ValueError])
def test_producer_should_fail_any_error(self, producer):
instance = publisher.KafkaPublisher()
instance._client = MockKafkaClient()
instance._client_conf = {
'hosts': 'localhost'
}
self.assertRaises(
exceptions.PublisherInitException,
instance._init_producer
)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_producer_should_initialize(self, producer):
instance = publisher.KafkaPublisher()
client = MockKafkaClient()
instance._client = client
instance._get_producer_conf = mock.Mock(return_value={})
instance._init_producer()
self.assertIsNotNone(instance._producer)
self.assertTrue(producer.called)
class TestLogic(unittest.TestCase):
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_not_call_producer_for_empty_key(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
instance.send_message(TOPIC, None, 'msg')
self.assertFalse(producer.send.called)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_not_call_producer_for_empty_message(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
instance.send_message(TOPIC, 'key', None)
self.assertFalse(producer.send.called)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_not_call_producer_for_empty_topic(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
instance.send_message(None, 'key', 'msg')
self.assertFalse(producer.send.called)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_fail_kafka_not_available(self, producer):
producer.send.side_effect = [common.KafkaUnavailableError]
instance = publisher.KafkaPublisher()
instance._producer = producer
instance._client = mock.Mock('client')
with self.assertRaises(exceptions.MessageQueueException) as context:
instance.send_message('a', 'b', 'c')
self.assertEqual('Failed to post message to kafka',
context.exception.message)
self.assertIsInstance(context.exception.caught,
common.KafkaUnavailableError)
self.assertIsNone(instance._client)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_fail_leader_not_available(self, producer):
producer.send.side_effect = [common.LeaderNotAvailableError]
instance = publisher.KafkaPublisher()
instance._producer = producer
instance._client = mock.Mock('client')
with self.assertRaises(exceptions.MessageQueueException) as context:
instance.send_message('a', 'b', 'c')
self.assertEqual('Failed to post message to kafka',
context.exception.message)
self.assertIsInstance(context.exception.caught,
common.LeaderNotAvailableError)
self.assertIsNone(instance._client)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_fail_any_error(self, producer):
producer.send.side_effect = [Exception]
instance = publisher.KafkaPublisher()
instance._producer = producer
with self.assertRaises(exceptions.MessageQueueException) as context:
instance.send_message('a', 'b', 'c')
self.assertEqual('Unknown error while sending message to kafka',
context.exception.message)
self.assertIsInstance(context.exception.caught,
Exception)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_send_message(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
msg = 'msg'
key = 'key'
instance.send_message(TOPIC, key, msg)
producer.send.assert_called_once_with(TOPIC, key, msg)

View File

@@ -13,16 +13,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import random
import unittest
from falcon import testing
import mock
import simplejson
from monasca_log_api.publisher import exceptions
from monasca_log_api.tests import base
from monasca_log_api.v1.common import log_publisher
from monasca_log_api.v1.common import service
EPOCH_START = datetime.datetime(1970, 1, 1)
class TestBuildKey(unittest.TestCase):
@@ -60,13 +63,16 @@ class TestBuildKey(unittest.TestCase):
# application_type and single dimension
tenant_id = 'monasca'
application_type = 'monasca-log-api'
dimension = service.Dimension('cpu_time', 50)
dimension_name = 'cpu_time'
dimension_value = '50'
log_object = {
'application_type': application_type,
'dimensions': [dimension]
'dimensions': {
dimension_name: dimension_value
}
}
expected_key = tenant_id + application_type + dimension.name + str(
dimension.value)
expected_key = tenant_id + application_type + dimension_name + str(
dimension_value)
self.assertEqual(expected_key,
log_publisher.LogPublisher._build_key(tenant_id,
@@ -77,15 +83,20 @@ class TestBuildKey(unittest.TestCase):
# application_type and two dimensions dimensions given unsorted
tenant_id = 'monasca'
application_type = 'monasca-log-api'
dimension_1 = service.Dimension('disk_usage', 50)
dimension_2 = service.Dimension('cpu_time', 50)
dimension_1_name = 'disk_usage'
dimension_1_value = '50'
dimension_2_name = 'cpu_time'
dimension_2_value = '60'
log_object = {
'application_type': application_type,
'dimensions': [dimension_1, dimension_2]
'dimensions': {
dimension_1_name: dimension_1_value,
dimension_2_name: dimension_2_value
}
}
expected_key = ''.join([tenant_id, application_type, dimension_2.name,
str(dimension_2.value), dimension_1.name,
str(dimension_1.value)])
expected_key = ''.join([tenant_id, application_type,
dimension_2_name, dimension_2_value,
dimension_1_name, dimension_1_value])
self.assertEqual(expected_key,
log_publisher.LogPublisher._build_key(tenant_id,
@@ -97,44 +108,92 @@ class TestSendMessage(testing.TestBase):
self.conf = base.mock_config(self)
return super(TestSendMessage, self).setUp()
def test_should_not_send_empty_message(self):
@mock.patch('monasca_log_api.v1.common.log_publisher.producer'
'.KafkaProducer')
def test_should_not_send_empty_message(self, _):
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock()
instance._kafka_publisher = mock.Mock()
instance.send_message({})
self.assertFalse(instance._kafka_publisher.send_message.called)
self.assertFalse(instance._kafka_publisher.publish.called)
def test_should_raise_exception(self):
@unittest.expectedFailure
def test_should_not_send_message_not_dict(self):
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock(
side_effect=[exceptions.MessageQueueException(1, 1)]
)
not_dict_value = 123
instance.send_message(not_dict_value)
msg = {
def test_should_not_send_message_missing_keys(self):
# checks every combination of missing keys
# the test does not care what the values are,
# it only checks what happens when required
# message (i.e. envelope) properties are missing entirely
# that's why there are two loops instead of three
instance = log_publisher.LogPublisher()
keys = ['log', 'creation_time', 'meta']
for key_1 in keys:
diff = keys[:]
diff.remove(key_1)
for key_2 in diff:
message = {
key_1: random.randint(10, 20),
key_2: random.randint(30, 50)
}
self.assertRaises(log_publisher.InvalidMessageException,
instance.send_message,
message)
def test_should_not_send_message_missing_values(self):
# the original message assumes that every property has a value
# the test modifies each property one by one by removing its value
# (i.e. creating a false-like value)
instance = log_publisher.LogPublisher()
message = {
'log': {
'message': 1
'message': '11'
},
'creation_time': 123456,
'meta': {
'tenantId': 1
'region': 'pl'
}
}
self.assertRaises(exceptions.MessageQueueException,
instance.send_message, msg)
def test_should_send_message(self):
for key in message:
tmp_message = message.copy()  # copy so only one value is removed at a time
tmp_message[key] = None
self.assertRaises(log_publisher.InvalidMessageException,
instance.send_message,
tmp_message)
@mock.patch('monasca_log_api.v1.common.log_publisher.producer'
'.KafkaProducer')
def test_should_send_message(self, _):
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock(name='send_message',
return_value={})
instance._kafka_publisher = mock.Mock()
instance.send_message({})
instance._build_key = mock.Mock(name='_build_key',
return_value='some_key')
creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
.total_seconds())
application_type = 'monasca-log-api'
dimension_1_name = 'disk_usage'
dimension_1_value = '50'
dimension_2_name = 'cpu_time'
dimension_2_value = '60'
msg = {
'log': {
'message': 1,
'application_type': 'monasca_log_api',
'dimensions': [service.Dimension('disk_usage', 50),
service.Dimension('cpu_time', 50)]
'application_type': application_type,
'dimensions': {
dimension_1_name: dimension_1_value,
dimension_2_name: dimension_2_value
}
},
'creation_time': creation_time,
'meta': {
'tenantId': 1
}
@@ -142,27 +201,40 @@ class TestSendMessage(testing.TestBase):
instance.send_message(msg)
instance._kafka_publisher.send_message.assert_called_once_with(
instance._kafka_publisher.publish.assert_called_once_with(
self.conf.conf.log_publisher.topics[0],
'some_key',
# 'some_key', # TODO(feature) next version of monasca-common
simplejson.dumps(msg))
def test_should_send_message_multiple_topics(self):
@mock.patch('monasca_log_api.v1.common.log_publisher.producer'
'.KafkaProducer')
def test_should_send_message_multiple_topics(self, _):
topics = ['logs', 'analyzer', 'tester']
self.conf.config(topics=topics, group='log_publisher')
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock(name='send_message',
return_value={})
instance._kafka_publisher = mock.Mock()
instance.send_message({})
instance._build_key = mock.Mock(name='_build_key',
return_value='some_key')
creation_time = ((datetime.datetime.utcnow() - EPOCH_START)
.total_seconds())
dimension_1_name = 'disk_usage'
dimension_1_value = '50'
dimension_2_name = 'cpu_time'
dimension_2_value = '60'
application_type = 'monasca-log-api'
msg = {
'log': {
'message': 1,
'application_type': 'monasca_log_api',
'dimensions': [service.Dimension('disk_usage', 50),
service.Dimension('cpu_time', 50)]
'application_type': application_type,
'dimensions': {
dimension_1_name: dimension_1_value,
dimension_2_name: dimension_2_value
}
},
'creation_time': creation_time,
'meta': {
'tenantId': 1
}
@@ -172,9 +244,9 @@ class TestSendMessage(testing.TestBase):
instance.send_message(msg)
self.assertEqual(len(topics),
instance._kafka_publisher.send_message.call_count)
instance._kafka_publisher.publish.call_count)
for topic in topics:
instance._kafka_publisher.send_message.assert_any_call(
instance._kafka_publisher.publish.assert_any_call(
topic,
'some_key',
# 'some_key', # TODO(feature) next version of monasca-common
json_msg)

View File

@@ -29,13 +29,13 @@ class TestLogs(testing.TestBase):
self.conf = base.mock_config(self)
self.logs_resource = logs.Logs()
self.api.add_route(
'/logs/single',
'/log/single',
self.logs_resource
)
def test_should_fail_not_delegate_ok_cross_tenant_id(self):
self.simulate_request(
'/logs/single',
'/log/single',
method='POST',
query_string='tenant_id=1',
headers={
@@ -45,19 +45,15 @@ class TestLogs(testing.TestBase):
self.assertEqual(falcon.HTTP_403, self.srmock.status)
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher')
def test_should_pass_empty_cross_tenant_id_wrong_role(self,
log_creator,
kafka_publisher):
log_creator.configure_mock(**{'new_log.return_value': None,
'new_log_envelope.return_value': None})
kafka_publisher.configure_mock(**{'send_message.return_value': None})
self.logs_resource._log_creator = log_creator
self.logs_resource._kafka_publisher = kafka_publisher
self.simulate_request(
'/logs/single',
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: 'some_role',
@@ -72,19 +68,15 @@ class TestLogs(testing.TestBase):
self.assertEqual(1, log_creator.new_log_envelope.call_count)
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher')
def test_should_pass_empty_cross_tenant_id_ok_role(self,
log_creator,
kafka_publisher):
log_creator.configure_mock(**{'new_log.return_value': None,
'new_log_envelope.return_value': None})
kafka_publisher.configure_mock(**{'send_message.return_value': None})
self.logs_resource._log_creator = log_creator
self.logs_resource._kafka_publisher = kafka_publisher
self.simulate_request(
'/logs/single',
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
@@ -99,19 +91,15 @@ class TestLogs(testing.TestBase):
self.assertEqual(1, log_creator.new_log_envelope.call_count)
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
@mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher')
def test_should_pass_delegate_cross_tenant_id_ok_role(self,
log_creator,
kafka_publisher):
log_creator.configure_mock(**{'new_log.return_value': None,
'new_log_envelope.return_value': None})
kafka_publisher.configure_mock(**{'send_message.return_value': None})
log_publisher):
self.logs_resource._log_creator = log_creator
self.logs_resource._kafka_publisher = kafka_publisher
self.logs_resource._kafka_publisher = log_publisher
self.simulate_request(
'/logs/single',
'/log/single',
method='POST',
query_string='tenant_id=1',
headers={
@@ -122,28 +110,28 @@ class TestLogs(testing.TestBase):
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
self.assertEqual(1, kafka_publisher.send_message.call_count)
self.assertEqual(1, log_publisher.send_message.call_count)
self.assertEqual(1, log_creator.new_log.call_count)
self.assertEqual(1, log_creator.new_log_envelope.call_count)
def test_should_fail_empty_dimensions_delegate(self):
with mock.patch.object(self.logs_resource._log_creator,
'_read_payload',
return_value=True):
self.simulate_request(
'/logs/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json'
}
)
@mock.patch('monasca_log_api.v1.common.service.rest_utils')
def test_should_fail_empty_dimensions_delegate(self, rest_utils):
rest_utils.read_body.return_value = True
self.simulate_request(
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json'
}
)
self.assertEqual(log_api_exceptions.HTTP_422, self.srmock.status)
def test_should_fail_for_invalid_content_type(self):
self.simulate_request(
'/logs/single',
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,

View File

@@ -0,0 +1,53 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import falcon
import mock
import simplejson
from monasca_log_api.api import rest_utils as ru
class RestUtilsTest(unittest.TestCase):
@mock.patch('io.IOBase')
def test_should_read_text_for_plain_text(self, payload):
raw_msg = 'Hello World'
msg = u''.join(raw_msg)
payload.read.return_value = raw_msg
self.assertEqual(msg, ru.read_body(payload, 'text/plain'))
@mock.patch('io.IOBase')
def test_should_read_json_for_application_json(self, payload):
raw_msg = u'{"path":"/var/log/messages","message":"This is message"}'
json_msg = simplejson.loads(raw_msg, encoding='utf-8')
payload.read.return_value = raw_msg
self.assertEqual(json_msg, ru.read_body(payload, 'application/json'))
@mock.patch('io.IOBase')
def test_should_fail_read_text_for_application_json(self, payload):
with self.assertRaises(falcon.HTTPBadRequest) as context:
raw_msg = 'Hello World'
payload.read.return_value = raw_msg
ru.read_body(payload, 'application/json')
self.assertEqual(context.exception.title,
'Failed to read body as json')

View File

@@ -16,10 +16,8 @@
import datetime
import unittest
import falcon
from falcon import testing
import mock
import simplejson
from monasca_log_api.api import exceptions
from monasca_log_api.api import logs_api
@@ -64,9 +62,12 @@ class ParseDimensions(unittest.TestCase):
def test_should_pass_for_valid_dimensions(self):
dimensions = 'a:1,b:2'
expected = [('a', '1'), ('b', '2')]
expected = {
'a': '1',
'b': '2'
}
self.assertListEqual(expected,
self.assertDictEqual(expected,
common_service.parse_dimensions(dimensions))
@@ -138,10 +139,10 @@ class DimensionsValidations(unittest.TestCase):
common_service.Validations.validate_dimensions(1)
def test_should_pass_for_empty_dimensions_array(self):
common_service.Validations.validate_dimensions([])
common_service.Validations.validate_dimensions({})
def test_should_fail_too_empty_name(self):
dimensions = [('', 1)]
dimensions = {'': 1}
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
@@ -150,7 +151,7 @@ class DimensionsValidations(unittest.TestCase):
def test_should_fail_too_long_name(self):
name = testing.rand_string(256, 260)
dimensions = [(name, 1)]
dimensions = {name: 1}
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
@@ -159,7 +160,7 @@ class DimensionsValidations(unittest.TestCase):
def test_should_fail_underscore_at_begin(self):
name = '_aDim'
dimensions = [(name, 1)]
dimensions = {name: 1}
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
@@ -168,7 +169,7 @@ class DimensionsValidations(unittest.TestCase):
def test_should_fail_invalid_chars(self):
name = '<>'
dimensions = [(name, 1)]
dimensions = {name: 1}
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
@@ -178,7 +179,7 @@ class DimensionsValidations(unittest.TestCase):
def test_should_fail_ok_name_empty_value(self):
name = 'monasca'
dimensions = [(name, '')]
dimensions = {name: ''}
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
@@ -188,7 +189,7 @@ class DimensionsValidations(unittest.TestCase):
def test_should_fail_ok_name_too_long_value(self):
name = 'monasca'
value = testing.rand_string(256, 300)
dimensions = [(name, value)]
dimensions = {name: value}
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
@@ -198,55 +199,17 @@ class DimensionsValidations(unittest.TestCase):
def test_should_pass_ok_name_ok_value_empty_service(self):
name = 'monasca'
value = '1'
dimensions = [(name, value)]
dimensions = {name: value}
common_service.Validations.validate_dimensions(dimensions)
def test_should_pass_ok_name_ok_value_service_SERVICE_DIMENSIONS_as_name(
self):
name = 'some_name'
value = '1'
dimensions = [(name, value)]
dimensions = {name: value}
common_service.Validations.validate_dimensions(dimensions)
class LogsCreatorPayload(unittest.TestCase):
def setUp(self):
self.instance = common_service.LogCreator()
@mock.patch('io.IOBase')
def test_should_read_text_for_plain_text(self, payload):
msg = u'Hello World'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
self.assertEqual(msg,
self.instance._read_payload(payload, 'text/plain'))
@mock.patch('io.IOBase')
def test_should_read_json_for_application_json(self, payload):
msg = u'{"path":"/var/log/messages","message":"This is message"}'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
json_msg = simplejson.loads(msg, encoding='utf-8')
self.assertEqual(json_msg,
self.instance._read_payload(payload,
'application/json'))
@mock.patch('io.IOBase')
def test_should_fail_read_text_for_application_json(self, payload):
with self.assertRaises(falcon.HTTPBadRequest) as context:
msg = u'Hello World'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
self.instance._read_payload(payload,
'application/json')
self.assertEqual(context.exception.title,
'Failed to read body as json')
class LogsCreatorNewLog(unittest.TestCase):
def setUp(self):
self.instance = common_service.LogCreator()
@@ -258,13 +221,14 @@ class LogsCreatorNewLog(unittest.TestCase):
json_msg = u'{"path":"%s","message":"%s"}' % (path, msg)
app_type = 'monasca'
dimensions = 'cpu_time:30'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': json_msg})
payload.read.return_value = json_msg
expected_log = {
'message': msg,
'application_type': app_type,
'dimensions': [('cpu_time', '30')],
'dimensions': {
'cpu_time': '30'
},
'path': path
}
@@ -278,14 +242,17 @@ class LogsCreatorNewLog(unittest.TestCase):
def test_should_create_log_from_text(self, payload):
msg = u'Hello World'
app_type = 'monasca'
dimensions = 'cpu_time:30'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
dimension_name = 'cpu_time'
dimension_value = 30
dimensions = '%s:%s' % (dimension_name, str(dimension_value))
payload.read.return_value = msg
expected_log = {
'message': msg,
'application_type': app_type,
'dimensions': [('cpu_time', '30')]
'dimensions': {
dimension_name: str(dimension_value)
}
}
self.assertEqual(expected_log, self.instance.new_log(
@@ -304,10 +271,14 @@ class LogCreatorNewEnvelope(unittest.TestCase):
msg = u'Hello World'
path = u'/var/log/messages'
app_type = 'monasca'
dimension_name = 'cpu_time'
dimension_value = 30
expected_log = {
'message': msg,
'application_type': app_type,
'dimensions': [('cpu_time', '30')],
'dimensions': {
dimension_name: str(dimension_value)
},
'path': path
}
tenant_id = 'a_tenant'
@@ -330,3 +301,38 @@ class LogCreatorNewEnvelope(unittest.TestCase):
actual_envelope.get('log'))
self.assertEqual(expected_envelope.get('meta'),
actual_envelope.get('meta'))
self.assertDictEqual(
expected_envelope.get('log').get('dimensions'),
actual_envelope.get('log').get('dimensions'))
@unittest.expectedFailure
def test_should_not_create_log_none(self):
log_object = None
tenant_id = 'a_tenant'
self.instance.new_log_envelope(log_object, tenant_id)
@unittest.expectedFailure
def test_should_not_create_log_empty(self):
log_object = {}
tenant_id = 'a_tenant'
self.instance.new_log_envelope(log_object, tenant_id)
@unittest.expectedFailure
def test_should_not_create_tenant_none(self):
log_object = {
'message': ''
}
tenant_id = None
self.instance.new_log_envelope(log_object, tenant_id)
@unittest.expectedFailure
def test_should_not_create_tenant_empty(self):
log_object = {
'message': ''
}
tenant_id = ''
self.instance.new_log_envelope(log_object, tenant_id)

View File

@@ -13,30 +13,39 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.kafka import producer
from oslo_config import cfg
from oslo_log import log
import simplejson
from monasca_log_api.publisher import kafka_publisher
import simplejson as json
LOG = log.getLogger(__name__)
CONF = cfg.CONF
log_publisher_opts = [
cfg.StrOpt('kafka_url',
required=True,
help='Url to kafka server'),
cfg.MultiStrOpt('topics',
default=['logs'],
help='Target topic in kafka')
help='Consumer topics')
]
log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher')
cfg.CONF.register_group(log_publisher_group)
cfg.CONF.register_opts(log_publisher_opts, log_publisher_group)
ENVELOPE_SCHEMA = ['log', 'meta', 'creation_time']
"""Log envelope (i.e.) message keys"""
class InvalidMessageException(Exception):
pass
class LogPublisher(object):
def __init__(self):
self._topics = CONF.log_publisher.topics
self._kafka_publisher = kafka_publisher.KafkaPublisher()
self._kafka_publisher = None
LOG.info('Initializing LogPublisher <%s>', self)
@staticmethod
@@ -51,16 +60,6 @@ class LogPublisher(object):
:param obj: log instance
:return: key
"""
def comparator(a, b):
"""Comparator for dimensions
:param a: dimension_a, tuple with properties name,value
:param b: dimension_b, tuple with properties name,value
:return: sorting result
"""
if a.name == b.name:
return (a.value > b.value) - (a.value < b.value)
return (a.name > b.name) - (a.name < b.name)
str_list = []
@@ -72,29 +71,68 @@ class LogPublisher(object):
str_list += obj['application_type']
if 'dimensions' in obj and obj['dimensions']:
dimensions = sorted(obj['dimensions'], cmp=comparator)
for name, value in dimensions:
dims = obj['dimensions']
sorted_dims = sorted(dims)
for name in sorted_dims:
str_list += name
str_list += str(value)
str_list += str(dims[name])
return ''.join(str_list)
@staticmethod
def _is_message_valid(message):
"""Validates message before sending.
Method checks if the message is a :py:class:`dict`.
If so, the dictionary is verified to contain the following keys:
* meta
* log
* creation_time
If the keys are found, each of them must also have a value.
If any of these conditions is not met,
:py:class:`.InvalidMessageException` is raised
:raises InvalidMessageException: if message does not comply to schema
"""
if not isinstance(message, dict):
return False
for key in ENVELOPE_SCHEMA:
if not (key in message and message.get(key)):
return False
return True
def _publisher(self):
if not self._kafka_publisher:
self._kafka_publisher = producer.KafkaProducer(
url=CONF.log_publisher.kafka_url
)
return self._kafka_publisher
def send_message(self, message):
if not message:
return
if not self._is_message_valid(message):
raise InvalidMessageException()
key = self._build_key(message['meta']['tenantId'], message['log'])
msg = simplejson.dumps(message,
sort_keys=False,
ensure_ascii=False).encode('utf8')
msg = json.dumps(message,
sort_keys=False,
ensure_ascii=False).encode('utf8')
# TODO(feature) next version of monasca-common
LOG.debug('Build key [%s] for message' % key)
LOG.debug('Sending message {topics=%s,key=%s,message=%s}' %
(self._topics, key, msg))
try:
for topic in self._topics:
self._kafka_publisher.send_message(topic, key, msg)
self._publisher().publish(topic, msg)
except Exception as ex:
LOG.error(ex.message)
raise ex
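
To illustrate the dict-based key building above (hypothetical values; equivalent to the sorted-dimensions logic in _build_key):

tenant_id = 'monasca'
log = {
    'application_type': 'monasca-log-api',
    'dimensions': {'disk_usage': '50', 'cpu_time': '60'},
}
parts = [tenant_id, log['application_type']]
for name in sorted(log['dimensions']):  # dimension names sorted alphabetically
    parts += [name, str(log['dimensions'][name])]
print(''.join(parts))  # monascamonasca-log-apicpu_time60disk_usage50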

View File

@@ -13,17 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import datetime
import re
import falcon
from oslo_config import cfg
from oslo_log import log
import simplejson
from monasca_log_api.api import exceptions
from monasca_log_api.api import logs_api
from monasca_log_api.api import rest_utils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@@ -49,8 +47,11 @@ DIMENSION_NAME_CONSTRAINTS = {
DIMENSION_VALUE_CONSTRAINTS = {
'MAX_LENGTH': 255
}
EPOCH_START = datetime.datetime(1970, 1, 1)
Dimension = collections.namedtuple('Dimensions', ['name', 'value'])
class LogEnvelopeException(Exception):
pass
class Validations(object):
@@ -118,16 +119,16 @@ class Validations(object):
value
)
if (isinstance(dimensions, (list, tuple)) and not
if (isinstance(dimensions, dict) and not
isinstance(dimensions, basestring)):
for dim_name, dim_value in dimensions:
for dim_name in dimensions:
validate_name(dim_name)
validate_value(dim_value)
validate_value(dimensions[dim_name])
else:
raise exceptions.HTTPUnprocessableEntity(
'Dimensions %s must be a collections' % dimensions)
'Dimensions %s must be a dictionary (map)' % dimensions)
class LogCreator(object):
@@ -135,31 +136,13 @@ class LogCreator(object):
self._log = log.getLogger('service.LogCreator')
self._log.info('Initializing LogCreator')
# noinspection PyMethodMayBeStatic
def _create_meta_info(self, tenant_id):
@staticmethod
def _create_meta_info(tenant_id):
return {
'tenantId': tenant_id,
'region': cfg.CONF.service.region
}
def _read_payload(self, payload, content_type):
try:
content = payload.read()
except Exception as ex:
raise falcon.HTTPBadRequest(title='Failed to read body',
description=ex.message)
if content and content_type == 'application/json':
try:
content = simplejson.loads(content, encoding='utf-8')
except Exception as ex:
raise falcon.HTTPBadRequest(title='Failed to read body as '
'json',
description=ex.message)
return content
def new_log(self,
application_type,
dimensions,
@@ -167,7 +150,7 @@ class LogCreator(object):
content_type='application/json',
validate=True):
payload = self._read_payload(payload, content_type)
payload = rest_utils.read_body(payload, content_type)
if not payload:
return None
@@ -199,8 +182,15 @@ class LogCreator(object):
return log_object
def new_log_envelope(self, log_object, tenant_id):
timestamp = (datetime.datetime.utcnow() -
datetime.datetime(1970, 1, 1)).total_seconds()
if not log_object:
raise LogEnvelopeException('Envelope cannot be '
'created without log')
if not tenant_id:
raise LogEnvelopeException('Envelope cannot be '
'created without tenant')
timestamp = (datetime.datetime.utcnow() - EPOCH_START).total_seconds()
return {
'log': log_object,
'creation_time': timestamp,
@@ -212,7 +202,6 @@ def is_delegate(roles):
if roles:
roles = roles.split(',')
return logs_api.MONITORING_DELEGATE_ROLE in roles
pass
return False
@@ -226,7 +215,7 @@ def parse_dimensions(dimensions):
if not dimensions:
raise exceptions.HTTPUnprocessableEntity('Dimension are required')
new_dimensions = []
new_dimensions = {}
dimensions = map(str.strip, dimensions.split(','))
for dim in dimensions:
@@ -241,6 +230,6 @@ def parse_dimensions(dimensions):
name = str(dim[0].strip()) if dim[0] else None
value = str(dim[1].strip()) if dim[1] else None
if name and value:
new_dimensions.append(Dimension(name, value))
new_dimensions.update({name: value})
return new_dimensions
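
For example, the reworked parse_dimensions now yields a dict instead of a list of named tuples (illustrative call):

from monasca_log_api.v1.common import service

print(service.parse_dimensions('cpu_time:30,disk_usage:50'))
# {'cpu_time': '30', 'disk_usage': '50'}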

View File

@@ -15,10 +15,9 @@
import falcon
from oslo_log import log
import simplejson
from monasca_log_api.api import rest_utils
from monasca_log_api.api import versions_api
from monasca_log_api import constants
LOG = log.getLogger(__name__)
VERSIONS = {
@@ -50,14 +49,14 @@ class Versions(versions_api.VersionsAPI):
VERSIONS[version]['links'][0]['href'] = (
req.uri.decode('utf8') + version)
result['elements'].append(VERSIONS[version])
res.body = simplejson.dumps(result)
res.body = rest_utils.as_json(result)
res.status = falcon.HTTP_200
@staticmethod
def handle_version_id(req, res, version_id):
if version_id in VERSIONS:
VERSIONS[version_id]['links'][0]['href'] = (
req.uri.decode(constants.ENCODING)
req.uri.decode(rest_utils.ENCODING)
)
for version in VERSIONS:
VERSIONS[version]['links'][0]['href'] = (
@@ -67,7 +66,7 @@ class Versions(versions_api.VersionsAPI):
req.uri.decode('utf8') +
VERSIONS[version_id]['links'][1]['href']
)
res.body = simplejson.dumps(VERSIONS[version_id])
res.body = rest_utils.as_json(VERSIONS[version_id])
res.status = falcon.HTTP_200
else:
res.body = 'Invalid Version ID'
@@ -77,7 +76,7 @@ class Versions(versions_api.VersionsAPI):
result = {
'links': [{
'rel': 'self',
'href': req.uri.decode(constants.ENCODING)
'href': req.uri.decode(rest_utils.ENCODING)
}],
'elements': []
}

View File

@@ -10,7 +10,7 @@ oslo.utils
pastedeploy>=1.3.3
pbr>=1.6.0,<2.0
six>=1.9.0
kafka-python>=0.9.3,<0.9.4
simplejson>=3.8.0
simport
monasca-common>=0.0.2
eventlet>=0.9.7