Merge "Use the tenant_id from context to name the index in ES"

This commit is contained in:
Zuul 2018-10-16 12:21:08 +00:00 committed by Gerrit Code Review
commit a6ca0fead5
8 changed files with 148 additions and 96 deletions

View File

@ -1,5 +1,4 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -16,10 +15,10 @@
import falcon
from monasca_common.kafka import producer
from monasca_common.kafka_lib.common import FailedPayloadsError
from monasca_common.rest import utils as rest_utils
from oslo_log import log
from monasca_events_api.app.model import envelope as ev_envelope
from monasca_events_api import conf
from oslo_log import log
from oslo_utils import encodeutils
LOG = log.getLogger(__name__)
@ -30,10 +29,6 @@ _KAFKA_META_DATA_SIZE = 32
_TRUNCATION_SAFE_OFFSET = 1
class InvalidMessageException(Exception):
pass
class EventPublisher(object):
"""Publishes events data to Kafka
@ -63,88 +58,20 @@ class EventPublisher(object):
LOG.info('Initializing EventPublisher <%s>', self)
def send_message(self, messages):
"""Sends message to each configured topic.
def _transform_message(self, message):
"""Serialize and ensure that message has proper type
Note:
Empty content is not shipped to kafka
:param dict| list messages:
"""
if not messages:
return
if not isinstance(messages, list):
messages = [messages]
sent_counter = 0
num_of_msgs = len(messages)
LOG.debug('About to publish %d messages to %s topics',
num_of_msgs, self._topics)
send_messages = []
for message in messages:
try:
msg = self._transform_message_to_json(message)
send_messages.append(msg)
except Exception as ex:
LOG.exception(
'Failed to transform message, '
'this massage is dropped {} '
'Exception: {}'.format(message, str(ex)))
try:
self._publish(send_messages)
sent_counter = len(send_messages)
except Exception as ex:
LOG.exception('Failure in publishing messages to kafka')
raise ex
finally:
self._check_if_all_messages_was_publish(sent_counter, num_of_msgs)
def _transform_message_to_json(self, message):
"""Transforms message into JSON.
Method transforms message to JSON and
encode to utf8
:param str message: instance of message
:return: serialized message
:rtype: str
"""
msg_json = rest_utils.as_json(message)
return msg_json.encode('utf-8')
def _create_message_for_persister_from_request_body(self, body):
"""Create message for persister from request body
Method take original request body and them
transform the request to proper message format
acceptable by event-prsister
:param body: original request body
:return: transformed message
"""
timestamp = body['timestamp']
final_body = []
for events in body['events']:
ev = events['event'].copy()
ev.update({'timestamp': timestamp})
final_body.append(ev)
return final_body
def _ensure_type_bytes(self, message):
"""Ensures that message will have proper type.
:param str message: instance of message
"""
return message.encode('utf-8')
serialized = ev_envelope.serialize_envelope(message)
return encodeutils.safe_encode(serialized, 'utf-8')
def _publish(self, messages):
"""Publishes messages to kafka.
:param list messages: list of messages
"""
num_of_msg = len(messages)
@ -178,6 +105,17 @@ class EventPublisher(object):
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
@staticmethod
def _is_message_valid(message):
"""Validates message before sending.
Methods checks if message is :py:class:`model.envelope.Envelope`.
By being instance of this class it is ensured that all required
keys are found and they will have their values.
"""
return isinstance(message, ev_envelope.Envelope)
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
"""Executed after publishing to sent metrics.
@ -185,7 +123,6 @@ class EventPublisher(object):
:param int to_send_count: how many messages should be sent
"""
failed_to_send = to_send_count - send_count
if failed_to_send == 0:

View File

@ -1,4 +1,4 @@
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,10 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from monasca_common.rest import utils as rest_utils
from monasca_events_api.app.common import events_publisher
from monasca_events_api.app.model import envelope
from monasca_events_api import conf
from oslo_log import log
from oslo_utils import encodeutils
LOG = log.getLogger(__name__)
CONF = conf.CONF
@ -30,11 +32,11 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
"""
def send_message(self, events):
def send_message(self, events, event_project_id=None):
"""Sends bulk package to kafka
:param list events: received events
:param str event_project_id: project id
"""
num_of_msgs = len(events) if events else 0
@ -45,7 +47,7 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
for ev_el in events:
try:
t_el = self._transform_message_to_json(ev_el)
t_el = self._transform_message(ev_el, event_project_id)
if t_el:
to_send_msgs.append(t_el)
except Exception as ex:
@ -62,3 +64,28 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
raise ex
finally:
self._check_if_all_messages_was_publish(num_of_msgs, sent_count)
def _transform_message(self, event_element, event_project_id):
"""Transform the message
:param dict event_element: original event element
:param str event_project_id: project id
:return: message payload
"""
try:
msg_json = rest_utils.as_json(event_element)
msg_json = encodeutils.safe_encode(msg_json, 'utf-8')
event_envelope = envelope.Envelope.new_envelope(
event=msg_json,
project_id=event_project_id,
)
msg_payload = (super(EventsBulkProcessor, self)
._transform_message(event_envelope))
return msg_payload
except Exception as ex:
LOG.error("Event transformation failed, rejecting event")
LOG.exception(ex)
return None

View File

@ -1,4 +1,4 @@
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -53,9 +53,10 @@ class Events(object):
try:
request_body = helpers.read_json_msg_body(req)
req.can(policy_action)
project_id = req.project_id
body_validation.validate_body(request_body)
messages = prepare_message_to_sent(request_body)
self._processor.send_message(messages)
self._processor.send_message(messages, event_project_id=project_id)
res.status = falcon.HTTP_200
except MultipleInvalid as ex:
LOG.error('Entire bulk package was rejected, unsupported body')

View File

@ -1,4 +1,4 @@
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -25,5 +25,5 @@ def events_envelope_exception_handlet(ex, req, resp, params):
def register_error_handler(app):
app.add_error_handler(envelope.EventsEnvelopeException,
app.add_error_handler(envelope.EventEnvelopeException,
events_envelope_exception_handlet)

View File

@ -1,4 +1,4 @@
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -24,5 +24,6 @@ def prepare_message_to_sent(body):
for events in body['events']:
ev = events['event'].copy()
ev.update({'timestamp': timestamp})
ev.update({'dimensions': events.get('dimensions')})
final_body.append(ev)
return final_body

View File

@ -1,4 +1,4 @@
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -36,6 +36,7 @@ class Request(falcon.Request):
self.context = \
request_contex.RequestContext.from_environ(self.env)
self.is_admin = policy.check_is_admin(self.context)
self.project_id = self.context.project_id
def can(self, action, target=None):
return self.context.can(action, target)

View File

@ -1,4 +1,4 @@
# Copyright 2017 FUJITSU LIMITED
# Copyright 2018 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,6 +12,91 @@
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.rest import utils as rest_utils
from oslo_utils import encodeutils
from oslo_utils import timeutils
class EventsEnvelopeException(Exception):
def serialize_envelope(envelope):
"""Returns json representation of an envelope.
:return: json object of envelope
:rtype: json or a bytestring `encoding` encoded
representation of it.
"""
json = rest_utils.as_json(envelope, ensure_ascii=False)
return encodeutils.safe_decode(json, 'utf-8')
class EventEnvelopeException(Exception):
pass
class Envelope(dict):
def __init__(self, event, meta):
if not event:
error_msg = 'Envelope cannot be created without event'
raise EventEnvelopeException(error_msg)
if 'project_id' not in meta or not meta.get('project_id'):
error_msg = 'Envelope cannot be created without project_id'
raise EventEnvelopeException(error_msg)
creation_time = self._get_creation_time()
super(Envelope, self).__init__(
event=rest_utils.from_json(event),
creation_time=creation_time,
meta=meta
)
@staticmethod
def _get_creation_time():
return timeutils.utcnow_ts()
@classmethod
def new_envelope(cls, event, project_id):
"""Creates new event envelope
Event envelope is combined of of following properties
* event - dict
* creation_time - timestamp
* meta - meta block
Example output json would be like this:
.. code-block:: json
{
"event": {
"message": "Some message",
"dimensions": {
"hostname": "devstack"
}
},
"creation_time": 1447834886,
"meta": {
"project_id": "e4bd29509eda473092d32aadfee3e7b1",
}
}
:param dict event: original event element
:param str project_id: project id to be put in meta field
"""
event_meta = {
'project_id': project_id
}
return cls(event, event_meta)
@property
def event(self):
return self.get('event', None)
@property
def creation_time(self):
return self.get('creation_time', None)
@property
def meta(self):
return self.get('meta', None)