From f07a38e388fd85ed170b654489b63fe3a0e35ae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Tr=C4=99bski?= Date: Thu, 12 Nov 2015 12:00:13 +0100 Subject: [PATCH] Fixing issues spotted in dev environment - removed own kafka abstraction in favour of monasca-common - removed monasca keystone context filter, not actually used - changed URI of logs endpoints to /v1.0/log/single Change-Id: Iaceabdce2b2862451cfe63d2a612577d7710022b --- .gitignore | 1 + etc/monasca/log-api-config.conf | 12 +- etc/monasca/log-api-config.ini | 5 +- monasca_log_api/api/rest_utils.py | 49 ++++ monasca_log_api/constants.py | 16 -- monasca_log_api/middleware/__init__.py | 0 monasca_log_api/middleware/context.py | 83 ------ .../middleware/keystone_context_filter.py | 109 -------- monasca_log_api/publisher/__init__.py | 0 monasca_log_api/publisher/exceptions.py | 50 ---- monasca_log_api/publisher/kafka_publisher.py | 220 --------------- monasca_log_api/publisher/publisher.py | 29 -- monasca_log_api/server.py | 2 +- monasca_log_api/tests/test_kafka_publisher.py | 254 ------------------ monasca_log_api/tests/test_log_publisher.py | 154 ++++++++--- monasca_log_api/tests/test_logs.py | 62 ++--- monasca_log_api/tests/test_rest_utils.py | 53 ++++ monasca_log_api/tests/test_service.py | 124 +++++---- monasca_log_api/v1/common/log_publisher.py | 82 ++++-- monasca_log_api/v1/common/service.py | 57 ++-- monasca_log_api/v1/reference/versions.py | 11 +- requirements.txt | 2 +- 22 files changed, 398 insertions(+), 977 deletions(-) create mode 100644 monasca_log_api/api/rest_utils.py delete mode 100644 monasca_log_api/constants.py delete mode 100644 monasca_log_api/middleware/__init__.py delete mode 100644 monasca_log_api/middleware/context.py delete mode 100644 monasca_log_api/middleware/keystone_context_filter.py delete mode 100644 monasca_log_api/publisher/__init__.py delete mode 100644 monasca_log_api/publisher/exceptions.py delete mode 100644 monasca_log_api/publisher/kafka_publisher.py delete mode 100644 monasca_log_api/publisher/publisher.py delete mode 100644 monasca_log_api/tests/test_kafka_publisher.py create mode 100644 monasca_log_api/tests/test_rest_utils.py diff --git a/.gitignore b/.gitignore index 6a106ddc..579cabea 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ cover .tox ChangeLog MANIFEST +AUTHORS monasca.log diff --git a/etc/monasca/log-api-config.conf b/etc/monasca/log-api-config.conf index 7a4cdf3a..7c634ed3 100644 --- a/etc/monasca/log-api-config.conf +++ b/etc/monasca/log-api-config.conf @@ -16,19 +16,9 @@ driver = v1_reference [service] region = 'pl' -[kafka] -client_id = 'monasca-log-api' -timeout = 60 -host = 'localhost:8900' - -[kafka_producer] -batch_send_every_n = 10 -async = True -ack_timeout = 1000 -req_acks = 'Local' - [log_publisher] topics = 'logs' +kafka_url = 'localhost:8900' [keystone_authtoken] identity_uri = http://192.168.10.5:35357 diff --git a/etc/monasca/log-api-config.ini b/etc/monasca/log-api-config.ini index 2220f8ef..c472a94c 100644 --- a/etc/monasca/log-api-config.ini +++ b/etc/monasca/log-api-config.ini @@ -2,7 +2,7 @@ name = monasca_log_api [pipeline:main] -pipeline = auth keystonecontext api +pipeline = auth api [app:api] paste.app_factory = monasca_log_api.server:launch @@ -10,9 +10,6 @@ paste.app_factory = monasca_log_api.server:launch [filter:auth] paste.filter_factory = keystonemiddleware.auth_token:filter_factory -[filter:keystonecontext] -paste.filter_factory = monasca_log_api.middleware.keystone_context_filter:filter_factory - [server:main] use = egg:gunicorn#main host = 127.0.0.1 diff --git a/monasca_log_api/api/rest_utils.py b/monasca_log_api/api/rest_utils.py new file mode 100644 index 00000000..6a9cf2b2 --- /dev/null +++ b/monasca_log_api/api/rest_utils.py @@ -0,0 +1,49 @@ +# Copyright 2015 kornicameister@gmail.com +# Copyright 2015 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import falcon +import simplejson as json + +ENCODING = 'utf8' + + +def read_body(payload, content_type='application/json'): + try: + content = payload.read() + if not content: + return False + except Exception as ex: + raise falcon.HTTPBadRequest(title='Failed to read body', + description=ex.message) + + if content_type == 'application/json': + try: + content = from_json(content) + except Exception as ex: + raise falcon.HTTPBadRequest(title='Failed to read body as json', + description=ex.message) + + return content + + +def as_json(data): + return json.dumps(data, + encoding=ENCODING, + sort_keys=False, + ensure_ascii=False) + + +def from_json(data): + return json.loads(data, encoding=ENCODING) diff --git a/monasca_log_api/constants.py b/monasca_log_api/constants.py deleted file mode 100644 index 353f7ad2..00000000 --- a/monasca_log_api/constants.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright 2015 kornicameister@gmail.com -# Copyright 2015 FUJITSU LIMITED -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -ENCODING = 'utf8' diff --git a/monasca_log_api/middleware/__init__.py b/monasca_log_api/middleware/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/monasca_log_api/middleware/context.py b/monasca_log_api/middleware/context.py deleted file mode 100644 index 5cf4a874..00000000 --- a/monasca_log_api/middleware/context.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (c) 2015 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""RequestContext: context for requests that persist through monasca.""" - -import uuid - -from oslo_log import log -from oslo_utils import timeutils - -LOG = log.getLogger(__name__) - - -class RequestContext(object): - """Security context and request information. - - Represents the user taking a given action within the system. - - """ - - def __init__(self, user_id, project_id, domain_id=None, domain_name=None, - roles=None, timestamp=None, request_id=None, - auth_token=None, user_name=None, project_name=None, - service_catalog=None, user_auth_plugin=None, **kwargs): - """Creates the Keystone Context. Supports additional parameters: - - :param user_auth_plugin: - The auth plugin for the current request's authentication data. - :param kwargs: - Extra arguments that might be present - """ - if kwargs: - LOG.warning( - 'Arguments dropped when creating context: %s') % str(kwargs) - - self._roles = roles or [] - self.timestamp = timeutils.utcnow() - - if not request_id: - request_id = self.generate_request_id() - self._request_id = request_id - self._auth_token = auth_token - - self._service_catalog = service_catalog - - self._domain_id = domain_id - self._domain_name = domain_name - - self._user_id = user_id - self._user_name = user_name - - self._project_id = project_id - self._project_name = project_name - - self._user_auth_plugin = user_auth_plugin - - def to_dict(self): - return {'user_id': self._user_id, - 'project_id': self._project_id, - 'domain_id': self._domain_id, - 'domain_name': self._domain_name, - 'roles': self._roles, - 'timestamp': timeutils.strtime(self._timestamp), - 'request_id': self._request_id, - 'auth_token': self._auth_token, - 'user_name': self._user_name, - 'service_catalog': self._service_catalog, - 'project_name': self._project_name, - 'user': self._user} - - def generate_request_id(self): - return b'req-' + str(uuid.uuid4()).encode('ascii') diff --git a/monasca_log_api/middleware/keystone_context_filter.py b/monasca_log_api/middleware/keystone_context_filter.py deleted file mode 100644 index 13e4a822..00000000 --- a/monasca_log_api/middleware/keystone_context_filter.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2015 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import falcon -from oslo_log import log -from oslo_middleware import request_id -from oslo_serialization import jsonutils - -from monasca_log_api.middleware import context - -LOG = log.getLogger(__name__) - - -def filter_factory(global_conf, **local_conf): - def validator_filter(app): - return KeystoneContextFilter(app, local_conf) - - return validator_filter - - -class KeystoneContextFilter(object): - """Make a request context from keystone headers.""" - - def __init__(self, app, conf): - self._app = app - self._conf = conf - - def __call__(self, env, start_response): - - LOG.debug("Creating Keystone Context Object.") - - user_id = env.get('HTTP_X_USER_ID', env.get('HTTP_X_USER')) - if user_id is None: - msg = "Neither X_USER_ID nor X_USER found in request" - LOG.error(msg) - raise falcon.HTTPUnauthorized(title='Forbidden', description=msg) - - roles = self._get_roles(env) - - project_id = env.get('HTTP_X_PROJECT_ID') - project_name = env.get('HTTP_X_PROJECT_NAME') - - domain_id = env.get('HTTP_X_DOMAIN_ID') - domain_name = env.get('HTTP_X_DOMAIN_NAME') - - user_name = env.get('HTTP_X_USER_NAME') - - req_id = env.get(request_id.ENV_REQUEST_ID) - - # Get the auth token - auth_token = env.get('HTTP_X_AUTH_TOKEN', - env.get('HTTP_X_STORAGE_TOKEN')) - - service_catalog = None - if env.get('HTTP_X_SERVICE_CATALOG') is not None: - try: - catalog_header = env.get('HTTP_X_SERVICE_CATALOG') - service_catalog = jsonutils.loads(catalog_header) - except ValueError: - msg = "Invalid service catalog json." - LOG.error(msg) - raise falcon.HTTPInternalServerError(msg) - - # NOTE(jamielennox): This is a full auth plugin set by auth_token - # middleware in newer versions. - user_auth_plugin = env.get('keystone.token_auth') - - # Build a context - ctx = context.RequestContext(user_id, - project_id, - user_name=user_name, - project_name=project_name, - domain_id=domain_id, - domain_name=domain_name, - roles=roles, - auth_token=auth_token, - service_catalog=service_catalog, - request_id=req_id, - user_auth_plugin=user_auth_plugin) - - env['monasca.context'] = ctx - - LOG.debug("Keystone Context successfully created.") - - return self._app(env, start_response) - - def _get_roles(self, env): - """Get the list of roles.""" - - if 'HTTP_X_ROLES' in env: - roles = env.get('HTTP_X_ROLES', '') - else: - # Fallback to deprecated role header: - roles = env.get('HTTP_X_ROLE', '') - if roles: - LOG.warning( - 'Sourcing roles from deprecated X-Role HTTP header') - return [r.strip() for r in roles.split(',')] diff --git a/monasca_log_api/publisher/__init__.py b/monasca_log_api/publisher/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/monasca_log_api/publisher/exceptions.py b/monasca_log_api/publisher/exceptions.py deleted file mode 100644 index c66ce68a..00000000 --- a/monasca_log_api/publisher/exceptions.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2015 kornicameister@gmail.com -# Copyright 2015 FUJITSU LIMITED -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -class ExceptionWrapper(Exception): - """Wrapper around exception with custom message. - - ExceptionWrapper provides a way to keep old - exceptions but providing to describe the context of error - occurrence. - - """ - def __init__(self, message, caught=None): - self._message = message - self._caught = caught - - @property - def caught(self): - return self._caught - - @property - def message(self): - return self._message - - def __str__(self): - return '%s ' % ( - self.__class__.__name__, - repr(self.message), - repr(self.caught) - ) - - -class PublisherInitException(ExceptionWrapper): - pass - - -class MessageQueueException(ExceptionWrapper): - pass diff --git a/monasca_log_api/publisher/kafka_publisher.py b/monasca_log_api/publisher/kafka_publisher.py deleted file mode 100644 index e63c2b78..00000000 --- a/monasca_log_api/publisher/kafka_publisher.py +++ /dev/null @@ -1,220 +0,0 @@ -# Copyright 2015 kornicameister@gmail.com -# Copyright 2015 FUJITSU LIMITED -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -from kafka import client -from kafka import common -from kafka import conn -from kafka import producer -from kafka import protocol -from oslo_config import cfg -from oslo_log import log -import simport - -from monasca_log_api.publisher import exceptions -from monasca_log_api.publisher import publisher - -LOG = log.getLogger(__name__) -CONF = cfg.CONF - -ACK_MAP = { - 'None': producer.KeyedProducer.ACK_NOT_REQUIRED, - 'Local': producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE, - 'Server': producer.KeyedProducer.ACK_AFTER_CLUSTER_COMMIT -} - -kafka_opts = [ - cfg.StrOpt('client_id', - default=None, - help='Client Id', - required=True), - cfg.IntOpt('timeout', - default=conn.DEFAULT_SOCKET_TIMEOUT_SECONDS, - help='Socket timeout'), - cfg.StrOpt('host', - default=None, - help='List of hosts, comma delimited', - required=True) -] -kafka_group = cfg.OptGroup(name='kafka', title='kafka') - -kafka_producer_opts = [ - cfg.IntOpt('batch_send_every_n', - default=None, - help='Send every n items'), - cfg.IntOpt('batch_send_every_t', - default=None, - help='Send every n seconds'), - cfg.BoolOpt('async', - default=False, - help='Async communication'), - cfg.StrOpt('reg_acks', - default='None', - help='Acknowledge options'), - cfg.StrOpt('partitioner', - default=None, - help='Partitioner algorithm') -] -kafka_producer_group = cfg.OptGroup(name='kafka_producer', - title='kafka_producer') - -CONF.register_group(kafka_group) -CONF.register_opts(kafka_opts, kafka_group) - -CONF.register_group(kafka_producer_group) -CONF.register_opts(kafka_producer_opts, kafka_producer_group) - - -class KafkaPublisher(publisher.Publisher): - def __init__(self, max_retry=3, wait_time=None): - self._producer_conf = None - self._client_conf = None - - self._producer = None - self._client = None - - self._max_retry = max_retry - self._wait_time = wait_time - - LOG.info('Initializing KafkaPublisher <%s>' % self) - - def _get_client_conf(self): - if self._client_conf: - return self._client_conf - - client_conf = { - 'hosts': CONF.kafka.host, - 'client_id': CONF.kafka.client_id, - 'timeout': CONF.kafka.timeout - } - - self._client_conf = client_conf - - return client_conf - - def _get_producer_conf(self): - if self._producer_conf: - return self._producer_conf - - batch_send_every_n = CONF.kafka_producer.batch_send_every_n - batch_send_every_t = CONF.kafka_producer.batch_send_every_t - partitioner = CONF.kafka_producer.partitioner - - producer_conf = { - 'codec': protocol.CODEC_GZIP, - 'batch_send': batch_send_every_n or batch_send_every_t, - 'async': CONF.kafka_producer.async, - 'reg_acks': ACK_MAP[CONF.kafka_producer.reg_acks] - } - if batch_send_every_t: - producer_conf.update( - {'batch_send_every_t': batch_send_every_t}) - if batch_send_every_n: - producer_conf.update( - {'batch_send_every_n': batch_send_every_n}) - if partitioner: - partitioner = simport.load(partitioner) - producer_conf.update({'partitioner': partitioner}) - - self._producer_conf = producer_conf - - return producer_conf - - def _init_client(self): - if self._client: - self._client.close() - self._producer = None - self._client = None - - client_opts = self._get_client_conf() - - for i in range(self._max_retry): - kafka_host = client_opts['hosts'] - try: - self._client = client.KafkaClient( - client_opts['hosts'], - client_opts['client_id'], - client_opts['timeout'] - ) - if self._wait_time: - time.sleep(self._wait_time) - break - except common.KafkaUnavailableError as ex: - LOG.error('Server is down at ' % kafka_host) - err = ex - except common.LeaderNotAvailableError as ex: - LOG.error('No leader at .' % kafka_host) - err = ex - except Exception as ex: - LOG.error('Initialization failed at .' % kafka_host) - err = ex - - if err: - raise err - - def _init_producer(self): - try: - if not self._client: - self._init_client() - - producer_opts = self._get_producer_conf() - producer_opts.update({'client': self._client}) - - self._producer = producer.KeyedProducer(*producer_opts) - if self._wait_time: - time.sleep(self._wait_time) - except Exception as ex: - self._producer = None - LOG.exception(ex.message, exc_info=1) - raise exceptions.PublisherInitException( - message='KeyedProducer can not be created at "' - % self._client_conf['host'], - caught=ex) - - def send_message(self, topic, key, message): - if not message or not key or not topic: - return - try: - if not self._producer: - self._init_producer() - - return self._producer.send(topic, key, message) - except (common.KafkaUnavailableError, - common.LeaderNotAvailableError) as ex: - self._client = None - LOG.error(ex.message, exc_info=1) - raise exceptions.MessageQueueException( - message='Failed to post message to kafka', - caught=ex - ) - except Exception as ex: - LOG.error(ex.message, exc_info=1) - raise exceptions.MessageQueueException( - message='Unknown error while sending message to kafka', - caught=ex - ) - - # TODO(question) How to ensure that connection will be closed when program - # stops ? - def close(self): - if self._client: - self._producer = None - self._client.close() - - def __repr__(self): - return 'KafkaPublisher ' % ( - self._client_conf['hosts'] if self._client_conf else None - ) diff --git a/monasca_log_api/publisher/publisher.py b/monasca_log_api/publisher/publisher.py deleted file mode 100644 index b1be8b08..00000000 --- a/monasca_log_api/publisher/publisher.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2015 kornicameister@gmail.com -# Copyright 2015 FUJITSU LIMITED -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import six - - -@six.add_metaclass(abc.ABCMeta) -class Publisher(object): - @abc.abstractmethod - def send_message(self, *args, **kwargs): - return - - @abc.abstractmethod - def close(self): - return diff --git a/monasca_log_api/server.py b/monasca_log_api/server.py index c0cd48dd..638a6eee 100644 --- a/monasca_log_api/server.py +++ b/monasca_log_api/server.py @@ -61,7 +61,7 @@ def launch(conf, config_file='/etc/monasca/log-api-config.conf'): def load_logs_resource(app): logs = simport.load(CONF.dispatcher.logs)() - app.add_route('/v1.0/logs/single', logs) + app.add_route('/v1.0/log/single', logs) def load_versions_resource(app): diff --git a/monasca_log_api/tests/test_kafka_publisher.py b/monasca_log_api/tests/test_kafka_publisher.py deleted file mode 100644 index 1b740acb..00000000 --- a/monasca_log_api/tests/test_kafka_publisher.py +++ /dev/null @@ -1,254 +0,0 @@ -# Copyright 2015 kornicameister@gmail.com -# Copyright 2015 FUJITSU LIMITED -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import unittest - -from falcon import testing -from kafka import common -import mock - -from monasca_log_api.publisher import exceptions -from monasca_log_api.publisher import kafka_publisher as publisher -from monasca_log_api.tests import base as base_test - -TOPIC = 'test' - - -class MockKafkaClient(object): - pass - - -class TestConfiguration(testing.TestBase): - def setUp(self): - self.conf = base_test.mock_config(self) - self.instance = publisher.KafkaPublisher() - super(TestConfiguration, self).setUp() - - def test_should_not_have_client_conf_at_begin(self): - self.assertIsNone(self.instance._client_conf) - - def test_should_not_have_producer_conf_at_begin(self): - self.assertIsNone(self.instance._producer_conf) - - def test_should_init_producer_conf(self): - self.instance._get_producer_conf() - self.assertIsNotNone(self.instance._producer_conf) - - def test_should_init_client_conf(self): - self.instance._get_client_conf() - self.assertIsNotNone(self.instance._client_conf) - - -class TestInitialization(testing.TestBase): - def setUp(self): - self.conf = base_test.mock_config(self) - super(TestInitialization, self).setUp() - - def test_should_have_init_client_not_set(self): - instance = publisher.KafkaPublisher() - self.assertIsNone(instance._client) - - def test_should_have_init_producer_not_set(self): - instance = publisher.KafkaPublisher() - self.assertIsNone(instance._producer) - - @mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient', - side_effect=[common.KafkaUnavailableError]) - def test_client_should_fail_kafka_unavailable(self, kafka_client): - instance = publisher.KafkaPublisher() - self.assertRaises( - common.KafkaUnavailableError, - instance._init_client - ) - - @mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient', - side_effect=[common.LeaderNotAvailableError]) - def test_client_should_fail_leader_unavailable(self, kafka_client): - instance = publisher.KafkaPublisher() - self.assertRaises( - common.LeaderNotAvailableError, - instance._init_client - ) - - @mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient', - side_effect=[ValueError]) - def test_client_should_fail_other_error(self, kafka_client): - instance = publisher.KafkaPublisher() - self.assertRaises( - ValueError, - instance._init_client - ) - - @mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient', - autospec=True) - def test_client_should_initialize(self, kafka_client): - client_id = 'mock_client' - timeout = 3600 - hosts = 'localhost:666' - - self.conf.config( - client_id=client_id, - timeout=timeout, - host=hosts, - group='kafka' - ) - - instance = publisher.KafkaPublisher() - instance._init_client() - - self.assertIsNotNone(instance._client) - kafka_client.assert_called_with(hosts, client_id, timeout) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - side_effect=[ValueError]) - def test_producer_should_fail_any_error(self, producer): - instance = publisher.KafkaPublisher() - instance._client = MockKafkaClient() - instance._client_conf = { - 'host': 'localhost' - } - - self.assertRaises( - exceptions.PublisherInitException, - instance._init_producer - ) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_producer_should_initialize(self, producer): - instance = publisher.KafkaPublisher() - client = MockKafkaClient() - - instance._client = client - instance._get_producer_conf = mock.Mock(return_value={}) - - instance._init_producer() - - self.assertIsNotNone(instance._producer) - self.assertTrue(producer.called) - - -class TestLogic(unittest.TestCase): - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_not_call_producer_for_empty_key(self, producer): - producer.send.return_value = None - - instance = publisher.KafkaPublisher() - instance._producer = producer - - instance.send_message(TOPIC, None, 'msg') - - self.assertFalse(producer.send.called) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_not_call_producer_for_empty_message(self, producer): - producer.send.return_value = None - - instance = publisher.KafkaPublisher() - instance._producer = producer - - instance.send_message(TOPIC, 'key', None) - - self.assertFalse(producer.send.called) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_not_call_producer_for_empty_topic(self, producer): - producer.send.return_value = None - - instance = publisher.KafkaPublisher() - instance._producer = producer - - instance.send_message(None, 'key', 'msg') - - self.assertFalse(producer.send.called) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_fail_kafka_not_available(self, producer): - producer.send.side_effect = [common.KafkaUnavailableError] - - instance = publisher.KafkaPublisher() - instance._producer = producer - instance._client = mock.Mock('client') - - with self.assertRaises(exceptions.MessageQueueException) as context: - instance.send_message('a', 'b', 'c') - - self.assertEqual('Failed to post message to kafka', - context.exception.message) - self.assertIsInstance(context.exception.caught, - common.KafkaUnavailableError) - self.assertIsNone(instance._client) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_fail_leader_not_available(self, producer): - producer.send.side_effect = [common.LeaderNotAvailableError] - - instance = publisher.KafkaPublisher() - instance._producer = producer - instance._client = mock.Mock('client') - - with self.assertRaises(exceptions.MessageQueueException) as context: - instance.send_message('a', 'b', 'c') - - self.assertEqual('Failed to post message to kafka', - context.exception.message) - self.assertIsInstance(context.exception.caught, - common.LeaderNotAvailableError) - self.assertIsNone(instance._client) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_fail_any_error(self, producer): - producer.send.side_effect = [Exception] - - instance = publisher.KafkaPublisher() - instance._producer = producer - - with self.assertRaises(exceptions.MessageQueueException) as context: - instance.send_message('a', 'b', 'c') - - self.assertEqual('Unknown error while sending message to kafka', - context.exception.message) - self.assertIsInstance(context.exception.caught, - Exception) - - @mock.patch('monasca_log_api.publisher.kafka_publisher' - '.producer.KeyedProducer', - autospec=True) - def test_should_send_message(self, producer): - producer.send.return_value = None - - instance = publisher.KafkaPublisher() - instance._producer = producer - - msg = 'msg' - key = 'key' - - instance.send_message(TOPIC, key, msg) - - producer.send.assert_called_once_with(TOPIC, key, msg) diff --git a/monasca_log_api/tests/test_log_publisher.py b/monasca_log_api/tests/test_log_publisher.py index 855be770..43545c80 100644 --- a/monasca_log_api/tests/test_log_publisher.py +++ b/monasca_log_api/tests/test_log_publisher.py @@ -13,16 +13,19 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime +import random import unittest from falcon import testing import mock import simplejson -from monasca_log_api.publisher import exceptions from monasca_log_api.tests import base from monasca_log_api.v1.common import log_publisher -from monasca_log_api.v1.common import service + + +EPOCH_START = datetime.datetime(1970, 1, 1) class TestBuildKey(unittest.TestCase): @@ -60,13 +63,16 @@ class TestBuildKey(unittest.TestCase): # application_type and single dimension tenant_id = 'monasca' application_type = 'monasca-log-api' - dimension = service.Dimension('cpu_time', 50) + dimension_name = 'cpu_time' + dimension_value = '50' log_object = { 'application_type': application_type, - 'dimensions': [dimension] + 'dimensions': { + dimension_name: dimension_value + } } - expected_key = tenant_id + application_type + dimension.name + str( - dimension.value) + expected_key = tenant_id + application_type + dimension_name + str( + dimension_value) self.assertEqual(expected_key, log_publisher.LogPublisher._build_key(tenant_id, @@ -77,15 +83,20 @@ class TestBuildKey(unittest.TestCase): # application_type and two dimensions dimensions given unsorted tenant_id = 'monasca' application_type = 'monasca-log-api' - dimension_1 = service.Dimension('disk_usage', 50) - dimension_2 = service.Dimension('cpu_time', 50) + dimension_1_name = 'disk_usage' + dimension_1_value = '50' + dimension_2_name = 'cpu_time' + dimension_2_value = '60' log_object = { 'application_type': application_type, - 'dimensions': [dimension_1, dimension_2] + 'dimensions': { + dimension_1_name: dimension_1_value, + dimension_2_name: dimension_2_value + } } - expected_key = ''.join([tenant_id, application_type, dimension_2.name, - str(dimension_2.value), dimension_1.name, - str(dimension_1.value)]) + expected_key = ''.join([tenant_id, application_type, + dimension_2_name, dimension_2_value, + dimension_1_name, dimension_1_value]) self.assertEqual(expected_key, log_publisher.LogPublisher._build_key(tenant_id, @@ -97,44 +108,92 @@ class TestSendMessage(testing.TestBase): self.conf = base.mock_config(self) return super(TestSendMessage, self).setUp() - def test_should_not_send_empty_message(self): + @mock.patch('monasca_log_api.v1.common.log_publisher.producer' + '.KafkaProducer') + def test_should_not_send_empty_message(self, _): instance = log_publisher.LogPublisher() - instance._kafka_publisher.send_message = mock.Mock() + instance._kafka_publisher = mock.Mock() instance.send_message({}) - self.assertFalse(instance._kafka_publisher.send_message.called) + self.assertFalse(instance._kafka_publisher.publish.called) - def test_should_raise_exception(self): + @unittest.expectedFailure + def test_should_not_send_message_not_dict(self): instance = log_publisher.LogPublisher() - instance._kafka_publisher.send_message = mock.Mock( - side_effect=[exceptions.MessageQueueException(1, 1)] - ) + not_dict_value = 123 + instance.send_message(not_dict_value) - msg = { + def test_should_not_send_message_missing_keys(self): + # checks every combination of missing keys + # test does not rely on those keys having a value or not, + # it simply assumes that values are set but important + # message (i.e. envelope) properties are missing entirely + # that's why there are two loops instead of three + + instance = log_publisher.LogPublisher() + keys = ['log', 'creation_time', 'meta'] + + for key_1 in keys: + diff = keys[:] + diff.remove(key_1) + for key_2 in diff: + message = { + key_1: random.randint(10, 20), + key_2: random.randint(30, 50) + } + self.assertRaises(log_publisher.InvalidMessageException, + instance.send_message, + message) + + def test_should_not_send_message_missing_values(self): + # original message assumes that every property has value + # test modify each property one by one by removing that value + # (i.e. creating false-like value) + instance = log_publisher.LogPublisher() + message = { 'log': { - 'message': 1 + 'message': '11' }, + 'creation_time': 123456, 'meta': { - 'tenantId': 1 + 'region': 'pl' } } - self.assertRaises(exceptions.MessageQueueException, - instance.send_message, msg) - def test_should_send_message(self): + for key in message: + tmp_message = message + tmp_message[key] = None + self.assertRaises(log_publisher.InvalidMessageException, + instance.send_message, + tmp_message) + + @mock.patch('monasca_log_api.v1.common.log_publisher.producer' + '.KafkaProducer') + def test_should_send_message(self, _): instance = log_publisher.LogPublisher() - instance._kafka_publisher.send_message = mock.Mock(name='send_message', - return_value={}) + instance._kafka_publisher = mock.Mock() + instance.send_message({}) instance._build_key = mock.Mock(name='_build_key', return_value='some_key') + + creation_time = ((datetime.datetime.utcnow() - EPOCH_START) + .total_seconds()) + application_type = 'monasca-log-api' + dimension_1_name = 'disk_usage' + dimension_1_value = '50' + dimension_2_name = 'cpu_time' + dimension_2_value = '60' msg = { 'log': { 'message': 1, - 'application_type': 'monasca_log_api', - 'dimensions': [service.Dimension('disk_usage', 50), - service.Dimension('cpu_time', 50)] + 'application_type': application_type, + 'dimensions': { + dimension_1_name: dimension_1_value, + dimension_2_name: dimension_2_value + } }, + 'creation_time': creation_time, 'meta': { 'tenantId': 1 } @@ -142,27 +201,40 @@ class TestSendMessage(testing.TestBase): instance.send_message(msg) - instance._kafka_publisher.send_message.assert_called_once_with( + instance._kafka_publisher.publish.assert_called_once_with( self.conf.conf.log_publisher.topics[0], - 'some_key', + # 'some_key', # TODO(feature) next version of monasca-common simplejson.dumps(msg)) - def test_should_send_message_multiple_topics(self): + @mock.patch('monasca_log_api.v1.common.log_publisher.producer' + '.KafkaProducer') + def test_should_send_message_multiple_topics(self, _): topics = ['logs', 'analyzer', 'tester'] self.conf.config(topics=topics, group='log_publisher') instance = log_publisher.LogPublisher() - instance._kafka_publisher.send_message = mock.Mock(name='send_message', - return_value={}) + instance._kafka_publisher = mock.Mock() + instance.send_message({}) instance._build_key = mock.Mock(name='_build_key', return_value='some_key') + + creation_time = ((datetime.datetime.utcnow() - EPOCH_START) + .total_seconds()) + dimension_1_name = 'disk_usage' + dimension_1_value = '50' + dimension_2_name = 'cpu_time' + dimension_2_value = '60' + application_type = 'monasca-log-api' msg = { 'log': { 'message': 1, - 'application_type': 'monasca_log_api', - 'dimensions': [service.Dimension('disk_usage', 50), - service.Dimension('cpu_time', 50)] + 'application_type': application_type, + 'dimensions': { + dimension_1_name: dimension_1_value, + dimension_2_name: dimension_2_value + } }, + 'creation_time': creation_time, 'meta': { 'tenantId': 1 } @@ -172,9 +244,9 @@ class TestSendMessage(testing.TestBase): instance.send_message(msg) self.assertEqual(len(topics), - instance._kafka_publisher.send_message.call_count) + instance._kafka_publisher.publish.call_count) for topic in topics: - instance._kafka_publisher.send_message.assert_any_call( + instance._kafka_publisher.publish.assert_any_call( topic, - 'some_key', + # 'some_key', # TODO(feature) next version of monasca-common json_msg) diff --git a/monasca_log_api/tests/test_logs.py b/monasca_log_api/tests/test_logs.py index 035a75a1..c78759ae 100644 --- a/monasca_log_api/tests/test_logs.py +++ b/monasca_log_api/tests/test_logs.py @@ -29,13 +29,13 @@ class TestLogs(testing.TestBase): self.conf = base.mock_config(self) self.logs_resource = logs.Logs() self.api.add_route( - '/logs/single', + '/log/single', self.logs_resource ) def test_should_fail_not_delegate_ok_cross_tenant_id(self): self.simulate_request( - '/logs/single', + '/log/single', method='POST', query_string='tenant_id=1', headers={ @@ -45,19 +45,15 @@ class TestLogs(testing.TestBase): self.assertEqual(falcon.HTTP_403, self.srmock.status) @mock.patch('monasca_log_api.v1.common.service.LogCreator') - @mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher') + @mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher') def test_should_pass_empty_cross_tenant_id_wrong_role(self, log_creator, kafka_publisher): - log_creator.configure_mock(**{'new_log.return_value': None, - 'new_log_envelope.return_value': None}) - kafka_publisher.configure_mock(**{'send_message.return_value': None}) - self.logs_resource._log_creator = log_creator self.logs_resource._kafka_publisher = kafka_publisher self.simulate_request( - '/logs/single', + '/log/single', method='POST', headers={ headers.X_ROLES.name: 'some_role', @@ -72,19 +68,15 @@ class TestLogs(testing.TestBase): self.assertEqual(1, log_creator.new_log_envelope.call_count) @mock.patch('monasca_log_api.v1.common.service.LogCreator') - @mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher') + @mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher') def test_should_pass_empty_cross_tenant_id_ok_role(self, log_creator, kafka_publisher): - log_creator.configure_mock(**{'new_log.return_value': None, - 'new_log_envelope.return_value': None}) - kafka_publisher.configure_mock(**{'send_message.return_value': None}) - self.logs_resource._log_creator = log_creator self.logs_resource._kafka_publisher = kafka_publisher self.simulate_request( - '/logs/single', + '/log/single', method='POST', headers={ headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE, @@ -99,19 +91,15 @@ class TestLogs(testing.TestBase): self.assertEqual(1, log_creator.new_log_envelope.call_count) @mock.patch('monasca_log_api.v1.common.service.LogCreator') - @mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher') + @mock.patch('monasca_log_api.v1.common.log_publisher.LogPublisher') def test_should_pass_delegate_cross_tenant_id_ok_role(self, log_creator, - kafka_publisher): - log_creator.configure_mock(**{'new_log.return_value': None, - 'new_log_envelope.return_value': None}) - kafka_publisher.configure_mock(**{'send_message.return_value': None}) - + log_publisher): self.logs_resource._log_creator = log_creator - self.logs_resource._kafka_publisher = kafka_publisher + self.logs_resource._kafka_publisher = log_publisher self.simulate_request( - '/logs/single', + '/log/single', method='POST', query_string='tenant_id=1', headers={ @@ -122,28 +110,28 @@ class TestLogs(testing.TestBase): ) self.assertEqual(falcon.HTTP_204, self.srmock.status) - self.assertEqual(1, kafka_publisher.send_message.call_count) + self.assertEqual(1, log_publisher.send_message.call_count) self.assertEqual(1, log_creator.new_log.call_count) self.assertEqual(1, log_creator.new_log_envelope.call_count) - def test_should_fail_empty_dimensions_delegate(self): - with mock.patch.object(self.logs_resource._log_creator, - '_read_payload', - return_value=True): - self.simulate_request( - '/logs/single', - method='POST', - headers={ - headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE, - headers.X_DIMENSIONS.name: '', - 'Content-Type': 'application/json' - } - ) + @mock.patch('monasca_log_api.v1.common.service.rest_utils') + def test_should_fail_empty_dimensions_delegate(self, rest_utils): + rest_utils.read_body.return_value = True + + self.simulate_request( + '/log/single', + method='POST', + headers={ + headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE, + headers.X_DIMENSIONS.name: '', + 'Content-Type': 'application/json' + } + ) self.assertEqual(log_api_exceptions.HTTP_422, self.srmock.status) def test_should_fail_for_invalid_content_type(self): self.simulate_request( - '/logs/single', + '/log/single', method='POST', headers={ headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE, diff --git a/monasca_log_api/tests/test_rest_utils.py b/monasca_log_api/tests/test_rest_utils.py new file mode 100644 index 00000000..bc69d4f6 --- /dev/null +++ b/monasca_log_api/tests/test_rest_utils.py @@ -0,0 +1,53 @@ +# Copyright 2015 kornicameister@gmail.com +# Copyright 2015 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +import falcon +import mock +import simplejson + +from monasca_log_api.api import rest_utils as ru + + +class RestUtilsTest(unittest.TestCase): + + @mock.patch('io.IOBase') + def test_should_read_text_for_plain_text(self, payload): + raw_msg = 'Hello World' + msg = u''.join(raw_msg) + + payload.read.return_value = raw_msg + + self.assertEqual(msg, ru.read_body(payload, 'text/plain')) + + @mock.patch('io.IOBase') + def test_should_read_json_for_application_json(self, payload): + raw_msg = u'{"path":"/var/log/messages","message":"This is message"}' + json_msg = simplejson.loads(raw_msg, encoding='utf-8') + + payload.read.return_value = raw_msg + + self.assertEqual(json_msg, ru.read_body(payload, 'application/json')) + + @mock.patch('io.IOBase') + def test_should_fail_read_text_for_application_json(self, payload): + with self.assertRaises(falcon.HTTPBadRequest) as context: + raw_msg = 'Hello World' + payload.read.return_value = raw_msg + ru.read_body(payload, 'application/json') + + self.assertEqual(context.exception.title, + 'Failed to read body as json') diff --git a/monasca_log_api/tests/test_service.py b/monasca_log_api/tests/test_service.py index 4318d47e..29cfe66a 100644 --- a/monasca_log_api/tests/test_service.py +++ b/monasca_log_api/tests/test_service.py @@ -16,10 +16,8 @@ import datetime import unittest -import falcon from falcon import testing import mock -import simplejson from monasca_log_api.api import exceptions from monasca_log_api.api import logs_api @@ -64,9 +62,12 @@ class ParseDimensions(unittest.TestCase): def test_should_pass_for_valid_dimensions(self): dimensions = 'a:1,b:2' - expected = [('a', '1'), ('b', '2')] + expected = { + 'a': '1', + 'b': '2' + } - self.assertListEqual(expected, + self.assertDictEqual(expected, common_service.parse_dimensions(dimensions)) @@ -138,10 +139,10 @@ class DimensionsValidations(unittest.TestCase): common_service.Validations.validate_dimensions(1) def test_should_pass_for_empty_dimensions_array(self): - common_service.Validations.validate_dimensions([]) + common_service.Validations.validate_dimensions({}) def test_should_fail_too_empty_name(self): - dimensions = [('', 1)] + dimensions = {'': 1} with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context: common_service.Validations.validate_dimensions(dimensions) @@ -150,7 +151,7 @@ class DimensionsValidations(unittest.TestCase): def test_should_fail_too_long_name(self): name = testing.rand_string(256, 260) - dimensions = [(name, 1)] + dimensions = {name: 1} with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context: common_service.Validations.validate_dimensions(dimensions) @@ -159,7 +160,7 @@ class DimensionsValidations(unittest.TestCase): def test_should_fail_underscore_at_begin(self): name = '_aDim' - dimensions = [(name, 1)] + dimensions = {name: 1} with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context: common_service.Validations.validate_dimensions(dimensions) @@ -168,7 +169,7 @@ class DimensionsValidations(unittest.TestCase): def test_should_fail_invalid_chars(self): name = '<>' - dimensions = [(name, 1)] + dimensions = {name: 1} with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context: common_service.Validations.validate_dimensions(dimensions) @@ -178,7 +179,7 @@ class DimensionsValidations(unittest.TestCase): def test_should_fail_ok_name_empty_value(self): name = 'monasca' - dimensions = [(name, '')] + dimensions = {name: ''} with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context: common_service.Validations.validate_dimensions(dimensions) @@ -188,7 +189,7 @@ class DimensionsValidations(unittest.TestCase): def test_should_fail_ok_name_too_long_value(self): name = 'monasca' value = testing.rand_string(256, 300) - dimensions = [(name, value)] + dimensions = {name: value} with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context: common_service.Validations.validate_dimensions(dimensions) @@ -198,55 +199,17 @@ class DimensionsValidations(unittest.TestCase): def test_should_pass_ok_name_ok_value_empty_service(self): name = 'monasca' value = '1' - dimensions = [(name, value)] + dimensions = {name: value} common_service.Validations.validate_dimensions(dimensions) def test_should_pass_ok_name_ok_value_service_SERVICE_DIMENSIONS_as_name( self): name = 'some_name' value = '1' - dimensions = [(name, value)] + dimensions = {name: value} common_service.Validations.validate_dimensions(dimensions) -class LogsCreatorPayload(unittest.TestCase): - def setUp(self): - self.instance = common_service.LogCreator() - - @mock.patch('io.IOBase') - def test_should_read_text_for_plain_text(self, payload): - msg = u'Hello World' - payload.configure_mock( - **{'readable.return_value': True, 'read.return_value': msg}) - - self.assertEqual(msg, - self.instance._read_payload(payload, 'text/plain')) - - @mock.patch('io.IOBase') - def test_should_read_json_for_application_json(self, payload): - msg = u'{"path":"/var/log/messages","message":"This is message"}' - payload.configure_mock( - **{'readable.return_value': True, 'read.return_value': msg}) - - json_msg = simplejson.loads(msg, encoding='utf-8') - - self.assertEqual(json_msg, - self.instance._read_payload(payload, - 'application/json')) - - @mock.patch('io.IOBase') - def test_should_fail_read_text_for_application_json(self, payload): - with self.assertRaises(falcon.HTTPBadRequest) as context: - msg = u'Hello World' - payload.configure_mock( - **{'readable.return_value': True, 'read.return_value': msg}) - self.instance._read_payload(payload, - 'application/json') - - self.assertEqual(context.exception.title, - 'Failed to read body as json') - - class LogsCreatorNewLog(unittest.TestCase): def setUp(self): self.instance = common_service.LogCreator() @@ -258,13 +221,14 @@ class LogsCreatorNewLog(unittest.TestCase): json_msg = u'{"path":"%s","message":"%s"}' % (path, msg) app_type = 'monasca' dimensions = 'cpu_time:30' - payload.configure_mock( - **{'readable.return_value': True, 'read.return_value': json_msg}) + payload.read.return_value = json_msg expected_log = { 'message': msg, 'application_type': app_type, - 'dimensions': [('cpu_time', '30')], + 'dimensions': { + 'cpu_time': '30' + }, 'path': path } @@ -278,14 +242,17 @@ class LogsCreatorNewLog(unittest.TestCase): def test_should_create_log_from_text(self, payload): msg = u'Hello World' app_type = 'monasca' - dimensions = 'cpu_time:30' - payload.configure_mock( - **{'readable.return_value': True, 'read.return_value': msg}) + dimension_name = 'cpu_time' + dimension_value = 30 + dimensions = '%s:%s' % (dimension_name, str(dimension_value)) + payload.read.return_value = msg expected_log = { 'message': msg, 'application_type': app_type, - 'dimensions': [('cpu_time', '30')] + 'dimensions': { + dimension_name: str(dimension_value) + } } self.assertEqual(expected_log, self.instance.new_log( @@ -304,10 +271,14 @@ class LogCreatorNewEnvelope(unittest.TestCase): msg = u'Hello World' path = u'/var/log/messages' app_type = 'monasca' + dimension_name = 'cpu_time' + dimension_value = 30 expected_log = { 'message': msg, 'application_type': app_type, - 'dimensions': [('cpu_time', '30')], + 'dimensions': { + dimension_name: str(dimension_value) + }, 'path': path } tenant_id = 'a_tenant' @@ -330,3 +301,38 @@ class LogCreatorNewEnvelope(unittest.TestCase): actual_envelope.get('log')) self.assertEqual(expected_envelope.get('meta'), actual_envelope.get('meta')) + self.assertDictEqual( + expected_envelope.get('log').get('dimensions'), + actual_envelope.get('log').get('dimensions')) + + @unittest.expectedFailure + def test_should_not_create_log_none(self): + log_object = None + tenant_id = 'a_tenant' + + self.instance.new_log_envelope(log_object, tenant_id) + + @unittest.expectedFailure + def test_should_not_create_log_empty(self): + log_object = {} + tenant_id = 'a_tenant' + + self.instance.new_log_envelope(log_object, tenant_id) + + @unittest.expectedFailure + def test_should_not_create_tenant_none(self): + log_object = { + 'message': '' + } + tenant_id = None + + self.instance.new_log_envelope(log_object, tenant_id) + + @unittest.expectedFailure + def test_should_not_create_tenant_empty(self): + log_object = { + 'message': '' + } + tenant_id = '' + + self.instance.new_log_envelope(log_object, tenant_id) diff --git a/monasca_log_api/v1/common/log_publisher.py b/monasca_log_api/v1/common/log_publisher.py index ac585236..33182dc3 100644 --- a/monasca_log_api/v1/common/log_publisher.py +++ b/monasca_log_api/v1/common/log_publisher.py @@ -13,30 +13,39 @@ # License for the specific language governing permissions and limitations # under the License. +from monasca_common.kafka import producer from oslo_config import cfg from oslo_log import log -import simplejson - -from monasca_log_api.publisher import kafka_publisher +import simplejson as json LOG = log.getLogger(__name__) CONF = cfg.CONF log_publisher_opts = [ + cfg.StrOpt('kafka_url', + required=True, + help='Url to kafka server'), cfg.MultiStrOpt('topics', default=['logs'], - help='Target topic in kafka') + help='Consumer topics') ] log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher') cfg.CONF.register_group(log_publisher_group) cfg.CONF.register_opts(log_publisher_opts, log_publisher_group) +ENVELOPE_SCHEMA = ['log', 'meta', 'creation_time'] +"""Log envelope (i.e.) message keys""" + + +class InvalidMessageException(Exception): + pass + class LogPublisher(object): def __init__(self): self._topics = CONF.log_publisher.topics - self._kafka_publisher = kafka_publisher.KafkaPublisher() + self._kafka_publisher = None LOG.info('Initializing LogPublisher <%s>', self) @staticmethod @@ -51,16 +60,6 @@ class LogPublisher(object): :param obj: log instance :return: key """ - def comparator(a, b): - """Comparator for dimensions - - :param a: dimension_a, tuple with properties name,value - :param b: dimension_b, tuple with properties name,value - :return: sorting result - """ - if a.name == b.name: - return (a.value > b.value) - (a.value < b.value) - return (a.name > b.name) - (a.name < b.name) str_list = [] @@ -72,29 +71,68 @@ class LogPublisher(object): str_list += obj['application_type'] if 'dimensions' in obj and obj['dimensions']: - dimensions = sorted(obj['dimensions'], cmp=comparator) - for name, value in dimensions: + dims = obj['dimensions'] + sorted_dims = sorted(dims) + for name in sorted_dims: str_list += name - str_list += str(value) + str_list += str(dims[name]) return ''.join(str_list) + @staticmethod + def _is_message_valid(message): + """Validates message before sending. + + Methods checks if message is :py:class:`dict`. + If so dictionary is verified against having following keys: + + * meta + * log + * creation_time + + If keys are found, each key must have a value. + + If at least none of the conditions is met + :py:class:`.InvalidMessageException` is raised + + :raises InvalidMessageException: if message does not comply to schema + + """ + if not isinstance(message, dict): + return False + + for key in ENVELOPE_SCHEMA: + if not (key in message and message.get(key)): + return False + + return True + + def _publisher(self): + if not self._kafka_publisher: + self._kafka_publisher = producer.KafkaProducer( + url=CONF.log_publisher.kafka_url + ) + return self._kafka_publisher + def send_message(self, message): if not message: return + if not self._is_message_valid(message): + raise InvalidMessageException() key = self._build_key(message['meta']['tenantId'], message['log']) - msg = simplejson.dumps(message, - sort_keys=False, - ensure_ascii=False).encode('utf8') + msg = json.dumps(message, + sort_keys=False, + ensure_ascii=False).encode('utf8') + # TODO(feature) next version of monasca-common LOG.debug('Build key [%s] for message' % key) LOG.debug('Sending message {topics=%s,key=%s,message=%s}' % (self._topics, key, msg)) try: for topic in self._topics: - self._kafka_publisher.send_message(topic, key, msg) + self._publisher().publish(topic, msg) except Exception as ex: LOG.error(ex.message) raise ex diff --git a/monasca_log_api/v1/common/service.py b/monasca_log_api/v1/common/service.py index 00fef245..58d043d9 100644 --- a/monasca_log_api/v1/common/service.py +++ b/monasca_log_api/v1/common/service.py @@ -13,17 +13,15 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import datetime import re -import falcon from oslo_config import cfg from oslo_log import log -import simplejson from monasca_log_api.api import exceptions from monasca_log_api.api import logs_api +from monasca_log_api.api import rest_utils LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -49,8 +47,11 @@ DIMENSION_NAME_CONSTRAINTS = { DIMENSION_VALUE_CONSTRAINTS = { 'MAX_LENGTH': 255 } +EPOCH_START = datetime.datetime(1970, 1, 1) -Dimension = collections.namedtuple('Dimensions', ['name', 'value']) + +class LogEnvelopeException(Exception): + pass class Validations(object): @@ -118,16 +119,16 @@ class Validations(object): value ) - if (isinstance(dimensions, (list, tuple)) and not + if (isinstance(dimensions, dict) and not isinstance(dimensions, basestring)): - for dim_name, dim_value in dimensions: + for dim_name in dimensions: validate_name(dim_name) - validate_value(dim_value) + validate_value(dimensions[dim_name]) else: raise exceptions.HTTPUnprocessableEntity( - 'Dimensions %s must be a collections' % dimensions) + 'Dimensions %s must be a dictionary (map)' % dimensions) class LogCreator(object): @@ -135,31 +136,13 @@ class LogCreator(object): self._log = log.getLogger('service.LogCreator') self._log.info('Initializing LogCreator') - # noinspection PyMethodMayBeStatic - def _create_meta_info(self, tenant_id): + @staticmethod + def _create_meta_info(tenant_id): return { 'tenantId': tenant_id, 'region': cfg.CONF.service.region } - def _read_payload(self, payload, content_type): - - try: - content = payload.read() - except Exception as ex: - raise falcon.HTTPBadRequest(title='Failed to read body', - description=ex.message) - - if content and content_type == 'application/json': - try: - content = simplejson.loads(content, encoding='utf-8') - except Exception as ex: - raise falcon.HTTPBadRequest(title='Failed to read body as ' - 'json', - description=ex.message) - - return content - def new_log(self, application_type, dimensions, @@ -167,7 +150,7 @@ class LogCreator(object): content_type='application/json', validate=True): - payload = self._read_payload(payload, content_type) + payload = rest_utils.read_body(payload, content_type) if not payload: return None @@ -199,8 +182,15 @@ class LogCreator(object): return log_object def new_log_envelope(self, log_object, tenant_id): - timestamp = (datetime.datetime.utcnow() - - datetime.datetime(1970, 1, 1)).total_seconds() + if not log_object: + raise LogEnvelopeException('Envelope cannot be ' + 'created without log') + if not tenant_id: + raise LogEnvelopeException('Envelope cannot be ' + 'created without tenant') + + timestamp = (datetime.datetime.utcnow() - EPOCH_START).total_seconds() + return { 'log': log_object, 'creation_time': timestamp, @@ -212,7 +202,6 @@ def is_delegate(roles): if roles: roles = roles.split(',') return logs_api.MONITORING_DELEGATE_ROLE in roles - pass return False @@ -226,7 +215,7 @@ def parse_dimensions(dimensions): if not dimensions: raise exceptions.HTTPUnprocessableEntity('Dimension are required') - new_dimensions = [] + new_dimensions = {} dimensions = map(str.strip, dimensions.split(',')) for dim in dimensions: @@ -241,6 +230,6 @@ def parse_dimensions(dimensions): name = str(dim[0].strip()) if dim[0] else None value = str(dim[1].strip()) if dim[1] else None if name and value: - new_dimensions.append(Dimension(name, value)) + new_dimensions.update({name: value}) return new_dimensions diff --git a/monasca_log_api/v1/reference/versions.py b/monasca_log_api/v1/reference/versions.py index f81072a7..80a59601 100644 --- a/monasca_log_api/v1/reference/versions.py +++ b/monasca_log_api/v1/reference/versions.py @@ -15,10 +15,9 @@ import falcon from oslo_log import log -import simplejson +from monasca_log_api.api import rest_utils from monasca_log_api.api import versions_api -from monasca_log_api import constants LOG = log.getLogger(__name__) VERSIONS = { @@ -50,14 +49,14 @@ class Versions(versions_api.VersionsAPI): VERSIONS[version]['links'][0]['href'] = ( req.uri.decode('utf8') + version) result['elements'].append(VERSIONS[version]) - res.body = simplejson.dumps(result) + res.body = rest_utils.as_json(result) res.status = falcon.HTTP_200 @staticmethod def handle_version_id(req, res, version_id): if version_id in VERSIONS: VERSIONS[version_id]['links'][0]['href'] = ( - req.uri.decode(constants.ENCODING) + req.uri.decode(rest_utils.ENCODING) ) for version in VERSIONS: VERSIONS[version]['links'][0]['href'] = ( @@ -67,7 +66,7 @@ class Versions(versions_api.VersionsAPI): req.uri.decode('utf8') + VERSIONS[version_id]['links'][1]['href'] ) - res.body = simplejson.dumps(VERSIONS[version_id]) + res.body = rest_utils.as_json(VERSIONS[version_id]) res.status = falcon.HTTP_200 else: res.body = 'Invalid Version ID' @@ -77,7 +76,7 @@ class Versions(versions_api.VersionsAPI): result = { 'links': [{ 'rel': 'self', - 'href': req.uri.decode(constants.ENCODING) + 'href': req.uri.decode(rest_utils.ENCODING) }], 'elements': [] } diff --git a/requirements.txt b/requirements.txt index 7bcb37f8..970728af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ oslo.utils pastedeploy>=1.3.3 pbr>=1.6.0,<2.0 six>=1.9.0 -kafka-python>=0.9.3,<0.9.4 simplejson>=3.8.0 simport +monasca-common>=0.0.2 eventlet>=0.9.7