Check periodically if host is leader

Check periodically if host continues to be a
leader once elected. Failure to check
might lead to a situation where the host
has lost leadership but is not aware of the
situation.
If the host is no longer the leader then
stand down as a leader, stop any spark-submit
processes running on the node and reset state
in the transform thread.
Removed the --supervise option when invoking
spark-submit to turn off Spark's built-in driver
management.
Added some hardening to better catch exceptions
in main transform service thread and also
periodic leader check function so that the
threads don't die when they encounter
an unhandled exception.

Change-Id: If2e13e3ed6cb30b3d7fa5f1b440c4c39b87692be
This commit is contained in:
Ashwin Agate 2017-06-13 17:26:03 -07:00
parent b40c3b17d6
commit 94510afed9
6 changed files with 375 additions and 95 deletions

View File

@ -42,7 +42,7 @@ HADOOP_VERSION=${HADOOP_VERSION:-2.6}
SPARK_HADOOP_VERSION=spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION
SPARK_TARBALL_NAME=${SPARK_HADOOP_VERSION}.tgz
MAVEN_REPO=${MAVEN_REPO:-https://repo1.maven.org/maven2}
APACHE_MIRROR=${APACHE_MIRROR:-http://archive.apache.org/dist}
APACHE_MIRROR=${APACHE_MIRROR:-http://archive.apache.org/dist/}
# Kafka deb consists of the version of scala plus the version of kafka
BASE_KAFKA_VERSION=${BASE_KAFKA_VERSION:-0.8.1.1}

View File

@ -13,29 +13,28 @@
# under the License.
import os
import psutil
import signal
import socket
from subprocess import call
import subprocess
import sys
import threading
import time
import traceback
from oslo_config import cfg
from oslo_log import log
from oslo_service import loopingcall
from oslo_service import service as os_service
from tooz import coordination
from monasca_transform.config.config_initializer import ConfigInitializer
LOG = log.getLogger(__name__)
log.register_options(cfg.CONF)
log.set_defaults()
log.setup(cfg.CONF, 'transform')
from monasca_transform.log_utils import LogUtils
CONF = cfg.CONF
SPARK_SUBMIT_PROC_NAME = "spark-submit"
def main():
transform_service = TransformService()
@ -46,14 +45,57 @@ def shutdown_all_threads_and_die():
"""Shut down all threads and exit process.
Hit it with a hammer to kill all threads and die.
"""
LOG = log.getLogger(__name__)
LOG.info('Monasca Transform service stopping...')
os._exit(1)
def get_process(proc_name):
    """Find the first running process whose command line mentions a name.

    Every command-line argument of every process is split on single
    spaces; a process matches when ``proc_name`` equals one of the
    resulting tokens.

    :param proc_name: token to look for in process command lines
    :returns: the first matching psutil process object, or None when no
              process matches or the process list could not be scanned
    """
    LOG = log.getLogger(__name__)
    try:
        for pr in psutil.process_iter():
            try:
                cmdline = pr.cmdline()
            except psutil.Error:
                # psutil raises per-process errors (e.g. the process has
                # already exited or access is denied). Skip just this
                # process so one unreadable entry does not abort the
                # whole search.
                continue
            for args in cmdline:
                if proc_name in args.split(" "):
                    return pr
    except Exception:
        # Best effort: report the failure and fall through to None.
        # Exception (not BaseException) is caught so that interpreter
        # exit and keyboard interrupts are not swallowed here.
        LOG.error("Error fetching {%s} process..." % proc_name)
    return None
def stop_spark_submit_process():
    """Terminate the spark-submit process and its direct children.

    Looks up the process via get_process(SPARK_SUBMIT_PROC_NAME). When
    one is found, its direct children are terminated first and then the
    process itself. Nothing happens when no such process is found.

    Any exception raised along the way is logged and suppressed, so this
    function never raises an ``Exception`` to its caller.
    """
    LOG = log.getLogger(__name__)
    try:
        # get the driver proc
        pr = get_process(SPARK_SUBMIT_PROC_NAME)
        if pr:
            # terminate (SIGTERM) spark driver proc; only direct
            # children are signalled (recursive=False)
            for cpr in pr.children(recursive=False):
                LOG.info("Terminate child pid {%s} ..." % str(cpr.pid))
                cpr.terminate()
            # terminate spark submit proc
            LOG.info("Terminate pid {%s} ..." % str(pr.pid))
            pr.terminate()
    except Exception as e:
        # Format the exception with str(): the ``message`` attribute is
        # not available on Python 3 exceptions, and reading it here
        # would raise AttributeError from inside this handler.
        LOG.error("Error killing spark submit "
                  "process: got exception: {%s}" % str(e))
class Transform(os_service.Service):
"""Class used with Openstack service.
"""
LOG = log.getLogger(__name__)
def __init__(self, threads=1):
super(Transform, self).__init__(threads)
@ -68,128 +110,193 @@ class Transform(os_service.Service):
main()
except Exception:
LOG.exception('Monasca Transform service encountered fatal error. '
'Shutting down all threads and exiting')
except BaseException:
self.LOG.exception("Monasca Transform service "
"encountered fatal error. "
"Shutting down all threads and exiting")
shutdown_all_threads_and_die()
def stop(self, graceful):
shutdown_all_threads_and_die()
def stop(self):
stop_spark_submit_process()
super(os_service.Service, self).stop()
class TransformService(threading.Thread):
previously_running = False
# A unique name used for establishing election candidacy
my_host_name = socket.getfqdn()
LOG = log.getLogger(__name__)
def __init__(self):
super(TransformService, self).__init__()
def when_i_am_elected_leader(self, event):
self.coordinator = None
self.group = CONF.service.coordinator_group
# A unique name used for establishing election candidacy
self.my_host_name = socket.getfqdn()
# periodic check
leader_check = loopingcall.FixedIntervalLoopingCall(
self.periodic_leader_check)
leader_check.start(interval=float(
CONF.service.election_polling_frequency))
def check_if_still_leader(self):
"""Return true if the this host is the
leader
"""
leader = None
try:
LOG.info('Monasca Transform service running on ' +
self.my_host_name + ' has been elected leader')
self.previously_running = True
leader = self.coordinator.get_leader(self.group).get()
except BaseException:
self.LOG.info('No leader elected yet for group %s' %
(self.group))
if leader and self.my_host_name == leader:
return True
# default
return False
if CONF.service.spark_python_files:
pyfiles = " --py-files %s" % CONF.service.spark_python_files
else:
pyfiles = ''
def periodic_leader_check(self):
self.LOG.debug("Called periodic_leader_check...")
try:
if self.previously_running:
if not self.check_if_still_leader():
if (CONF.service.spark_event_logging_enabled and
CONF.service.spark_event_logging_dest):
event_logging_dest = (" --conf spark.eventLog.dir=file://%s" %
CONF.service.spark_event_logging_dest)
else:
event_logging_dest = ''
# stop spark submit process
stop_spark_submit_process()
# Build the command to start the Spark driver
spark_cmd = ("export SPARK_HOME=" +
CONF.service.spark_home + " && "
"spark-submit --supervise --master " +
CONF.service.spark_master_list +
" --conf spark.eventLog.enabled=" +
CONF.service.spark_event_logging_enabled +
event_logging_dest +
" --jars " + CONF.service.spark_jars_list +
pyfiles +
" " + CONF.service.spark_driver)
# stand down as a leader
try:
self.coordinator.stand_down_group_leader(
self.group)
except BaseException as e:
self.LOG.info("Host %s cannot stand down as "
"leader for group %s: "
"got exception {%s}" %
(self.my_host_name, self.group,
e.message))
# reset state
self.previously_running = False
except BaseException as e:
self.LOG.info("periodic_leader_check: "
"caught unhandled exception: {%s}" % e.message)
# Start the Spark driver (specify shell=True in order to
# correctly handle wildcards in the spark_cmd
call(spark_cmd, shell=True)
def when_i_am_elected_leader(self, event):
"""Callback when this host gets elected leader."""
except Exception:
LOG.exception(
'TransformService on ' + self.my_host_name +
' encountered fatal exception. '
'Shutting down all threads and exiting')
shutdown_all_threads_and_die()
# set running state
self.previously_running = True
self.LOG.info("Monasca Transform service running on %s "
"has been elected leader" % str(self.my_host_name))
if CONF.service.spark_python_files:
pyfiles = (" --py-files %s"
% CONF.service.spark_python_files)
else:
pyfiles = ''
event_logging_dest = ''
if (CONF.service.spark_event_logging_enabled and
CONF.service.spark_event_logging_dest):
event_logging_dest = (
"--conf spark.eventLog.dir="
"file://%s" %
CONF.service.spark_event_logging_dest)
# Build the command to start the Spark driver
spark_cmd = "".join((
"export SPARK_HOME=",
CONF.service.spark_home,
" && ",
"spark-submit --master ",
CONF.service.spark_master_list,
" --conf spark.eventLog.enabled=",
CONF.service.spark_event_logging_enabled,
event_logging_dest,
" --jars " + CONF.service.spark_jars_list,
pyfiles,
" " + CONF.service.spark_driver))
# Start the Spark driver
# (specify shell=True in order to
# correctly handle wildcards in the spark_cmd)
subprocess.call(spark_cmd, shell=True)
def run(self):
LOG.info('The host of this Monasca Transform service is ' +
self.my_host_name)
self.LOG.info('The host of this Monasca Transform service is ' +
self.my_host_name)
# Loop until the service is stopped
while True:
self.previously_running = False
# Start an election coordinator
coordinator = coordination.get_coordinator(
CONF.service.coordinator_address, self.my_host_name)
coordinator.start()
# Create a coordination/election group
group = CONF.service.coordinator_group
try:
request = coordinator.create_group(group)
request.get()
except coordination.GroupAlreadyExist:
LOG.info('Group %s already exists' % group)
# Join the coordination/election group
try:
request = coordinator.join_group(group)
request.get()
except coordination.MemberAlreadyExist:
LOG.info('Host already joined to group %s as %s' %
(group, self.my_host_name))
self.previously_running = False
# Announce the candidacy and wait to be elected
coordinator.watch_elected_as_leader(group,
self.when_i_am_elected_leader)
# Start an election coordinator
self.coordinator = coordination.get_coordinator(
CONF.service.coordinator_address, self.my_host_name)
while self.previously_running is False:
LOG.info('Monasca Transform service on %s is checking'
' election results...' % self.my_host_name)
coordinator.heartbeat()
coordinator.run_watchers()
if self.previously_running is True:
try:
# Leave/exit the coordination/election group
request = coordinator.leave_group(group)
request.get()
except coordination.MemberNotJoined:
LOG.info('Host has not yet joined group %s as %s' %
(group, self.my_host_name))
time.sleep(float(CONF.service.election_polling_frequency))
self.coordinator.start()
coordinator.stop()
# Create a coordination/election group
try:
request = self.coordinator.create_group(self.group)
request.get()
except coordination.GroupAlreadyExist:
self.LOG.info('Group %s already exists' % self.group)
# Join the coordination/election group
try:
request = self.coordinator.join_group(self.group)
request.get()
except coordination.MemberAlreadyExist:
self.LOG.info('Host already joined to group %s as %s' %
(self.group, self.my_host_name))
# Announce the candidacy and wait to be elected
self.coordinator.watch_elected_as_leader(
self.group,
self.when_i_am_elected_leader)
while self.previously_running is False:
self.LOG.debug('Monasca Transform service on %s is '
'checking election results...'
% self.my_host_name)
self.coordinator.heartbeat()
self.coordinator.run_watchers()
if self.previously_running is True:
try:
# Leave/exit the coordination/election group
request = self.coordinator.leave_group(self.group)
request.get()
except coordination.MemberNotJoined:
self.LOG.info("Host has not yet "
"joined group %s as %s" %
(self.group, self.my_host_name))
time.sleep(float(CONF.service.election_polling_frequency))
self.coordinator.stop()
except BaseException as e:
# catch any unhandled exception and continue
self.LOG.info("Ran into unhandled exception: {%s}" % e.message)
self.LOG.info("Going to restart coordinator again...")
traceback.print_exc()
def main_service():
"""Method to use with Openstack service.
"""
ConfigInitializer.basic_config()
launcher = os_service.ServiceLauncher(CONF)
LogUtils.init_logger(__name__)
launcher = os_service.ServiceLauncher(cfg.CONF)
launcher.launch_service(Transform())
launcher.wait()
# Used if run without Openstack service.
if __name__ == "__main__":
sys.exit(main())

View File

@ -2,6 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
psutil>=3.2.2 # BSD
PyMySQL>=0.7.6 # MIT License
six>=1.9.0 # MIT
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT

View File

View File

@ -0,0 +1,149 @@
# Copyright 2016-2017 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import MagicMock
import os
from oslo_config import cfg
import oslo_service
import time
import traceback
import unittest
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.service import transform_service
from tooz.drivers.zookeeper import KazooDriver
# Initialize configuration from the unit-test config file. This is a
# module-level call, so it runs once when the test module is imported,
# before any test case. The path is relative and therefore resolves
# against the current working directory of the test run.
ConfigInitializer.basic_config(
default_config_files=[
'tests/unit/test_resources/config/'
'test_config.conf']
)
class TransformServiceTestBase(unittest.TestCase):
    """Tests for leader election handling in the transform service.

    The tooz coordinator is replaced by a MagicMock in every test, so no
    real coordination backend is contacted.
    """

    def setUp(self):
        super(TransformServiceTestBase, self).setUp()
        self.conf = cfg.CONF

    def _spawn_transform_service(self):
        """Launch transform service in a forked child and get its pid.

        The child becomes a session leader, launches the service, waits
        for it and exits with the resulting status; the parent returns
        the child's pid immediately.
        """
        status = 0
        pid = os.fork()
        if pid == 0:
            try:
                os.setsid()
                # start transform service
                launcher = oslo_service.service.launch(
                    self.conf,
                    transform_service.Transform(),
                    workers=1)
                status = launcher.wait()
            except SystemExit as exc:
                traceback.print_exc()
                status = exc.code
            except BaseException:
                try:
                    traceback.print_exc()
                except BaseException:
                    print("Could not print traceback")
                status = 2
            # never return into the test runner from the child process
            os._exit(status or 0)
        return pid

    @mock.patch('tooz.coordination.get_coordinator')
    def test_transform_service_heartbeat(self, coordinator):
        """The service thread heartbeats the coordinator while waiting."""

        # mock coordinator
        fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
                                      spec=KazooDriver)
        coordinator.return_value = fake_kazoo_driver

        # option1: run the service thread directly in this process
        serv_thread = transform_service.TransformService()
        serv_thread.daemon = True
        serv_thread.start()
        time.sleep(2)

        # option2 (not used): spawning the service with
        # _spawn_transform_service() and signalling the child;
        # mocks dont seem to work when spawning a service

        fake_kazoo_driver.heartbeat.assert_called_with()

    @mock.patch('tooz.coordination.get_coordinator')
    @mock.patch('monasca_transform.service.transform_service'
                '.stop_spark_submit_process')
    def test_transform_service_periodic_check(self,
                                              stopspark,
                                              coordinator):
        """Losing leadership stops spark-submit and stands down."""

        # mock coordinator
        fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
                                      spec=KazooDriver)
        # The service calls coordinator.get_leader(group).get(), so the
        # leader name has to be configured on the return value of
        # get_leader(), not on an attribute of the get_leader mock.
        fake_kazoo_driver.get_leader.return_value.get.return_value = (
            "someotherhost")
        coordinator.return_value = fake_kazoo_driver

        # set up transform service
        serv_thread = transform_service.TransformService()
        serv_thread.daemon = True

        # assume that the driver was running
        serv_thread.previously_running = True
        # set the coordinator
        serv_thread.coordinator = fake_kazoo_driver

        # call periodic leader check
        serv_thread.periodic_leader_check()

        # verify if standown leader was called
        fake_kazoo_driver.stand_down_group_leader.assert_called_with(
            'monasca-transform')
        # verify if stop spark submit process was called
        stopspark.assert_called()

    @mock.patch('tooz.coordination.get_coordinator')
    @mock.patch('subprocess.call')
    def test_transform_service_leader_election(self,
                                               spark_submit_call,
                                               coordinator):
        """Being elected leader launches spark-submit and sets state."""

        # mock coordinator
        fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
                                      spec=KazooDriver)
        coordinator.return_value = fake_kazoo_driver

        # set up transform service
        serv_thread = transform_service.TransformService()
        serv_thread.daemon = True

        # test previously running value
        self.assertFalse(serv_thread.previously_running)

        fake_event = MagicMock(name="fake_event",
                               spec='tooz.coordination.Event')

        # call leader election function
        serv_thread.when_i_am_elected_leader(fake_event)

        # test if subcall was called
        spark_submit_call.assert_called()

        # test previously running value
        self.assertTrue(serv_thread.previously_running)

View File

@ -36,4 +36,27 @@ service_log_filename=monasca-transform.log
# Whether debug-level log entries should be included in the application
# log. If this setting is false, info-level will be used for logging.
enable_debug_log_entries = true
enable_debug_log_entries = true
# The address of the mechanism being used for election coordination
coordinator_address = kazoo://localhost:2181
# The name of the coordination/election group
coordinator_group = monasca-transform
# How long the candidate should sleep between election result
# queries (in seconds)
election_polling_frequency = 15
spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar
spark_driver = /opt/stack/monasca-transform/monasca_transform/driver/mon_metrics_kafka.py
# Whether Spark event logging should be enabled (true/false)
spark_event_logging_enabled = true
# A list of where the Spark master(s) should run
spark_master_list = spark://192.168.10.4:7077,192.168.10.5:7077
# spark_home for the environment
spark_home = /opt/spark/current