Add LegacyKafkaMessage and Kafka client factory

The change moves LegacyKafkaMessage object and client factory module
from the commponents commonly using it (persister, notification).

Story: 2003705
Task: 36094

Change-Id: I61ecf2cdf1dbe9a134df53c3a36614f84e8baa4e
This commit is contained in:
Witek Bedyk 2019-08-08 11:38:07 +02:00
parent 875c233fd7
commit bed8b9cd9f
3 changed files with 111 additions and 0 deletions

View File

@ -0,0 +1,56 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.confluent_kafka import consumer
from monasca_common.confluent_kafka import producer
from monasca_common.kafka import legacy_kafka_consumer
from monasca_common.kafka import producer as legacy_kafka_producer
def get_kafka_consumer(kafka_url,
kafka_consumer_group,
kafka_topic,
zookeeper_url=None,
zookeeper_path=None,
use_legacy_client=False,
repartition_callback=None,
commit_callback=None,
max_commit_interval=30,
client_id=""):
if use_legacy_client:
return legacy_kafka_consumer.LegacyKafkaConsumer(
kafka_url,
','.join(zookeeper_url),
zookeeper_path,
kafka_consumer_group,
kafka_topic,
repartition_callback=repartition_callback,
commit_callback=commit_callback,
commit_timeout=max_commit_interval
)
else:
return consumer.KafkaConsumer(
",".join(kafka_url),
kafka_consumer_group,
kafka_topic,
client_id=client_id,
repartition_callback=repartition_callback,
commit_callback=commit_callback,
max_commit_interval=max_commit_interval
)
def get_kafka_producer(kafka_url, use_legacy_client=False):
if use_legacy_client:
return legacy_kafka_producer.KafkaProducer(kafka_url)
else:
return producer.KafkaProducer(",".join(kafka_url))

View File

@ -0,0 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.kafka import consumer, legacy_kafka_message
class LegacyKafkaConsumer(consumer.KafkaConsumer):
def __iter__(self):
""":return: Kafka message object compatible with Confluent Kafka client
object
"""
for raw_message in super(LegacyKafkaConsumer, self).__iter__():
yield legacy_kafka_message.LegacyKafkaMessage(raw_message)

View File

@ -0,0 +1,32 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class LegacyKafkaMessage(object):
def __init__(self, raw_message):
self.m_partition = raw_message[0]
self.m_offset = raw_message[1].offset
self.m_key = raw_message[1].message.key
self.m_value = raw_message[1].message.value
def key(self):
return self.m_key
def offset(self):
return self.m_offset
def partition(self):
return self.m_partition
def value(self):
return self.m_value