monasca-events-api/monasca_events_api/app/common/events_publisher.py

198 lines
6.2 KiB
Python

# Copyright 2015 kornicameister@gmail.com
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_common.kafka import producer
from monasca_common.kafka_lib.common import FailedPayloadsError
from monasca_common.rest import utils as rest_utils
from oslo_log import log
from monasca_events_api import conf
LOG = log.getLogger(__name__)
CONF = conf.CONF
_RETRY_AFTER = 60
_KAFKA_META_DATA_SIZE = 32
_TRUNCATION_SAFE_OFFSET = 1
class InvalidMessageException(Exception):
pass
class EventPublisher(object):
"""Publishes events data to Kafka
EventPublisher is able to send single message to multiple configured topic.
It uses following configuration written in conf file ::
[event_publisher]
topics = 'monevents'
kafka_url = 'localhost:8900'
Note:
Uses :py:class:`monasca_common.kafka.producer.KafkaProducer`
to ship events to kafka. For more details
see `monasca-common`_ github repository.
.. _monasca-common: https://github.com/openstack/monasca-common
"""
def __init__(self):
self._topics = CONF.events_publisher.topics
self._kafka_publisher = producer.KafkaProducer(
url=CONF.events_publisher.kafka_url
)
LOG.info('Initializing EventPublisher <%s>', self)
def send_message(self, messages):
"""Sends message to each configured topic.
Note:
Empty content is not shipped to kafka
:param dict| list messages:
"""
if not messages:
return
if not isinstance(messages, list):
messages = [messages]
sent_counter = 0
num_of_msgs = len(messages)
LOG.debug('About to publish %d messages to %s topics',
num_of_msgs, self._topics)
send_messages = []
for message in messages:
try:
msg = self._transform_message_to_json(message)
send_messages.append(msg)
except Exception as ex:
LOG.exception(
'Failed to transform message, '
'this massage is dropped {} '
'Exception: {}'.format(message, str(ex)))
try:
self._publish(send_messages)
sent_counter = len(send_messages)
except Exception as ex:
LOG.exception('Failure in publishing messages to kafka')
raise ex
finally:
self._check_if_all_messages_was_publish(sent_counter, num_of_msgs)
def _transform_message_to_json(self, message):
"""Transforms message into JSON.
Method transforms message to JSON and
encode to utf8
:param str message: instance of message
:return: serialized message
:rtype: str
"""
msg_json = rest_utils.as_json(message)
return msg_json.encode('utf-8')
def _create_message_for_persister_from_request_body(self, body):
"""Create message for persister from request body
Method take original request body and them
transform the request to proper message format
acceptable by event-prsister
:param body: original request body
:return: transformed message
"""
timestamp = body['timestamp']
final_body = []
for events in body['events']:
ev = events['event'].copy()
ev.update({'timestamp': timestamp})
final_body.append(ev)
return final_body
def _ensure_type_bytes(self, message):
"""Ensures that message will have proper type.
:param str message: instance of message
"""
return message.encode('utf-8')
def _publish(self, messages):
"""Publishes messages to kafka.
:param list messages: list of messages
"""
num_of_msg = len(messages)
LOG.debug('Publishing %d messages', num_of_msg)
first = True
while True:
try:
for topic in self._topics:
self._kafka_publisher.publish(
topic,
messages
)
LOG.debug('Sent %d messages to topic %s',
num_of_msg, topic)
break
except FailedPayloadsError as ex:
# FailedPayloadsError exception can be cause by connection
# problem, to make sure that is not connection issue
# message is sent again.
LOG.error('Failed to send messages %s', ex)
if first:
LOG.error('Retrying')
first = False
continue
else:
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
except Exception as ex:
LOG.error('Failed to send messages %s', ex)
raise falcon.HTTPServiceUnavailable('Service unavailable',
str(ex), 60)
def _check_if_all_messages_was_publish(self, send_count, to_send_count):
"""Executed after publishing to sent metrics.
:param int send_count: how many messages have been sent
:param int to_send_count: how many messages should be sent
"""
failed_to_send = to_send_count - send_count
if failed_to_send == 0:
LOG.debug('Successfully published all [%d] messages',
send_count)
else:
error_str = ('Failed to send all messages, %d '
'messages out of %d have not been published')
LOG.error(error_str, failed_to_send, to_send_count)