Use Confluent Kafka client

The change adds support for running the API with the new
confluent-kafka client. The new client has to be enabled in the
configuration file.
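
For reference, a minimal sketch of what enabling the new client could look
like in the [kafka] section of the monasca-api configuration file (the
section and option names are taken from this change; the file location
depends on the deployment):

    [kafka]
    legacy_kafka_client_enabled = false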

Story: 2003705
Task: 35859

Depends-On: https://review.opendev.org/680653
Change-Id: Id513e01c60ea584548c954a8d2e61b9510eee8de
Witek Bedyk 2019-08-13 09:50:10 +02:00 committed by Witold Bedyk
parent 9d2c0940a0
commit 47c5ad37d5
6 changed files with 40 additions and 50 deletions


@@ -829,6 +829,7 @@ function configure_monasca_api_python {
# messaging
iniset "$MONASCA_API_CONF" messaging driver "monasca_api.common.messaging.kafka_publisher:KafkaPublisher"
iniset "$MONASCA_API_CONF" kafka uri "$SERVICE_HOST:9092"
iniset "$MONASCA_API_CONF" kafka legacy_kafka_client_enabled false
# databases
iniset "$MONASCA_API_CONF" database connection $dbAlarmUrl


@@ -46,7 +46,7 @@ Mako==0.4.0
MarkupSafe==1.0
mccabe==0.2.1
mock==2.0.0
-monasca-common==2.7.0
+monasca-common==2.16.0
monotonic==0.6
mox3==0.20.0
msgpack-python==0.4.0


@@ -12,14 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
+from monasca_common.kafka import client_factory
+import monasca_common.kafka_lib.common as kafka_common
from oslo_config import cfg
from oslo_log import log
from monasca_api.common.messaging import exceptions
from monasca_api.common.messaging import publisher
-import monasca_common.kafka.producer as kafka_producer
-import monasca_common.kafka_lib.common as kafka_common
LOG = log.getLogger(__name__)
@@ -44,7 +44,8 @@ class KafkaPublisher(publisher.Publisher):
self.partitions = cfg.CONF.kafka.partitions
self.drop_data = cfg.CONF.kafka.drop_data
-self._producer = kafka_producer.KafkaProducer(self.uri)
+self._producer = client_factory.get_kafka_producer(
+self.uri, cfg.CONF.kafka.legacy_kafka_client_enabled)
def close(self):
pass
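
As a rough usage sketch (not part of the patch): get_kafka_producer and its
two arguments are taken from the hunk above, while the broker address, the
publish() call and the payload are illustrative assumptions.

    from monasca_common.kafka import client_factory

    # Broker list mirrors the default registered for the [kafka] uri option.
    uri = ['127.0.0.1:9092']

    # False selects the new confluent-kafka based producer; True keeps the
    # legacy kafka-python one (the flag's meaning per this change).
    producer = client_factory.get_kafka_producer(uri, False)

    # publish() is assumed here for illustration; this hunk only shows how the
    # producer object is constructed, not how messages are sent.
    producer.publish('metrics', ['{"name": "example_metric"}'])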


@@ -23,65 +23,46 @@ kafka_opts = [
cfg.ListOpt('uri',
default=['127.0.0.1:9092'],
item_type=types.HostAddressPortType(),
-help='''
-Comma separated list of Kafka broker host:port
-'''),
+help='Comma separated list of Kafka broker host:port'),
cfg.StrOpt('metrics_topic', default='metrics',
-help='''
-The topic that metrics will be published to
-'''),
+help='The topic that metrics will be published to'),
cfg.StrOpt('events_topic', default='events',
-help='''
-The topic that events will be published too
-'''),
+help='The topic that events will be published to'),
cfg.StrOpt('alarm_state_transitions_topic',
default='alarm-state-transitions',
-help='''
-The topic that alarm state will be published too
-'''),
+help='The topic that alarm state will be published to'),
cfg.StrOpt('group', default='api',
-help='''
-The group name that this service belongs to
-'''),
+help='The group name that this service belongs to'),
cfg.IntOpt('wait_time', default=1,
advanced=True, min=1,
-help='''
-The wait time when no messages on kafka queue
-'''),
+help='The wait time when no messages on kafka queue (NOT USED)'),
cfg.IntOpt('ack_time', default=20,
-help='''
-The ack time back to kafka.
-'''),
+help='The ack time back to kafka. (NOT USED)'),
cfg.IntOpt('max_retry', default=3,
-help='''
-The number of retry when there is a connection error
-'''),
+help='Number of retries in case of connection error (NOT USED)'),
cfg.BoolOpt('auto_commit', default=False,
-advanced=True, help='''
-Should messages be automatically committed
-'''),
+advanced=True,
+help='Whether the message is automatically committed '
+'(NOT USED)'),
cfg.BoolOpt('is_async', default=True,
-help='''
-The type of posting
-'''),
+help='Whether posting is asynchronous or not (NOT USED)'),
cfg.BoolOpt('compact', default=True,
-help='''
-Specify if the message received should be parsed.
-If True, message will not be parsed, otherwise
-messages will be parsed
-'''),
+help='Specify if the message received should be parsed. If '
+'True, message will not be parsed, otherwise messages '
+'will be parsed (NOT USED)'),
cfg.ListOpt('partitions', item_type=int,
-default=[0], help='''
-The partitions this connection should
-listen for messages on. Currently does not
-support multiple partitions.
-Default is to listen on partition 0
-'''),
+default=[0],
+help='The partitions this connection should listen for '
+'messages on. (NOT USED)'),
cfg.BoolOpt('drop_data', default=False,
-help='''
-Specify if received data should be simply dropped.
-This parameter is only for testing purposes
-''')
+help='Specify if received data should be simply dropped. '
+'This parameter is only for testing purposes. (NOT USED)'),
+cfg.BoolOpt(name='legacy_kafka_client_enabled', default=True,
+required=True, advanced=True,
+help='Enable legacy Kafka client. When set old version of '
+'kafka-python library is used. Message format version '
+'for the brokers should be set to 0.9.0.0 to avoid '
+'performance issues until all consumers are upgraded.')
]
kafka_group = cfg.OptGroup(name='kafka', title='kafka')


@@ -0,0 +1,7 @@
+---
+upgrade:
+  - |
+    Configuration option `legacy_kafka_client_enabled` has been added to allow
+    working with both the legacy kafka-python and the new Confluent Kafka
+    client. Please set the message format version for the Kafka brokers to
+    0.9.0.0 to avoid performance issues until all consumers are upgraded.
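
On the broker side, this pin would typically be expressed roughly as follows
in the Kafka broker configuration (server.properties); the exact property
name is an assumption and is not part of this change:

    # Keep the message format readable by consumers that have not been
    # upgraded yet (assumed broker property).
    log.message.format.version=0.9.0.0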


@@ -21,5 +21,5 @@ six>=1.10.0 # MIT
pyparsing>=2.1.0 # MIT
voluptuous>=0.8.9 # BSD License
eventlet!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0,!=0.25.0,>=0.18.2 # MIT
-monasca-common>=2.7.0 # Apache-2.0
+monasca-common>=2.16.0 # Apache-2.0
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT