From 901cbd040a03aba0bfd78ddacaed19f53b95baec Mon Sep 17 00:00:00 2001 From: Joe Keen Date: Sun, 22 Jan 2017 22:04:31 -0700 Subject: [PATCH] Remove references to kafka-python Because kafka-python is now forked and embedded in monasca-common monasca-api needs to use those interfaces instead of using kafka-python directly. Change-Id: I3b30721c46af3e472947cf8d776604b33c82aa8a --- .../common/messaging/kafka_publisher.py | 80 +++---------------- monasca_api/v2/reference/metrics.py | 2 +- requirements.txt | 1 - 3 files changed, 10 insertions(+), 73 deletions(-) diff --git a/monasca_api/common/messaging/kafka_publisher.py b/monasca_api/common/messaging/kafka_publisher.py index ee04310a3..fbbe19ce4 100644 --- a/monasca_api/common/messaging/kafka_publisher.py +++ b/monasca_api/common/messaging/kafka_publisher.py @@ -1,4 +1,4 @@ -# Copyright 2014 Hewlett-Packard +# Copyright 2014,2017 Hewlett-Packard # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,17 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import time - -from kafka import client -from kafka import common -from kafka import producer from oslo_config import cfg from oslo_log import log from monasca_api.common.messaging import exceptions from monasca_api.common.messaging import publisher +import monasca_common.kafka.producer as kafka_producer +import monasca_common.kafka_lib.common as kafka_common + LOG = log.getLogger(__name__) @@ -46,77 +44,17 @@ class KafkaPublisher(publisher.Publisher): self.partitions = cfg.CONF.kafka.partitions self.drop_data = cfg.CONF.kafka.drop_data - self._client = None - self._producer = None - - def _init_client(self, wait_time=None): - for i in range(self.max_retry): - try: - # if there is a client instance, but _init_client is called - # again, most likely the connection has gone stale, close that - # connection and reconnect. - if self._client: - self._client.close() - - if not wait_time: - wait_time = self.wait_time - time.sleep(wait_time) - - self._client = client.KafkaClient(self.uri) - - # when a client is re-initialized, existing consumer should be - # reset as well. - self._producer = None - break - except common.KafkaUnavailableError: - LOG.error('Kafka server at %s is down.' % self.uri) - except common.LeaderNotAvailableError: - LOG.error('Kafka at %s has no leader available.' % self.uri) - except Exception: - LOG.error('Kafka at %s initialization failed.' % self.uri) - - # Wait a bit and try again to get a client - time.sleep(self.wait_time) - - def _init_producer(self): - try: - if not self._client: - self._init_client() - self._producer = producer.SimpleProducer( - self._client, async=self.async, ack_timeout=self.ack_time) - LOG.debug('Kafka SimpleProducer was created successfully.') - except Exception: - self._producer = None - LOG.exception('Kafka (%s) producer can not be created.' % self.uri) + self._producer = kafka_producer.KafkaProducer(self.uri) def close(self): - if self._client: - self._producer = None - self._client.close() + pass def send_message(self, message): try: - if not self._producer: - self._init_producer() - self._producer.send_messages(self.topic, message) + self._producer.publish(self.topic, message) - except (common.KafkaUnavailableError, - common.LeaderNotAvailableError): - self._client = None - LOG.exception('Error occurred while posting data to Kafka.') - raise exceptions.MessageQueueException() - except Exception: - LOG.exception('Unknown error.') - raise exceptions.MessageQueueException() - - def send_message_batch(self, messages): - try: - if not self._producer: - self._init_producer() - self._producer.send_messages(self.topic, *messages) - except (common.KafkaUnavailableError, - common.LeaderNotAvailableError): - self._client = None + except (kafka_common.KafkaUnavailableError, + kafka_common.LeaderNotAvailableError): LOG.exception('Error occurred while posting data to Kafka.') raise exceptions.MessageQueueException() except Exception: diff --git a/monasca_api/v2/reference/metrics.py b/monasca_api/v2/reference/metrics.py index c343e4788..50a468236 100644 --- a/monasca_api/v2/reference/metrics.py +++ b/monasca_api/v2/reference/metrics.py @@ -95,7 +95,7 @@ class Metrics(metrics_api_v2.MetricsV2API): def _send_metrics(self, metrics): try: - self._message_queue.send_message_batch(metrics) + self._message_queue.send_message(metrics) except message_queue_exceptions.MessageQueueException as ex: LOG.exception(ex) raise falcon.HTTPServiceUnavailable('Service unavailable', diff --git a/requirements.txt b/requirements.txt index 1f90e1f9d..a85847b84 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,7 +20,6 @@ voluptuous>=0.8.9 # BSD License #influxdb #cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0 eventlet!=0.18.3,>=0.18.2 # MIT -kafka-python<1.0.0,>=0.9.5 # Apache-2.0 simplejson>=2.2.0 # MIT monasca-common>=1.0.0 # Apache-2.0 SQLAlchemy<1.1.0,>=1.0.10 # MIT