From 94510afed99abe161973451e89f2265cd00c11ab Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Tue, 13 Jun 2017 17:26:03 -0700 Subject: [PATCH] Check periodically if host is leader Check periodically if host continues to be a leader once elected. Failure to check might lead to a situation where the host has lost leadership but is not aware of the situation. If the host is no longer the leader then stand down as a leader, stop any spark-submit processes running on the node and reset state in the transform thread. Removed --supervise option when invoking spark-submit to turn off built-in driver management. Added some hardening to better catch exceptions in main transform service thread and also periodic leader check function so that the threads don't die when they encounter an unhandled exception. Change-Id: If2e13e3ed6cb30b3d7fa5f1b440c4c39b87692be --- devstack/settings | 2 +- .../service/transform_service.py | 293 ++++++++++++------ requirements.txt | 1 + tests/unit/service/__init__.py | 0 tests/unit/service/test_transform_service.py | 149 +++++++++ .../test_resources/config/test_config.conf | 25 +- 6 files changed, 375 insertions(+), 95 deletions(-) create mode 100644 tests/unit/service/__init__.py create mode 100644 tests/unit/service/test_transform_service.py diff --git a/devstack/settings b/devstack/settings index 665ed4f..87e53b6 100644 --- a/devstack/settings +++ b/devstack/settings @@ -42,7 +42,7 @@ HADOOP_VERSION=${HADOOP_VERSION:-2.6} SPARK_HADOOP_VERSION=spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION SPARK_TARBALL_NAME=${SPARK_HADOOP_VERSION}.tgz MAVEN_REPO=${MAVEN_REPO:-https://repo1.maven.org/maven2} -APACHE_MIRROR=${APACHE_MIRROR:-http://archive.apache.org/dist} +APACHE_MIRROR=${APACHE_MIRROR:-http://archive.apache.org/dist/} # Kafka deb consists of the version of scala plus the version of kafka BASE_KAFKA_VERSION=${BASE_KAFKA_VERSION:-0.8.1.1} diff --git a/monasca_transform/service/transform_service.py b/monasca_transform/service/transform_service.py 
index 794e0b8..d62091a 100644 --- a/monasca_transform/service/transform_service.py +++ b/monasca_transform/service/transform_service.py @@ -13,29 +13,28 @@ # under the License. import os +import psutil import signal import socket -from subprocess import call +import subprocess import sys import threading import time +import traceback from oslo_config import cfg from oslo_log import log +from oslo_service import loopingcall from oslo_service import service as os_service - from tooz import coordination from monasca_transform.config.config_initializer import ConfigInitializer - - -LOG = log.getLogger(__name__) -log.register_options(cfg.CONF) -log.set_defaults() -log.setup(cfg.CONF, 'transform') +from monasca_transform.log_utils import LogUtils CONF = cfg.CONF +SPARK_SUBMIT_PROC_NAME = "spark-submit" + def main(): transform_service = TransformService() @@ -46,14 +45,57 @@ def shutdown_all_threads_and_die(): """Shut down all threads and exit process. Hit it with a hammer to kill all threads and die. """ + LOG = log.getLogger(__name__) LOG.info('Monasca Transform service stopping...') os._exit(1) +def get_process(proc_name): + """Get process given string in + process cmd line. + """ + LOG = log.getLogger(__name__) + proc = None + try: + for pr in psutil.process_iter(): + for args in pr.cmdline(): + if proc_name in args.split(" "): + proc = pr + return proc + except BaseException: + # pass + LOG.error("Error fetching {%s} process..." % proc_name) + return None + + +def stop_spark_submit_process(): + """Stop spark submit program.""" + LOG = log.getLogger(__name__) + try: + # get the driver proc + pr = get_process(SPARK_SUBMIT_PROC_NAME) + + if pr: + # terminate (SIGTERM) spark driver proc + for cpr in pr.children(recursive=False): + LOG.info("Terminate child pid {%s} ..." % str(cpr.pid)) + cpr.terminate() + + # terminate spark submit proc + LOG.info("Terminate pid {%s} ..." 
% str(pr.pid)) + pr.terminate() + + except Exception as e: + LOG.error("Error killing spark submit " + "process: got exception: {%s}" % e.message) + + class Transform(os_service.Service): """Class used with Openstack service. """ + LOG = log.getLogger(__name__) + def __init__(self, threads=1): super(Transform, self).__init__(threads) @@ -68,128 +110,193 @@ class Transform(os_service.Service): main() - except Exception: - LOG.exception('Monasca Transform service encountered fatal error. ' - 'Shutting down all threads and exiting') + except BaseException: + self.LOG.exception("Monasca Transform service " + "encountered fatal error. " + "Shutting down all threads and exiting") shutdown_all_threads_and_die() - def stop(self, graceful): - shutdown_all_threads_and_die() + def stop(self): + stop_spark_submit_process() + super(os_service.Service, self).stop() class TransformService(threading.Thread): previously_running = False - - # A unique name used for establishing election candidacy - my_host_name = socket.getfqdn() + LOG = log.getLogger(__name__) def __init__(self): super(TransformService, self).__init__() - def when_i_am_elected_leader(self, event): + self.coordinator = None + + self.group = CONF.service.coordinator_group + + # A unique name used for establishing election candidacy + self.my_host_name = socket.getfqdn() + + # periodic check + leader_check = loopingcall.FixedIntervalLoopingCall( + self.periodic_leader_check) + leader_check.start(interval=float( + CONF.service.election_polling_frequency)) + + def check_if_still_leader(self): + """Return true if the this host is the + leader + """ + leader = None try: - LOG.info('Monasca Transform service running on ' + - self.my_host_name + ' has been elected leader') - self.previously_running = True + leader = self.coordinator.get_leader(self.group).get() + except BaseException: + self.LOG.info('No leader elected yet for group %s' % + (self.group)) + if leader and self.my_host_name == leader: + return True + # default 
+ return False - if CONF.service.spark_python_files: - pyfiles = " --py-files %s" % CONF.service.spark_python_files - else: - pyfiles = '' + def periodic_leader_check(self): + self.LOG.debug("Called periodic_leader_check...") + try: + if self.previously_running: + if not self.check_if_still_leader(): - if (CONF.service.spark_event_logging_enabled and - CONF.service.spark_event_logging_dest): - event_logging_dest = (" --conf spark.eventLog.dir=file://%s" % - CONF.service.spark_event_logging_dest) - else: - event_logging_dest = '' + # stop spark submit process + stop_spark_submit_process() - # Build the command to start the Spark driver - spark_cmd = ("export SPARK_HOME=" + - CONF.service.spark_home + " && " - "spark-submit --supervise --master " + - CONF.service.spark_master_list + - " --conf spark.eventLog.enabled=" + - CONF.service.spark_event_logging_enabled + - event_logging_dest + - " --jars " + CONF.service.spark_jars_list + - pyfiles + - " " + CONF.service.spark_driver) + # stand down as a leader + try: + self.coordinator.stand_down_group_leader( + self.group) + except BaseException as e: + self.LOG.info("Host %s cannot stand down as " + "leader for group %s: " + "got exception {%s}" % + (self.my_host_name, self.group, + e.message)) + # reset state + self.previously_running = False + except BaseException as e: + self.LOG.info("periodic_leader_check: " + "caught unhandled exception: {%s}" % e.message) - # Start the Spark driver (specify shell=True in order to - # correctly handle wildcards in the spark_cmd - call(spark_cmd, shell=True) + def when_i_am_elected_leader(self, event): + """Callback when this host gets elected leader.""" - except Exception: - LOG.exception( - 'TransformService on ' + self.my_host_name + - ' encountered fatal exception. 
' - 'Shutting down all threads and exiting') - shutdown_all_threads_and_die() + # set running state + self.previously_running = True + + self.LOG.info("Monasca Transform service running on %s " + "has been elected leader" % str(self.my_host_name)) + + if CONF.service.spark_python_files: + pyfiles = (" --py-files %s" + % CONF.service.spark_python_files) + else: + pyfiles = '' + + event_logging_dest = '' + if (CONF.service.spark_event_logging_enabled and + CONF.service.spark_event_logging_dest): + event_logging_dest = ( + "--conf spark.eventLog.dir=" + "file://%s" % + CONF.service.spark_event_logging_dest) + + # Build the command to start the Spark driver + spark_cmd = "".join(( + "export SPARK_HOME=", + CONF.service.spark_home, + " && ", + "spark-submit --master ", + CONF.service.spark_master_list, + " --conf spark.eventLog.enabled=", + CONF.service.spark_event_logging_enabled, + event_logging_dest, + " --jars " + CONF.service.spark_jars_list, + pyfiles, + " " + CONF.service.spark_driver)) + + # Start the Spark driver + # (specify shell=True in order to + # correctly handle wildcards in the spark_cmd) + subprocess.call(spark_cmd, shell=True) def run(self): - LOG.info('The host of this Monasca Transform service is ' + - self.my_host_name) + + self.LOG.info('The host of this Monasca Transform service is ' + + self.my_host_name) # Loop until the service is stopped while True: - self.previously_running = False - - # Start an election coordinator - coordinator = coordination.get_coordinator( - CONF.service.coordinator_address, self.my_host_name) - coordinator.start() - - # Create a coordination/election group - group = CONF.service.coordinator_group try: - request = coordinator.create_group(group) - request.get() - except coordination.GroupAlreadyExist: - LOG.info('Group %s already exists' % group) - # Join the coordination/election group - try: - request = coordinator.join_group(group) - request.get() - except coordination.MemberAlreadyExist: - LOG.info('Host already 
joined to group %s as %s' % - (group, self.my_host_name)) + self.previously_running = False - # Announce the candidacy and wait to be elected - coordinator.watch_elected_as_leader(group, - self.when_i_am_elected_leader) + # Start an election coordinator + self.coordinator = coordination.get_coordinator( + CONF.service.coordinator_address, self.my_host_name) - while self.previously_running is False: - LOG.info('Monasca Transform service on %s is checking' - ' election results...' % self.my_host_name) - coordinator.heartbeat() - coordinator.run_watchers() - if self.previously_running is True: - try: - # Leave/exit the coordination/election group - request = coordinator.leave_group(group) - request.get() - except coordination.MemberNotJoined: - LOG.info('Host has not yet joined group %s as %s' % - (group, self.my_host_name)) - time.sleep(float(CONF.service.election_polling_frequency)) + self.coordinator.start() - coordinator.stop() + # Create a coordination/election group + try: + request = self.coordinator.create_group(self.group) + request.get() + except coordination.GroupAlreadyExist: + self.LOG.info('Group %s already exists' % self.group) + + # Join the coordination/election group + try: + request = self.coordinator.join_group(self.group) + request.get() + except coordination.MemberAlreadyExist: + self.LOG.info('Host already joined to group %s as %s' % + (self.group, self.my_host_name)) + + # Announce the candidacy and wait to be elected + self.coordinator.watch_elected_as_leader( + self.group, + self.when_i_am_elected_leader) + + while self.previously_running is False: + self.LOG.debug('Monasca Transform service on %s is ' + 'checking election results...' 
+ % self.my_host_name) + self.coordinator.heartbeat() + self.coordinator.run_watchers() + if self.previously_running is True: + try: + # Leave/exit the coordination/election group + request = self.coordinator.leave_group(self.group) + request.get() + except coordination.MemberNotJoined: + self.LOG.info("Host has not yet " + "joined group %s as %s" % + (self.group, self.my_host_name)) + time.sleep(float(CONF.service.election_polling_frequency)) + + self.coordinator.stop() + + except BaseException as e: + # catch any unhandled exception and continue + self.LOG.info("Ran into unhandled exception: {%s}" % e.message) + self.LOG.info("Going to restart coordinator again...") + traceback.print_exc() def main_service(): """Method to use with Openstack service. """ ConfigInitializer.basic_config() - - launcher = os_service.ServiceLauncher(CONF) + LogUtils.init_logger(__name__) + launcher = os_service.ServiceLauncher(cfg.CONF) launcher.launch_service(Transform()) launcher.wait() - # Used if run without Openstack service. if __name__ == "__main__": sys.exit(main()) diff --git a/requirements.txt b/requirements.txt index eb18bd6..9209a7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. 
pbr!=2.1.0,>=2.0.0 # Apache-2.0 +psutil>=3.2.2 # BSD PyMySQL>=0.7.6 # MIT License six>=1.9.0 # MIT SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT diff --git a/tests/unit/service/__init__.py b/tests/unit/service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/service/test_transform_service.py b/tests/unit/service/test_transform_service.py new file mode 100644 index 0000000..27f5a47 --- /dev/null +++ b/tests/unit/service/test_transform_service.py @@ -0,0 +1,149 @@ +# Copyright 2016-2017 Hewlett Packard Enterprise Development Company LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import mock +from mock import MagicMock +import os + +from oslo_config import cfg +import oslo_service + +import time +import traceback +import unittest + +from monasca_transform.config.config_initializer import ConfigInitializer + +from monasca_transform.service import transform_service + +from tooz.drivers.zookeeper import KazooDriver + +ConfigInitializer.basic_config( + default_config_files=[ + 'tests/unit/test_resources/config/' + 'test_config.conf'] +) + + +class TransformServiceTestBase(unittest.TestCase): + + def setUp(self): + super(TransformServiceTestBase, self).setUp() + self.conf = cfg.CONF + + def _spawn_transform_service(self): + """Launch transform service and get pid.""" + status = 0 + pid = os.fork() + if pid == 0: + try: + os.setsid() + # start transform service + launcher = oslo_service.service.launch( + self.conf, + transform_service.Transform(), + workers=1) + status = launcher.wait() + except SystemExit as exc: + traceback.print_exc() + status = exc.code + except BaseException: + try: + traceback.print_exc() + except BaseException: + print("Could not print traceback") + status = 2 + os._exit(status or 0) + return pid + + @mock.patch('tooz.coordination.get_coordinator') + def test_transform_service_heartbeat(self, coordinator): + + # mock coordinator + fake_kazoo_driver = MagicMock(name="MagicKazooDriver", + spec=KazooDriver) + coordinator.return_value = fake_kazoo_driver + + # option1 + serv_thread = transform_service.TransformService() + serv_thread.daemon = True + serv_thread.start() + time.sleep(2) + + # option2 + # mocks dont seem to work when spawning a service + # pid = _spawn_transform_service() + # time.sleep() + # os.kill(pid, signal.SIGNAL_SIGTERM) + + fake_kazoo_driver.heartbeat.assert_called_with() + + @mock.patch('tooz.coordination.get_coordinator') + @mock.patch('monasca_transform.service.transform_service' + '.stop_spark_submit_process') + def test_transform_service_periodic_check(self, + stopspark, + coordinator): + + # mock 
coordinator + fake_kazoo_driver = MagicMock(name="MagicKazooDriver", + spec=KazooDriver) + fake_kazoo_driver.get_leader.get.return_value = "someotherhost" + coordinator.return_value = fake_kazoo_driver + + # set up transform service + serv_thread = transform_service.TransformService() + serv_thread.daemon = True + # assume that the driver was running + serv_thread.previously_running = True + # set the coordinator + serv_thread.coordinator = fake_kazoo_driver + + # call periodic leader check + serv_thread.periodic_leader_check() + + # verify if standown leader was called + fake_kazoo_driver.stand_down_group_leader.assert_called_with( + 'monasca-transform') + # verify if stop spark submit process was called + stopspark.assert_called() + + @mock.patch('tooz.coordination.get_coordinator') + @mock.patch('subprocess.call') + def test_transform_service_leader_election(self, + spark_submit_call, + coordinator): + # mock coordinator + fake_kazoo_driver = MagicMock(name="MagicKazooDriver", + spec=KazooDriver) + coordinator.return_value = fake_kazoo_driver + + # set up transform service + serv_thread = transform_service.TransformService() + serv_thread.daemon = True + + # test previously running value + self.assertFalse(serv_thread.previously_running) + + fake_event = MagicMock(name="fake_event", + spec='tooz.coordination.Event') + + # call leader election function + serv_thread.when_i_am_elected_leader(fake_event) + + # test if subcall was called + spark_submit_call.assert_called() + + # test previously running value + self.assertTrue(serv_thread.previously_running) diff --git a/tests/unit/test_resources/config/test_config.conf b/tests/unit/test_resources/config/test_config.conf index 5585a33..59a9dad 100644 --- a/tests/unit/test_resources/config/test_config.conf +++ b/tests/unit/test_resources/config/test_config.conf @@ -36,4 +36,27 @@ service_log_filename=monasca-transform.log # Whether debug-level log entries should be included in the application # log. 
If this setting is false, info-level will be used for logging. -enable_debug_log_entries = true \ No newline at end of file +enable_debug_log_entries = true + +# The address of the mechanism being used for election coordination +coordinator_address = kazoo://localhost:2181 + +# The name of the coordination/election group +coordinator_group = monasca-transform + +# How long the candidate should sleep between election result +# queries (in seconds) +election_polling_frequency = 15 + +spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar + +spark_driver = /opt/stack/monasca-transform/monasca_transform/driver/mon_metrics_kafka.py + +# Whether Spark event logging should be enabled (true/false) +spark_event_logging_enabled = true + +# A list of where the Spark master(s) should run +spark_master_list = spark://192.168.10.4:7077,192.168.10.5:7077 + +# spark_home for the environment +spark_home = /opt/spark/current \ No newline at end of file