Merge "Modifying Persister to use common kafka interface"

Jenkins 2015-11-04 23:38:30 +00:00 committed by Gerrit Code Review
commit b2479ae244
2 changed files with 33 additions and 189 deletions

View File

@@ -35,10 +35,6 @@ import sys
 import threading
 from influxdb import InfluxDBClient
-from kafka import KafkaClient
-from kafka import SimpleConsumer
-from kazoo.client import KazooClient
-from kazoo.recipe.partitioner import SetPartitioner
 import pytz
 from oslo.config import cfg
@@ -46,6 +42,8 @@ from openstack.common import log
 from openstack.common import service as os_service
 import service
+from monasca_common.kafka.consumer import KafkaConsumer
 LOG = log.getLogger(__name__)
@@ -169,215 +167,62 @@ class AbstractPersister(threading.Thread):
         super(AbstractPersister, self).__init__()
-        self._kafka_client = KafkaClient(kafka_conf.uri)
-        self._consumer = (
-            SimpleConsumer(self._kafka_client,
-                           kafka_conf.group_id,
-                           kafka_conf.topic,
-                           # Set to true even though we actually do
-                           # the commits manually. Needed to
-                           # initialize
-                           # offsets correctly.
-                           auto_commit=True,
-                           # Make these values None so that the
-                           # manual commit will do the actual
-                           # commit.
-                           # Needed so that offsets are initialized
-                           # correctly. If not done, then restarts
-                           # will reread messages from beginning of
-                           # the queue.
-                           auto_commit_every_n=None,
-                           auto_commit_every_t=None,
-                           fetch_size_bytes=kafka_conf.fetch_size_bytes,
-                           buffer_size=kafka_conf.buffer_size,
-                           max_buffer_size=kafka_conf.max_buffer_size,
-                           iter_timeout=1))
+        self._data_points = []
+        self._kafka_topic = kafka_conf.topic
+        self._database_batch_size = kafka_conf.database_batch_size
+        self._consumer = KafkaConsumer(kafka_conf.uri,
+                                       zookeeper_conf.uri,
+                                       kafka_conf.zookeeper_path,
+                                       kafka_conf.group_id,
+                                       kafka_conf.topic,
+                                       repartition_callback=self._flush,
+                                       periodic_callback=self._flush,
+                                       periodic_callback_rate=kafka_conf.max_wait_time_seconds)
         self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
                                                influxdb_conf.port,
                                                influxdb_conf.user,
                                                influxdb_conf.password,
                                                influxdb_conf.database_name)
-        self._kazoo_client = KazooClient(hosts=zookeeper_conf.uri)
-        self._kazoo_client.start()
-        self._zookeeper_path = kafka_conf.zookeeper_path
-        self._partition_interval_recheck_secs = (
-            zookeeper_conf.partition_interval_recheck_seconds)
-        self._max_wait_time_secs = kafka_conf.max_wait_time_seconds
-        self._database_batch_size = kafka_conf.database_batch_size
-        self._kafka_topic = kafka_conf.topic
-        self._message_count = 0
-        self._data_points = []
-        self._last_flush = datetime.now()
-        self._last_partition_check = datetime.now()
     @abc.abstractmethod
     def process_message(self, message):
         pass
-    def _flush(self, partitions):
+    def _flush(self):
+        if not self._data_points:
+            return
-        if self._data_points:
-            try:
-                self._influxdb_client.write_points(self._data_points, 'ms')
-            except Exception:
-                LOG.exception("Error writing to influxdb: {}"
-                              .format(self._data_points))
-                raise
-            self._consumer.commit(partitions=partitions)
+        try:
+            self._influxdb_client.write_points(self._data_points, 'ms')
             LOG.info("Processed {} messages from topic '{}'".format(
-                self._message_count, self._kafka_topic))
+                len(self._data_points), self._kafka_topic))
             self._data_points = []
+            self._consumer.commit()
+        except Exception:
+            LOG.exception("Error writing to influxdb: {}"
+                          .format(self._data_points))
+            raise
-            self._message_count = 0
-            self._last_flush = datetime.now()
-    def _is_time_for_repartition_check(self):
-        delta_partition_check_time = (datetime.now() -
-                                      self._last_partition_check)
-        return delta_partition_check_time.seconds >= (
-            self._partition_interval_recheck_secs)
-    def _process_messages(self, partitions):
-        while 1:
-            if self._is_time_for_repartition_check():
-                return
-            delta_flush_time = datetime.now() - self._last_flush
-            if delta_flush_time.seconds >= self._max_wait_time_secs:
-                self._flush(partitions)
-            for message in self._consumer:
+    def run(self):
+        try:
+            for raw_message in self._consumer:
                 try:
+                    message = raw_message[1]
                     data_point = self.process_message(message)
                     self._data_points.append(data_point)
-                    self._message_count += 1
-                    if self._is_time_for_repartition_check():
-                        return
                 except Exception:
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
-                if self._message_count >= self._database_batch_size:
-                    self._flush(partitions)
-    def _get_set_partitioner(self):
-        """Partition the set of Kafka topic partitions.
-        Acquire a lock on a subset of the Kafka partitions for a topic
-        to allow other instances of the persister to run without reading
-        from the same Kafka partitions for the given topic.
-        """
-        # Refresh the Kafka partitions and their offsets from Kafka to get
-        # a list of all available partitions. The set of available partitions
-        # configured in Kafka should not change.
-        self._consumer.fetch_last_known_offsets()
-        # Partition on the partitions.
-        set_partitioner = (
-            SetPartitioner(self._kazoo_client,
-                           path=self._zookeeper_path,
-                           set=self._consumer.fetch_offsets.keys(),
-                           identifier=str(datetime.now())))
-        return set_partitioner
-    def run(self):
-        try:
-            set_partitioner = self._get_set_partitioner()
-            partitions = []
-            while 1:
-                if set_partitioner.failed:
-                    raise Exception("Failed to acquire partition")
-                elif set_partitioner.release:
-                    self._flush(partitions)
-                    LOG.info("Releasing locks on partition set {} "
-                             "for topic {}".format(partitions,
-                                                   self._kafka_topic))
-                    set_partitioner.release_set()
-                    partitions = []
-                elif set_partitioner.acquired:
-                    if not partitions:
-                        partitions = [p for p in set_partitioner]
-                        LOG.info("Acquired locks on partition set {} "
-                                 "for topic {}".format(
-                                     partitions, self._kafka_topic))
-                        # Refresh the last known offsets again to make sure
-                        # that they are the latest after having acquired the
-                        # lock. Updates self._consumer.fetch_offsets.
-                        self._consumer.fetch_last_known_offsets()
-                        # Modify self._consumer.fetch_offsets to hold only the
-                        # offsets for the set of Kafka partitions acquired
-                        # by this instance of the persister.
-                        partitioned_fetch_offsets = {}
-                        for p in partitions:
-                            partitioned_fetch_offsets[p] = (
-                                self._consumer.fetch_offsets[p])
-                        self._consumer.fetch_offsets = partitioned_fetch_offsets
-                    self._last_partition_check = datetime.now()
-                    self._process_messages(partitions)
-                elif set_partitioner.allocating:
-                    LOG.info("Waiting to acquire locks on partition set")
-                    set_partitioner.wait_for_acquire()
+                if len(self._data_points) >= self._database_batch_size:
+                    self._flush()
         except:
             LOG.exception(
                 'Persister encountered fatal exception processing messages. '
                 'Shutting down all threads and exiting')
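Note on the new consumer interface: the repartition_callback and ZooKeeper arguments suggest the common consumer now owns the partition coordination that the removed KazooClient/SetPartitioner code handled, while the persister only buffers points and flushes. Below is a minimal standalone sketch of that usage pattern; the constructor argument order, the commit() call, and reading the payload from index 1 of each yielded message are taken from the diff above, while the broker, ZooKeeper, path, group, topic, and batch-size values are placeholders, not part of this change.

    from monasca_common.kafka.consumer import KafkaConsumer

    data_points = []

    def flush():
        # Used for both repartition_callback and periodic_callback, and
        # called directly when a batch fills up: persist what is buffered,
        # then mark the Kafka offsets as consumed.
        if not data_points:
            return
        print('would write {} points to the database'.format(len(data_points)))
        del data_points[:]
        consumer.commit()

    consumer = KafkaConsumer('localhost:9092',          # kafka uri (placeholder)
                             'localhost:2181',          # zookeeper uri (placeholder)
                             '/persister_partitions',   # zookeeper path (placeholder)
                             'persister',               # consumer group id (placeholder)
                             'metrics',                 # topic (placeholder)
                             repartition_callback=flush,
                             periodic_callback=flush,
                             periodic_callback_rate=30)

    for raw_message in consumer:
        # The payload is at index 1 of each message the consumer yields,
        # as in AbstractPersister.run() above.
        data_points.append(raw_message[1])
        if len(data_points) >= 1000:
            flush()

In the persister itself this state lives on AbstractPersister, and the same flush path runs in the three situations visible in the diff: when the batch reaches database_batch_size, on the periodic timer, and before partitions are rebalanced.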

View File

@@ -1,8 +1,7 @@
 six==1.9.0
 babel
 eventlet
 influxdb==2.8.0
 iso8601
-kafka-python>=0.9.2,<0.9.3
-kazoo>=2.0
 oslo.config<2.0
+monasca-common