Add high-availability

Allow persister instances to come and go. Automatically rebalance the
reading of the Kafka topic partitions among the persisters that remain
(sketched below).

Change-Id: Ifd47459b0a29e91680e155d22b82086e04426215
Deklan Dieterly 2014-12-02 13:44:20 -07:00
parent a133bf4f59
commit f4231d41f4
3 changed files with 160 additions and 29 deletions
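
In brief: each persister instance connects to ZooKeeper and uses kazoo's
SetPartitioner recipe to lock an exclusive subset of the topic's Kafka
partitions. Whenever an instance joins or drops out, the recipe flags a
release; every survivor gives up its locks and the partition set is
re-split across the current membership. Below is a minimal sketch of that
pattern, not the persister itself: the ZooKeeper address, the six-partition
set, and the consume() stub are illustrative assumptions.

from datetime import datetime

from kazoo.client import KazooClient
from kazoo.recipe.partitioner import SetPartitioner


def consume(partitions):
    # Stand-in for the persister's real read/flush loop over the
    # acquired Kafka partitions.
    print('reading from partitions {}'.format(sorted(partitions)))


client = KazooClient(hosts='127.0.0.1:2181')  # assumed local ZooKeeper
client.start()

partitioner = SetPartitioner(client,
                             path='/persister_partitions/metrics',
                             set=set(range(6)),  # hypothetical 6 partitions
                             identifier=str(datetime.now()))
while True:
    if partitioner.failed:
        raise Exception('failed to acquire a partition set')
    elif partitioner.release:
        # Membership changed; drop our locks so the set can be
        # rebalanced across the instances that remain.
        partitioner.release_set()
    elif partitioner.acquired:
        consume(list(partitioner))
    elif partitioner.allocating:
        partitioner.wait_for_acquire()

The diff below wires this same state machine into AbstractPersister.run(),
with one addition: after each acquisition the consumer's last known Kafka
offsets are re-fetched and filtered down to the locked partitions.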

@@ -7,8 +7,13 @@ debug = false
# Show more verbose log output (sets INFO log level output) if debug is False
verbose = true
[zookeeper]
# Comma separated list of host:port
uri = 192.168.10.4:2181
partition_interval_recheck_seconds = 15
[kafka_alarm_history]
# Comma separated list of Kafka broker host:port.
uri = 192.168.10.4:9092
group_id = 1_alarm-state-transitions
topic = alarm-state-transitions
@@ -21,9 +26,11 @@ fetch_size_bytes = 4096
buffer_size = 4096
# 8 times buffer size
max_buffer_size = 32768
# Path in zookeeper for kafka consumer group partitioning algo
zookeeper_path = /persister_partitions/alarm-state-transitions
[kafka_metrics]
# Comma separated list of Kafka broker host:port.
uri = 192.168.10.4:9092
group_id = 1_metrics
topic = metrics
@@ -36,6 +43,8 @@ fetch_size_bytes = 4096
buffer_size = 4096
# 8 times buffer size
max_buffer_size = 32768
# Path in zookeeper for kafka consumer group partitioning algo
zookeeper_path = /persister_partitions/metrics
[influxdb]
database_name = test
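
Note that instances coordinate only if they share the same ZooKeeper
ensemble, the same zookeeper_path values, and the same Kafka group_id, so a
second persister would run with an identical copy of this file. For a
multi-node ZooKeeper ensemble the uri takes a comma separated list; a
hypothetical example (the second host is illustrative, not part of this
commit):

[zookeeper]
uri = 192.168.10.4:2181,192.168.10.5:2181
partition_interval_recheck_seconds = 15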

@@ -38,6 +38,8 @@ import urllib
from influxdb import InfluxDBClient
from kafka import KafkaClient
from kafka import SimpleConsumer
from kazoo.client import KazooClient
from kazoo.recipe.partitioner import SetPartitioner
from oslo.config import cfg
from openstack.common import log
@@ -47,6 +49,13 @@ import service
LOG = log.getLogger(__name__)
zookeeper_opts = [cfg.StrOpt('uri'),
                  cfg.IntOpt('partition_interval_recheck_seconds')]
zookeeper_group = cfg.OptGroup(name='zookeeper', title='zookeeper')
cfg.CONF.register_group(zookeeper_group)
cfg.CONF.register_opts(zookeeper_opts, zookeeper_group)
kafka_common_opts = [cfg.StrOpt('uri'),
                     cfg.StrOpt('group_id'),
                     cfg.StrOpt('topic'),
@@ -56,7 +65,8 @@ kafka_common_opts = [cfg.StrOpt('uri'),
                     cfg.IntOpt('max_wait_time_seconds'),
                     cfg.IntOpt('fetch_size_bytes'),
                     cfg.IntOpt('buffer_size'),
                     cfg.IntOpt('max_buffer_size'),
                     cfg.StrOpt('zookeeper_path')]
kafka_metrics_opts = kafka_common_opts
kafka_alarm_history_opts = kafka_common_opts
@@ -94,9 +104,12 @@ def main():
"""
metric_persister = MetricPersister(cfg.CONF.kafka_metrics,
cfg.CONF.influxdb)
cfg.CONF.influxdb,
cfg.CONF.zookeeper)
alarm_persister = AlarmPersister(cfg.CONF.kafka_alarm_history,
cfg.CONF.influxdb)
cfg.CONF.influxdb,
cfg.CONF.zookeeper)
metric_persister.start()
alarm_persister.start()
@@ -154,13 +167,13 @@ class Persister(os_service.Service):
@six.add_metaclass(abc.ABCMeta)
class AbstractPersister(threading.Thread):
    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):

        super(AbstractPersister, self).__init__()

        self._kafka_client = KafkaClient(kafka_conf.uri)
        self._consumer = (
            SimpleConsumer(self._kafka_client,
                           kafka_conf.group_id,
                           kafka_conf.topic,
                           # Set to true even though we actually do
@@ -182,53 +195,157 @@ class AbstractPersister(threading.Thread):
                           max_buffer_size=kafka_conf.max_buffer_size,
                           iter_timeout=1))

        self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
                                               influxdb_conf.port,
                                               influxdb_conf.user,
                                               influxdb_conf.password,
                                               influxdb_conf.database_name)

        self._kazoo_client = KazooClient(hosts=zookeeper_conf.uri)
        self._kazoo_client.start()
        self._zookeeper_path = kafka_conf.zookeeper_path
        self._partition_interval_recheck_secs = (
            zookeeper_conf.partition_interval_recheck_seconds)

        self._max_wait_time_secs = kafka_conf.max_wait_time_seconds
        self._database_batch_size = kafka_conf.database_batch_size
        self._kafka_topic = kafka_conf.topic

        self._json_body = []
        self._last_flush = datetime.now()
        self._last_partition_check = datetime.now()

    @abc.abstractmethod
    def process_message(self, message):
        pass

    def _flush(self, partitions):
        if self._json_body:
            self._influxdb_client.write_points(self._json_body)
            self._consumer.commit(partitions=partitions)
            LOG.info("processed {} messages from topic '{}'".format(
                len(self._json_body), self._kafka_topic))
            self._json_body = []
        self._last_flush = datetime.now()

    def _is_time_for_repartition_check(self):
        delta_partition_check_time = (
            datetime.now() - self._last_partition_check)
        return delta_partition_check_time.seconds >= (
            self._partition_interval_recheck_secs)

    def _process_messages(self, partitions):
        while 1:
            if self._is_time_for_repartition_check():
                return

            delta_flush_time = datetime.now() - self._last_flush
            if delta_flush_time.seconds >= self._max_wait_time_secs:
                self._flush(partitions)

            for message in self._consumer:
                try:
                    self._json_body.append(self.process_message(message))
                    if self._is_time_for_repartition_check():
                        return
                except Exception:
                    LOG.exception('Error processing message. Message is '
                                  'being dropped. {}'.format(message))
                if len(self._json_body) >= self._database_batch_size:
                    self._flush(partitions)

    def _get_set_partitioner(self):
        """Partition the set of Kafka topic partitions.

        Acquire a lock on a subset of the Kafka partitions for a topic
        to allow other instances of the persister to run without reading
        from the same Kafka partitions for the given topic.
        """
        # Refresh the Kafka partitions and their offsets from Kafka to get
        # a list of all available partitions. The set of available
        # partitions configured in Kafka should not change.
        self._consumer.fetch_last_known_offsets()

        # Partition on the partitions.
        set_partitioner = (
            SetPartitioner(self._kazoo_client,
                           path=self._zookeeper_path,
                           set=self._consumer.fetch_offsets.keys(),
                           identifier=str(datetime.now())))

        return set_partitioner

    def run(self):
        try:
            while True:
                set_partitioner = self._get_set_partitioner()
                partitions = []

                while 1:
                    if set_partitioner.failed:
                        raise Exception("Failed to acquire partition")

                    elif set_partitioner.release:
                        self._flush(partitions)
                        LOG.info("Releasing locks on partition set {} "
                                 "for topic {}".format(partitions,
                                                       self._kafka_topic))
                        set_partitioner.release_set()
                        partitions = []

                    elif set_partitioner.acquired:
                        if not partitions:
                            partitions = [p for p in set_partitioner]
                            LOG.info("Acquired locks on partition set {} "
                                     "for topic {}".format(
                                         partitions, self._kafka_topic))

                            # Refresh the last known offsets again to make
                            # sure that they are the latest after having
                            # acquired the lock. Updates
                            # self._consumer.fetch_offsets.
                            self._consumer.fetch_last_known_offsets()

                            # Modify self._consumer.fetch_offsets to hold
                            # only the offsets for the set of Kafka
                            # partitions acquired by this instance of the
                            # persister.
                            partitioned_fetch_offsets = {}
                            for p in partitions:
                                partitioned_fetch_offsets[p] = (
                                    self._consumer.fetch_offsets[p])
                            self._consumer.fetch_offsets = (
                                partitioned_fetch_offsets)

                        self._last_partition_check = datetime.now()
                        self._process_messages(partitions)

                    elif set_partitioner.allocating:
                        set_partitioner.wait_for_acquire()

        except:
            LOG.exception(
                'Persister encountered fatal exception processing messages. '
                'Shutting down all threads and exiting')
@@ -239,9 +356,11 @@ class AlarmPersister(AbstractPersister):
"""Class for persisting alarms.
"""
def __init__(self, kafka_conf, influxdb_conf):
def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
super(AlarmPersister, self).__init__(kafka_conf, influxdb_conf)
super(AlarmPersister, self).__init__(kafka_conf,
influxdb_conf,
zookeeper_conf)
def process_message(self, message):
@@ -317,9 +436,11 @@ class MetricPersister(AbstractPersister):
"""Class for persisting metrics.
"""
def __init__(self, kafka_conf, influxdb_conf):
def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
super(MetricPersister, self).__init__(kafka_conf, influxdb_conf)
super(MetricPersister, self).__init__(kafka_conf,
influxdb_conf,
zookeeper_conf)
def process_message(self, message):

@@ -1,7 +1,8 @@
babel
eventlet
influxdb>=0.1.12
iso8601
kafka-python>=0.9.2
kazoo>=2.0
oslo.config>=1.3.0