Enable running multiple consumer processes at once

Change-Id: Ibed328d0b2bc11206d9975e84ce957305f8b973a
This commit is contained in:
Michael James Hoppal 2016-02-29 14:47:13 -07:00
parent 22e18453dd
commit 7c77ad3e44
3 changed files with 86 additions and 21 deletions

View File

@ -35,6 +35,7 @@ buffer_size = 4096
max_buffer_size = 32768
# Path in zookeeper for kafka consumer group partitioning algo
zookeeper_path = /persister_partitions/alarm-state-transitions
num_processors = 1
[kafka_metrics]
# Comma separated list of Kafka broker host:port
@ -52,6 +53,7 @@ buffer_size = 4096
max_buffer_size = 32768
# Path in zookeeper for kafka consumer group partitioning algo
zookeeper_path = /persister_partitions/metrics
num_processors = 1
[influxdb]
database_name = mon

View File

@ -21,8 +21,11 @@
Start the persister as a stand-alone process by running 'persister.py
--config-file <config file>'
"""
import multiprocessing
import os
import signal
import sys
import time
import simport
from oslo_config import cfg
@ -49,7 +52,8 @@ kafka_common_opts = [cfg.StrOpt('uri'),
cfg.IntOpt('fetch_size_bytes'),
cfg.IntOpt('buffer_size'),
cfg.IntOpt('max_buffer_size'),
cfg.StrOpt('zookeeper_path')]
cfg.StrOpt('zookeeper_path'),
cfg.IntOpt('num_processors')]
kafka_metrics_opts = kafka_common_opts
kafka_alarm_history_opts = kafka_common_opts
@ -71,6 +75,57 @@ repositories_group = cfg.OptGroup(name='repositories', title='repositories')
cfg.CONF.register_group(repositories_group)
cfg.CONF.register_opts(repositories_opts, repositories_group)
processors = [] # global list to facilitate clean signal handling
exiting = False
def clean_exit(signum, frame=None):
    """Shut down all worker processes as gracefully as possible.

    Installed as the handler for SIGCHLD, SIGINT and SIGTERM, and may also
    be called directly (e.g. on losing the zookeeper connection).  Workers
    are first asked to terminate so they can finish uncommitted work, then
    force-killed if they linger.
    """
    global exiting
    if exiting:
        # This handler is also registered for SIGCHLD, so terminating a
        # child re-enters it; the module-level flag prevents the shutdown
        # sequence from running more than once.
        LOG.debug('Exit in progress clean_exit received additional signal %s' % signum)
        return

    LOG.info('Received signal %s, beginning graceful shutdown.' % signum)
    exiting = True

    any_terminated = False
    for worker in processors:
        try:
            if not worker.is_alive():
                continue
            # SIGTERM gives the worker a chance to finish its active batch.
            worker.terminate()
            any_terminated = True
        except Exception:
            # Best-effort: the worker may already have exited.
            pass

    # Give terminated workers a moment to shut down on their own.
    if any_terminated:
        time.sleep(2)

    # Force-kill anything that survived the grace period.
    for child in multiprocessing.active_children():
        LOG.debug('Killing pid %s' % child.pid)
        try:
            os.kill(child.pid, signal.SIGKILL)
        except Exception:
            pass

    sys.exit(0 if signum == signal.SIGTERM else signum)
def start_process(repository, kafka_config):
    """Entry point for a consumer child process.

    Builds a Persister bound to the given repository driver and kafka
    config section and consumes until the process is terminated.

    :param repository: simport-loaded repository driver class for this topic
    :param kafka_config: the kafka config group (metrics or alarm history)
    """
    # Fixed misspelled parameter name ('respository'); all callers pass
    # these arguments positionally via multiprocessing.Process(args=...).
    LOG.info("start process: {}".format(repository))
    persister = Persister(kafka_config, cfg.CONF.zookeeper, repository)
    persister.run()
def main():
log.register_options(cfg.CONF)
@ -78,26 +133,24 @@ def main():
cfg.CONF(sys.argv[1:], project='monasca', prog='persister')
log.setup(cfg.CONF, "monasca-persister")
"""Start persister.
Start metric persister and alarm persister in separate threads.
"""
"""Start persister."""
metric_repository = simport.load(cfg.CONF.repositories.metrics_driver)
alarm_state_history_repository = simport.load(cfg.CONF.repositories.alarm_state_history_driver)
metric_persister = Persister(cfg.CONF.kafka_metrics,
cfg.CONF.zookeeper,
metric_repository)
# Add processors for metrics topic
for proc in range(0, cfg.CONF.kafka_metrics.num_processors):
processors.append(multiprocessing.Process(
target=start_process, args=(metric_repository, cfg.CONF.kafka_metrics)))
alarm_persister = Persister(cfg.CONF.kafka_alarm_history,
cfg.CONF.zookeeper,
alarm_state_history_repository)
# Add processors for alarm history topic
for proc in range(0, cfg.CONF.kafka_alarm_history.num_processors):
processors.append(multiprocessing.Process(
target=start_process, args=(alarm_state_history_repository, cfg.CONF.kafka_alarm_history)))
metric_persister.start()
alarm_persister.start()
LOG.info('''
# Start
try:
LOG.info('''
_____
/ \ ____ ____ _____ ______ ____ _____
@ -113,8 +166,21 @@ def main():
\/ \/ \/ \/
''')
for process in processors:
process.start()
LOG.info('Monasca Persister has started successfully!')
# The signal handlers must be added after the processes start otherwise
# they run on all processes
signal.signal(signal.SIGCHLD, clean_exit)
signal.signal(signal.SIGINT, clean_exit)
signal.signal(signal.SIGTERM, clean_exit)
while True:
time.sleep(10)
except Exception:
LOG.exception('Error! Exiting.')
clean_exit(signal.SIGKILL)
if __name__ == "__main__":
sys.exit(main())

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import threading
from oslo_log import log
@ -22,12 +21,10 @@ from monasca_common.kafka.consumer import KafkaConsumer
LOG = log.getLogger(__name__)
class Persister(threading.Thread):
class Persister(object):
def __init__(self, kafka_conf, zookeeper_conf, repository):
super(Persister, self).__init__()
self._data_points = []
self._kafka_topic = kafka_conf.topic