diff --git a/monasca_events_api/app/common/events_publisher.py b/monasca_events_api/app/common/events_publisher.py index 7e49df8..7b90bc9 100644 --- a/monasca_events_api/app/common/events_publisher.py +++ b/monasca_events_api/app/common/events_publisher.py @@ -1,5 +1,4 @@ -# Copyright 2015 kornicameister@gmail.com -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,10 +15,10 @@ import falcon from monasca_common.kafka import producer from monasca_common.kafka_lib.common import FailedPayloadsError -from monasca_common.rest import utils as rest_utils -from oslo_log import log - +from monasca_events_api.app.model import envelope as ev_envelope from monasca_events_api import conf +from oslo_log import log +from oslo_utils import encodeutils LOG = log.getLogger(__name__) @@ -30,10 +29,6 @@ _KAFKA_META_DATA_SIZE = 32 _TRUNCATION_SAFE_OFFSET = 1 -class InvalidMessageException(Exception): - pass - - class EventPublisher(object): """Publishes events data to Kafka @@ -63,88 +58,20 @@ class EventPublisher(object): LOG.info('Initializing EventPublisher <%s>', self) - def send_message(self, messages): - """Sends message to each configured topic. + def _transform_message(self, message): + """Serialize and ensure that message has proper type - Note: - Empty content is not shipped to kafka - - :param dict| list messages: - """ - if not messages: - return - if not isinstance(messages, list): - messages = [messages] - - sent_counter = 0 - num_of_msgs = len(messages) - - LOG.debug('About to publish %d messages to %s topics', - num_of_msgs, self._topics) - - send_messages = [] - - for message in messages: - try: - msg = self._transform_message_to_json(message) - send_messages.append(msg) - except Exception as ex: - LOG.exception( - 'Failed to transform message, ' - 'this massage is dropped {} ' - 'Exception: {}'.format(message, str(ex))) - try: - self._publish(send_messages) - sent_counter = len(send_messages) - except Exception as ex: - LOG.exception('Failure in publishing messages to kafka') - raise ex - finally: - self._check_if_all_messages_was_publish(sent_counter, num_of_msgs) - - def _transform_message_to_json(self, message): - """Transforms message into JSON. - - Method transforms message to JSON and - encode to utf8 :param str message: instance of message :return: serialized message :rtype: str """ - msg_json = rest_utils.as_json(message) - return msg_json.encode('utf-8') - - def _create_message_for_persister_from_request_body(self, body): - """Create message for persister from request body - - Method take original request body and them - transform the request to proper message format - acceptable by event-prsister - :param body: original request body - :return: transformed message - """ - timestamp = body['timestamp'] - final_body = [] - for events in body['events']: - ev = events['event'].copy() - ev.update({'timestamp': timestamp}) - final_body.append(ev) - return final_body - - def _ensure_type_bytes(self, message): - """Ensures that message will have proper type. - - :param str message: instance of message - - """ - - return message.encode('utf-8') + serialized = ev_envelope.serialize_envelope(message) + return encodeutils.safe_encode(serialized, 'utf-8') def _publish(self, messages): """Publishes messages to kafka. :param list messages: list of messages - """ num_of_msg = len(messages) @@ -178,6 +105,17 @@ class EventPublisher(object): raise falcon.HTTPServiceUnavailable('Service unavailable', str(ex), 60) + @staticmethod + def _is_message_valid(message): + """Validates message before sending. + + Methods checks if message is :py:class:`model.envelope.Envelope`. + By being instance of this class it is ensured that all required + keys are found and they will have their values. + + """ + return isinstance(message, ev_envelope.Envelope) + def _check_if_all_messages_was_publish(self, send_count, to_send_count): """Executed after publishing to sent metrics. @@ -185,7 +123,6 @@ class EventPublisher(object): :param int to_send_count: how many messages should be sent """ - failed_to_send = to_send_count - send_count if failed_to_send == 0: diff --git a/monasca_events_api/app/controller/v1/bulk_processor.py b/monasca_events_api/app/controller/v1/bulk_processor.py index 7bf13cf..da74da5 100644 --- a/monasca_events_api/app/controller/v1/bulk_processor.py +++ b/monasca_events_api/app/controller/v1/bulk_processor.py @@ -1,4 +1,4 @@ -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,10 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_log import log - +from monasca_common.rest import utils as rest_utils from monasca_events_api.app.common import events_publisher +from monasca_events_api.app.model import envelope from monasca_events_api import conf +from oslo_log import log +from oslo_utils import encodeutils LOG = log.getLogger(__name__) CONF = conf.CONF @@ -30,11 +32,11 @@ class EventsBulkProcessor(events_publisher.EventPublisher): """ - def send_message(self, events): + def send_message(self, events, event_project_id=None): """Sends bulk package to kafka :param list events: received events - + :param str event_project_id: project id """ num_of_msgs = len(events) if events else 0 @@ -45,7 +47,7 @@ class EventsBulkProcessor(events_publisher.EventPublisher): for ev_el in events: try: - t_el = self._transform_message_to_json(ev_el) + t_el = self._transform_message(ev_el, event_project_id) if t_el: to_send_msgs.append(t_el) except Exception as ex: @@ -62,3 +64,28 @@ class EventsBulkProcessor(events_publisher.EventPublisher): raise ex finally: self._check_if_all_messages_was_publish(num_of_msgs, sent_count) + + def _transform_message(self, event_element, event_project_id): + """Transform the message + + :param dict event_element: original event element + :param str event_project_id: project id + :return: message payload + """ + try: + msg_json = rest_utils.as_json(event_element) + msg_json = encodeutils.safe_encode(msg_json, 'utf-8') + + event_envelope = envelope.Envelope.new_envelope( + event=msg_json, + project_id=event_project_id, + ) + + msg_payload = (super(EventsBulkProcessor, self) + ._transform_message(event_envelope)) + return msg_payload + + except Exception as ex: + LOG.error("Event transformation failed, rejecting event") + LOG.exception(ex) + return None diff --git a/monasca_events_api/app/controller/v1/events.py b/monasca_events_api/app/controller/v1/events.py index a938f51..5cf22c0 100644 --- a/monasca_events_api/app/controller/v1/events.py +++ b/monasca_events_api/app/controller/v1/events.py @@ -1,4 +1,4 @@ -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -53,9 +53,10 @@ class Events(object): try: request_body = helpers.read_json_msg_body(req) req.can(policy_action) + project_id = req.project_id body_validation.validate_body(request_body) messages = prepare_message_to_sent(request_body) - self._processor.send_message(messages) + self._processor.send_message(messages, event_project_id=project_id) res.status = falcon.HTTP_200 except MultipleInvalid as ex: LOG.error('Entire bulk package was rejected, unsupported body') diff --git a/monasca_events_api/app/core/error_handlers.py b/monasca_events_api/app/core/error_handlers.py index 1e4868c..c9f2d11 100644 --- a/monasca_events_api/app/core/error_handlers.py +++ b/monasca_events_api/app/core/error_handlers.py @@ -1,4 +1,4 @@ -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -25,5 +25,5 @@ def events_envelope_exception_handlet(ex, req, resp, params): def register_error_handler(app): - app.add_error_handler(envelope.EventsEnvelopeException, + app.add_error_handler(envelope.EventEnvelopeException, events_envelope_exception_handlet) diff --git a/monasca_events_api/app/core/model.py b/monasca_events_api/app/core/model.py index bbb6304..dac04d3 100644 --- a/monasca_events_api/app/core/model.py +++ b/monasca_events_api/app/core/model.py @@ -1,4 +1,4 @@ -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -24,5 +24,6 @@ def prepare_message_to_sent(body): for events in body['events']: ev = events['event'].copy() ev.update({'timestamp': timestamp}) + ev.update({'dimensions': events.get('dimensions')}) final_body.append(ev) return final_body diff --git a/monasca_events_api/app/core/request.py b/monasca_events_api/app/core/request.py index a065678..ca7b2e7 100644 --- a/monasca_events_api/app/core/request.py +++ b/monasca_events_api/app/core/request.py @@ -1,4 +1,4 @@ -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -36,6 +36,7 @@ class Request(falcon.Request): self.context = \ request_contex.RequestContext.from_environ(self.env) self.is_admin = policy.check_is_admin(self.context) + self.project_id = self.context.project_id def can(self, action, target=None): return self.context.can(action, target) diff --git a/monasca_events_api/app/model/envelope.py b/monasca_events_api/app/model/envelope.py index 9739c5d..84d320f 100644 --- a/monasca_events_api/app/model/envelope.py +++ b/monasca_events_api/app/model/envelope.py @@ -1,4 +1,4 @@ -# Copyright 2017 FUJITSU LIMITED +# Copyright 2018 FUJITSU LIMITED # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,6 +12,91 @@ # License for the specific language governing permissions and limitations # under the License. +from monasca_common.rest import utils as rest_utils +from oslo_utils import encodeutils +from oslo_utils import timeutils -class EventsEnvelopeException(Exception): + +def serialize_envelope(envelope): + """Returns json representation of an envelope. + + :return: json object of envelope + :rtype: json or a bytestring `encoding` encoded + representation of it. + + """ + json = rest_utils.as_json(envelope, ensure_ascii=False) + return encodeutils.safe_decode(json, 'utf-8') + + +class EventEnvelopeException(Exception): pass + + +class Envelope(dict): + def __init__(self, event, meta): + if not event: + error_msg = 'Envelope cannot be created without event' + raise EventEnvelopeException(error_msg) + if 'project_id' not in meta or not meta.get('project_id'): + error_msg = 'Envelope cannot be created without project_id' + raise EventEnvelopeException(error_msg) + + creation_time = self._get_creation_time() + super(Envelope, self).__init__( + event=rest_utils.from_json(event), + creation_time=creation_time, + meta=meta + ) + + @staticmethod + def _get_creation_time(): + return timeutils.utcnow_ts() + + @classmethod + def new_envelope(cls, event, project_id): + """Creates new event envelope + + Event envelope is combined of of following properties + + * event - dict + * creation_time - timestamp + * meta - meta block + + Example output json would be like this: + + .. code-block:: json + + { + "event": { + "message": "Some message", + "dimensions": { + "hostname": "devstack" + } + }, + "creation_time": 1447834886, + "meta": { + "project_id": "e4bd29509eda473092d32aadfee3e7b1", + } + } + + :param dict event: original event element + :param str project_id: project id to be put in meta field + """ + event_meta = { + 'project_id': project_id + } + + return cls(event, event_meta) + + @property + def event(self): + return self.get('event', None) + + @property + def creation_time(self): + return self.get('creation_time', None) + + @property + def meta(self): + return self.get('meta', None) diff --git a/monasca_events_api/tests/unit/test_body_valodiation.py b/monasca_events_api/tests/unit/test_body_validation.py similarity index 100% rename from monasca_events_api/tests/unit/test_body_valodiation.py rename to monasca_events_api/tests/unit/test_body_validation.py