Merge "Remove references to kafka-python"
This commit is contained in:
commit
ca5e57d5db
|
@ -1,4 +1,4 @@
|
|||
# Copyright 2014 Hewlett-Packard
|
||||
# Copyright 2014,2017 Hewlett-Packard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -12,17 +12,15 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from kafka import client
|
||||
from kafka import common
|
||||
from kafka import producer
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_api.common.messaging import exceptions
|
||||
from monasca_api.common.messaging import publisher
|
||||
|
||||
import monasca_common.kafka.producer as kafka_producer
|
||||
import monasca_common.kafka_lib.common as kafka_common
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -46,77 +44,17 @@ class KafkaPublisher(publisher.Publisher):
|
|||
self.partitions = cfg.CONF.kafka.partitions
|
||||
self.drop_data = cfg.CONF.kafka.drop_data
|
||||
|
||||
self._client = None
|
||||
self._producer = None
|
||||
|
||||
def _init_client(self, wait_time=None):
|
||||
for i in range(self.max_retry):
|
||||
try:
|
||||
# if there is a client instance, but _init_client is called
|
||||
# again, most likely the connection has gone stale, close that
|
||||
# connection and reconnect.
|
||||
if self._client:
|
||||
self._client.close()
|
||||
|
||||
if not wait_time:
|
||||
wait_time = self.wait_time
|
||||
time.sleep(wait_time)
|
||||
|
||||
self._client = client.KafkaClient(self.uri)
|
||||
|
||||
# when a client is re-initialized, existing consumer should be
|
||||
# reset as well.
|
||||
self._producer = None
|
||||
break
|
||||
except common.KafkaUnavailableError:
|
||||
LOG.error('Kafka server at %s is down.' % self.uri)
|
||||
except common.LeaderNotAvailableError:
|
||||
LOG.error('Kafka at %s has no leader available.' % self.uri)
|
||||
except Exception:
|
||||
LOG.error('Kafka at %s initialization failed.' % self.uri)
|
||||
|
||||
# Wait a bit and try again to get a client
|
||||
time.sleep(self.wait_time)
|
||||
|
||||
def _init_producer(self):
|
||||
try:
|
||||
if not self._client:
|
||||
self._init_client()
|
||||
self._producer = producer.SimpleProducer(
|
||||
self._client, async=self.async, ack_timeout=self.ack_time)
|
||||
LOG.debug('Kafka SimpleProducer was created successfully.')
|
||||
except Exception:
|
||||
self._producer = None
|
||||
LOG.exception('Kafka (%s) producer can not be created.' % self.uri)
|
||||
self._producer = kafka_producer.KafkaProducer(self.uri)
|
||||
|
||||
def close(self):
|
||||
if self._client:
|
||||
self._producer = None
|
||||
self._client.close()
|
||||
pass
|
||||
|
||||
def send_message(self, message):
|
||||
try:
|
||||
if not self._producer:
|
||||
self._init_producer()
|
||||
self._producer.send_messages(self.topic, message)
|
||||
self._producer.publish(self.topic, message)
|
||||
|
||||
except (common.KafkaUnavailableError,
|
||||
common.LeaderNotAvailableError):
|
||||
self._client = None
|
||||
LOG.exception('Error occurred while posting data to Kafka.')
|
||||
raise exceptions.MessageQueueException()
|
||||
except Exception:
|
||||
LOG.exception('Unknown error.')
|
||||
raise exceptions.MessageQueueException()
|
||||
|
||||
def send_message_batch(self, messages):
|
||||
try:
|
||||
if not self._producer:
|
||||
self._init_producer()
|
||||
self._producer.send_messages(self.topic, *messages)
|
||||
except (common.KafkaUnavailableError,
|
||||
common.LeaderNotAvailableError):
|
||||
self._client = None
|
||||
except (kafka_common.KafkaUnavailableError,
|
||||
kafka_common.LeaderNotAvailableError):
|
||||
LOG.exception('Error occurred while posting data to Kafka.')
|
||||
raise exceptions.MessageQueueException()
|
||||
except Exception:
|
||||
|
|
|
@ -95,7 +95,7 @@ class Metrics(metrics_api_v2.MetricsV2API):
|
|||
|
||||
def _send_metrics(self, metrics):
|
||||
try:
|
||||
self._message_queue.send_message_batch(metrics)
|
||||
self._message_queue.send_message(metrics)
|
||||
except message_queue_exceptions.MessageQueueException as ex:
|
||||
LOG.exception(ex)
|
||||
raise falcon.HTTPServiceUnavailable('Service unavailable',
|
||||
|
|
|
@ -20,7 +20,6 @@ voluptuous>=0.8.9 # BSD License
|
|||
#influxdb
|
||||
#cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0
|
||||
eventlet!=0.18.3,>=0.18.2 # MIT
|
||||
kafka-python<1.0.0,>=0.9.5 # Apache-2.0
|
||||
simplejson>=2.2.0 # MIT
|
||||
monasca-common>=1.4.0 # Apache-2.0
|
||||
SQLAlchemy<1.1.0,>=1.0.10 # MIT
|
||||
|
|
Loading…
Reference in New Issue