Use the tenant_id from context to name the index in ES
Story: 2003955 Task: 26881 Change-Id: I17fc5121f30127ed3e53cc6cdaf904820c65dda8
This commit is contained in:
parent
09356e0b9f
commit
60b86fff4d
|
@ -1,5 +1,4 @@
|
||||||
# Copyright 2015 kornicameister@gmail.com
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -16,10 +15,10 @@
|
||||||
import falcon
|
import falcon
|
||||||
from monasca_common.kafka import producer
|
from monasca_common.kafka import producer
|
||||||
from monasca_common.kafka_lib.common import FailedPayloadsError
|
from monasca_common.kafka_lib.common import FailedPayloadsError
|
||||||
from monasca_common.rest import utils as rest_utils
|
from monasca_events_api.app.model import envelope as ev_envelope
|
||||||
from oslo_log import log
|
|
||||||
|
|
||||||
from monasca_events_api import conf
|
from monasca_events_api import conf
|
||||||
|
from oslo_log import log
|
||||||
|
from oslo_utils import encodeutils
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
@ -30,10 +29,6 @@ _KAFKA_META_DATA_SIZE = 32
|
||||||
_TRUNCATION_SAFE_OFFSET = 1
|
_TRUNCATION_SAFE_OFFSET = 1
|
||||||
|
|
||||||
|
|
||||||
class InvalidMessageException(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class EventPublisher(object):
|
class EventPublisher(object):
|
||||||
"""Publishes events data to Kafka
|
"""Publishes events data to Kafka
|
||||||
|
|
||||||
|
@ -63,88 +58,20 @@ class EventPublisher(object):
|
||||||
|
|
||||||
LOG.info('Initializing EventPublisher <%s>', self)
|
LOG.info('Initializing EventPublisher <%s>', self)
|
||||||
|
|
||||||
def send_message(self, messages):
|
def _transform_message(self, message):
|
||||||
"""Sends message to each configured topic.
|
"""Serialize and ensure that message has proper type
|
||||||
|
|
||||||
Note:
|
|
||||||
Empty content is not shipped to kafka
|
|
||||||
|
|
||||||
:param dict| list messages:
|
|
||||||
"""
|
|
||||||
if not messages:
|
|
||||||
return
|
|
||||||
if not isinstance(messages, list):
|
|
||||||
messages = [messages]
|
|
||||||
|
|
||||||
sent_counter = 0
|
|
||||||
num_of_msgs = len(messages)
|
|
||||||
|
|
||||||
LOG.debug('About to publish %d messages to %s topics',
|
|
||||||
num_of_msgs, self._topics)
|
|
||||||
|
|
||||||
send_messages = []
|
|
||||||
|
|
||||||
for message in messages:
|
|
||||||
try:
|
|
||||||
msg = self._transform_message_to_json(message)
|
|
||||||
send_messages.append(msg)
|
|
||||||
except Exception as ex:
|
|
||||||
LOG.exception(
|
|
||||||
'Failed to transform message, '
|
|
||||||
'this massage is dropped {} '
|
|
||||||
'Exception: {}'.format(message, str(ex)))
|
|
||||||
try:
|
|
||||||
self._publish(send_messages)
|
|
||||||
sent_counter = len(send_messages)
|
|
||||||
except Exception as ex:
|
|
||||||
LOG.exception('Failure in publishing messages to kafka')
|
|
||||||
raise ex
|
|
||||||
finally:
|
|
||||||
self._check_if_all_messages_was_publish(sent_counter, num_of_msgs)
|
|
||||||
|
|
||||||
def _transform_message_to_json(self, message):
|
|
||||||
"""Transforms message into JSON.
|
|
||||||
|
|
||||||
Method transforms message to JSON and
|
|
||||||
encode to utf8
|
|
||||||
:param str message: instance of message
|
:param str message: instance of message
|
||||||
:return: serialized message
|
:return: serialized message
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
msg_json = rest_utils.as_json(message)
|
serialized = ev_envelope.serialize_envelope(message)
|
||||||
return msg_json.encode('utf-8')
|
return encodeutils.safe_encode(serialized, 'utf-8')
|
||||||
|
|
||||||
def _create_message_for_persister_from_request_body(self, body):
|
|
||||||
"""Create message for persister from request body
|
|
||||||
|
|
||||||
Method take original request body and them
|
|
||||||
transform the request to proper message format
|
|
||||||
acceptable by event-prsister
|
|
||||||
:param body: original request body
|
|
||||||
:return: transformed message
|
|
||||||
"""
|
|
||||||
timestamp = body['timestamp']
|
|
||||||
final_body = []
|
|
||||||
for events in body['events']:
|
|
||||||
ev = events['event'].copy()
|
|
||||||
ev.update({'timestamp': timestamp})
|
|
||||||
final_body.append(ev)
|
|
||||||
return final_body
|
|
||||||
|
|
||||||
def _ensure_type_bytes(self, message):
|
|
||||||
"""Ensures that message will have proper type.
|
|
||||||
|
|
||||||
:param str message: instance of message
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
return message.encode('utf-8')
|
|
||||||
|
|
||||||
def _publish(self, messages):
|
def _publish(self, messages):
|
||||||
"""Publishes messages to kafka.
|
"""Publishes messages to kafka.
|
||||||
|
|
||||||
:param list messages: list of messages
|
:param list messages: list of messages
|
||||||
|
|
||||||
"""
|
"""
|
||||||
num_of_msg = len(messages)
|
num_of_msg = len(messages)
|
||||||
|
|
||||||
|
@ -178,6 +105,17 @@ class EventPublisher(object):
|
||||||
raise falcon.HTTPServiceUnavailable('Service unavailable',
|
raise falcon.HTTPServiceUnavailable('Service unavailable',
|
||||||
str(ex), 60)
|
str(ex), 60)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_message_valid(message):
|
||||||
|
"""Validates message before sending.
|
||||||
|
|
||||||
|
Methods checks if message is :py:class:`model.envelope.Envelope`.
|
||||||
|
By being instance of this class it is ensured that all required
|
||||||
|
keys are found and they will have their values.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return isinstance(message, ev_envelope.Envelope)
|
||||||
|
|
||||||
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
|
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
|
||||||
"""Executed after publishing to sent metrics.
|
"""Executed after publishing to sent metrics.
|
||||||
|
|
||||||
|
@ -185,7 +123,6 @@ class EventPublisher(object):
|
||||||
:param int to_send_count: how many messages should be sent
|
:param int to_send_count: how many messages should be sent
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
failed_to_send = to_send_count - send_count
|
failed_to_send = to_send_count - send_count
|
||||||
|
|
||||||
if failed_to_send == 0:
|
if failed_to_send == 0:
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -12,10 +12,12 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_log import log
|
from monasca_common.rest import utils as rest_utils
|
||||||
|
|
||||||
from monasca_events_api.app.common import events_publisher
|
from monasca_events_api.app.common import events_publisher
|
||||||
|
from monasca_events_api.app.model import envelope
|
||||||
from monasca_events_api import conf
|
from monasca_events_api import conf
|
||||||
|
from oslo_log import log
|
||||||
|
from oslo_utils import encodeutils
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
CONF = conf.CONF
|
CONF = conf.CONF
|
||||||
|
@ -30,11 +32,11 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def send_message(self, events):
|
def send_message(self, events, event_project_id=None):
|
||||||
"""Sends bulk package to kafka
|
"""Sends bulk package to kafka
|
||||||
|
|
||||||
:param list events: received events
|
:param list events: received events
|
||||||
|
:param str event_project_id: project id
|
||||||
"""
|
"""
|
||||||
|
|
||||||
num_of_msgs = len(events) if events else 0
|
num_of_msgs = len(events) if events else 0
|
||||||
|
@ -45,7 +47,7 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
|
||||||
|
|
||||||
for ev_el in events:
|
for ev_el in events:
|
||||||
try:
|
try:
|
||||||
t_el = self._transform_message_to_json(ev_el)
|
t_el = self._transform_message(ev_el, event_project_id)
|
||||||
if t_el:
|
if t_el:
|
||||||
to_send_msgs.append(t_el)
|
to_send_msgs.append(t_el)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
@ -62,3 +64,28 @@ class EventsBulkProcessor(events_publisher.EventPublisher):
|
||||||
raise ex
|
raise ex
|
||||||
finally:
|
finally:
|
||||||
self._check_if_all_messages_was_publish(num_of_msgs, sent_count)
|
self._check_if_all_messages_was_publish(num_of_msgs, sent_count)
|
||||||
|
|
||||||
|
def _transform_message(self, event_element, event_project_id):
|
||||||
|
"""Transform the message
|
||||||
|
|
||||||
|
:param dict event_element: original event element
|
||||||
|
:param str event_project_id: project id
|
||||||
|
:return: message payload
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
msg_json = rest_utils.as_json(event_element)
|
||||||
|
msg_json = encodeutils.safe_encode(msg_json, 'utf-8')
|
||||||
|
|
||||||
|
event_envelope = envelope.Envelope.new_envelope(
|
||||||
|
event=msg_json,
|
||||||
|
project_id=event_project_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
msg_payload = (super(EventsBulkProcessor, self)
|
||||||
|
._transform_message(event_envelope))
|
||||||
|
return msg_payload
|
||||||
|
|
||||||
|
except Exception as ex:
|
||||||
|
LOG.error("Event transformation failed, rejecting event")
|
||||||
|
LOG.exception(ex)
|
||||||
|
return None
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -53,9 +53,10 @@ class Events(object):
|
||||||
try:
|
try:
|
||||||
request_body = helpers.read_json_msg_body(req)
|
request_body = helpers.read_json_msg_body(req)
|
||||||
req.can(policy_action)
|
req.can(policy_action)
|
||||||
|
project_id = req.project_id
|
||||||
body_validation.validate_body(request_body)
|
body_validation.validate_body(request_body)
|
||||||
messages = prepare_message_to_sent(request_body)
|
messages = prepare_message_to_sent(request_body)
|
||||||
self._processor.send_message(messages)
|
self._processor.send_message(messages, event_project_id=project_id)
|
||||||
res.status = falcon.HTTP_200
|
res.status = falcon.HTTP_200
|
||||||
except MultipleInvalid as ex:
|
except MultipleInvalid as ex:
|
||||||
LOG.error('Entire bulk package was rejected, unsupported body')
|
LOG.error('Entire bulk package was rejected, unsupported body')
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -25,5 +25,5 @@ def events_envelope_exception_handlet(ex, req, resp, params):
|
||||||
|
|
||||||
|
|
||||||
def register_error_handler(app):
|
def register_error_handler(app):
|
||||||
app.add_error_handler(envelope.EventsEnvelopeException,
|
app.add_error_handler(envelope.EventEnvelopeException,
|
||||||
events_envelope_exception_handlet)
|
events_envelope_exception_handlet)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -24,5 +24,6 @@ def prepare_message_to_sent(body):
|
||||||
for events in body['events']:
|
for events in body['events']:
|
||||||
ev = events['event'].copy()
|
ev = events['event'].copy()
|
||||||
ev.update({'timestamp': timestamp})
|
ev.update({'timestamp': timestamp})
|
||||||
|
ev.update({'dimensions': events.get('dimensions')})
|
||||||
final_body.append(ev)
|
final_body.append(ev)
|
||||||
return final_body
|
return final_body
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -36,6 +36,7 @@ class Request(falcon.Request):
|
||||||
self.context = \
|
self.context = \
|
||||||
request_contex.RequestContext.from_environ(self.env)
|
request_contex.RequestContext.from_environ(self.env)
|
||||||
self.is_admin = policy.check_is_admin(self.context)
|
self.is_admin = policy.check_is_admin(self.context)
|
||||||
|
self.project_id = self.context.project_id
|
||||||
|
|
||||||
def can(self, action, target=None):
|
def can(self, action, target=None):
|
||||||
return self.context.can(action, target)
|
return self.context.can(action, target)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2017 FUJITSU LIMITED
|
# Copyright 2018 FUJITSU LIMITED
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
@ -12,6 +12,91 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from monasca_common.rest import utils as rest_utils
|
||||||
|
from oslo_utils import encodeutils
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
class EventsEnvelopeException(Exception):
|
|
||||||
|
def serialize_envelope(envelope):
|
||||||
|
"""Returns json representation of an envelope.
|
||||||
|
|
||||||
|
:return: json object of envelope
|
||||||
|
:rtype: json or a bytestring `encoding` encoded
|
||||||
|
representation of it.
|
||||||
|
|
||||||
|
"""
|
||||||
|
json = rest_utils.as_json(envelope, ensure_ascii=False)
|
||||||
|
return encodeutils.safe_decode(json, 'utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
class EventEnvelopeException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Envelope(dict):
|
||||||
|
def __init__(self, event, meta):
|
||||||
|
if not event:
|
||||||
|
error_msg = 'Envelope cannot be created without event'
|
||||||
|
raise EventEnvelopeException(error_msg)
|
||||||
|
if 'project_id' not in meta or not meta.get('project_id'):
|
||||||
|
error_msg = 'Envelope cannot be created without project_id'
|
||||||
|
raise EventEnvelopeException(error_msg)
|
||||||
|
|
||||||
|
creation_time = self._get_creation_time()
|
||||||
|
super(Envelope, self).__init__(
|
||||||
|
event=rest_utils.from_json(event),
|
||||||
|
creation_time=creation_time,
|
||||||
|
meta=meta
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_creation_time():
|
||||||
|
return timeutils.utcnow_ts()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def new_envelope(cls, event, project_id):
|
||||||
|
"""Creates new event envelope
|
||||||
|
|
||||||
|
Event envelope is combined of of following properties
|
||||||
|
|
||||||
|
* event - dict
|
||||||
|
* creation_time - timestamp
|
||||||
|
* meta - meta block
|
||||||
|
|
||||||
|
Example output json would be like this:
|
||||||
|
|
||||||
|
.. code-block:: json
|
||||||
|
|
||||||
|
{
|
||||||
|
"event": {
|
||||||
|
"message": "Some message",
|
||||||
|
"dimensions": {
|
||||||
|
"hostname": "devstack"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"creation_time": 1447834886,
|
||||||
|
"meta": {
|
||||||
|
"project_id": "e4bd29509eda473092d32aadfee3e7b1",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
:param dict event: original event element
|
||||||
|
:param str project_id: project id to be put in meta field
|
||||||
|
"""
|
||||||
|
event_meta = {
|
||||||
|
'project_id': project_id
|
||||||
|
}
|
||||||
|
|
||||||
|
return cls(event, event_meta)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def event(self):
|
||||||
|
return self.get('event', None)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def creation_time(self):
|
||||||
|
return self.get('creation_time', None)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def meta(self):
|
||||||
|
return self.get('meta', None)
|
||||||
|
|
Loading…
Reference in New Issue