From 977b07a29c54df6b803a66a5d6f9ee8aa3532291 Mon Sep 17 00:00:00 2001 From: Michael James Hoppal Date: Fri, 21 Aug 2015 16:04:11 -0600 Subject: [PATCH] Remove watchdog from the agent The watchdog is causing problems by restarting the agent even if it does not need to. Also, we are not gaining anything by running it. As a result, we are removing the watchdog completely from the agent. Change-Id: I45b80e28d81749c98dff7273d7756e10db23da70 --- monasca_agent/collector/daemon.py | 15 --- monasca_agent/common/config.py | 1 - monasca_agent/common/util.py | 45 -------- monasca_agent/forwarder/daemon.py | 14 +-- monasca_agent/statsd/reporter.py | 11 +- monasca_agent/win32/agent.py | 2 +- tests_to_fix/test_watchdog.py | 168 ------------------------------ 7 files changed, 3 insertions(+), 253 deletions(-) delete mode 100644 tests_to_fix/test_watchdog.py diff --git a/monasca_agent/collector/daemon.py b/monasca_agent/collector/daemon.py index 25f56dc2..081becfd 100644 --- a/monasca_agent/collector/daemon.py +++ b/monasca_agent/collector/daemon.py @@ -30,7 +30,6 @@ if int(sys.version_info[1]) <= 3: # Constants PID_NAME = "monasca-agent" -WATCHDOG_MULTIPLIER = 10 RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days START_COMMANDS = ['start', 'restart', 'foreground'] @@ -92,9 +91,7 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd) - # Configure the watchdog. check_frequency = int(config['check_freq']) - watchdog = self._get_watchdog(check_frequency, config) # Initialize the auto-restarter self.restart_interval = int(config.get('restart_interval', RESTART_INTERVAL)) @@ -138,8 +135,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): # Only plan for the next loop if we will continue, # otherwise just exit quickly. if self.run_forever: - if watchdog: - watchdog.reset() time.sleep(check_frequency) # Now clean-up. 
@@ -153,16 +148,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): log.info("Exiting. Bye bye.") sys.exit(0) - @staticmethod - def _get_watchdog(check_freq, agentConfig): - watchdog = None - if agentConfig.get("watchdog", True): - watchdog = util.Watchdog(check_freq * WATCHDOG_MULTIPLIER, - max_mem_mb=agentConfig.get('limit_memory_consumption', - None)) - watchdog.reset() - return watchdog - def _should_restart(self): if time.time() - self.agent_start > self.restart_interval: return True diff --git a/monasca_agent/common/config.py b/monasca_agent/common/config.py index f25e403d..56176e8a 100644 --- a/monasca_agent/common/config.py +++ b/monasca_agent/common/config.py @@ -46,7 +46,6 @@ class Config(object): 'additional_checksd': '/usr/lib/monasca/agent/custom_checks.d', 'limit_memory_consumption': None, 'skip_ssl_validation': False, - 'watchdog': True, 'autorestart': False, 'non_local_traffic': False}, 'Api': {'is_enabled': False, diff --git a/monasca_agent/common/util.py b/monasca_agent/common/util.py index 68cbb4c2..b0976c4a 100644 --- a/monasca_agent/common/util.py +++ b/monasca_agent/common/util.py @@ -8,7 +8,6 @@ import optparse import os import platform import re -import signal import socket import subprocess import sys @@ -41,50 +40,6 @@ import monasca_agent.common.config as configuration from monasca_agent.common.exceptions import PathNotFound -class Watchdog(object): - - """Simple signal-based watchdog that will scuttle the current process - if it has not been reset every N seconds, or if the processes exceeds - a specified memory threshold. - Can only be invoked once per process, so don't use with multiple threads. - If you instantiate more than one, you're also asking for trouble. 
- """ - - def __init__(self, duration, max_mem_mb=None): - import resource - - # Set the duration - self._duration = int(duration) - signal.signal(signal.SIGALRM, Watchdog.self_destruct) - - # cap memory usage - if max_mem_mb is not None: - self._max_mem_kb = 1024 * max_mem_mb - max_mem_bytes = 1024 * self._max_mem_kb - resource.setrlimit(resource.RLIMIT_AS, (max_mem_bytes, max_mem_bytes)) - self.memory_limit_enabled = True - else: - self.memory_limit_enabled = False - - @staticmethod - def self_destruct(signum, frame): - try: - import traceback - log.error("Self-destructing...") - log.error(traceback.format_exc()) - finally: - os.kill(os.getpid(), signal.SIGKILL) - - def reset(self): - # self destruct if using too much memory, as tornado will swallow MemoryErrors - mem_usage_kb = int(os.popen('ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')).read()) - if self.memory_limit_enabled and mem_usage_kb > (0.95 * self._max_mem_kb): - Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0)) - - log.debug("Resetting watchdog for %d" % self._duration) - signal.alarm(self._duration) - - class PidFile(object): """A small helper class for pidfiles. 
""" diff --git a/monasca_agent/forwarder/daemon.py b/monasca_agent/forwarder/daemon.py index ddca2137..00185571 100644 --- a/monasca_agent/forwarder/daemon.py +++ b/monasca_agent/forwarder/daemon.py @@ -38,8 +38,6 @@ import monasca_agent.forwarder.transaction as transaction log = logging.getLogger('forwarder') -WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval - # Maximum delay before replaying a transaction MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90) @@ -98,7 +96,7 @@ class AgentInputHandler(tornado.web.RequestHandler): class Forwarder(tornado.web.Application): - def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False, + def __init__(self, port, agent_config, skip_ssl_validation=False, use_simple_http_client=False): self._port = int(port) self._agent_config = agent_config @@ -112,18 +110,12 @@ class Forwarder(tornado.web.Application): agent_config) transaction.MetricTransaction.set_tr_manager(self._tr_manager) - self._watchdog = None self.skip_ssl_validation = skip_ssl_validation or agent_config.get( 'skip_ssl_validation', False) self.use_simple_http_client = use_simple_http_client if self.skip_ssl_validation: log.info("Skipping SSL hostname validation, useful when using a transparent proxy") - if watchdog: - watchdog_timeout = self.flush_interval * WATCHDOG_INTERVAL_MULTIPLIER - self._watchdog = util.Watchdog( - watchdog_timeout, max_mem_mb=agent_config.get('limit_memory_consumption', None)) - def _post_metrics(self): if len(self._metrics) > 0: @@ -199,8 +191,6 @@ class Forwarder(tornado.web.Application): logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO)) def flush_trs(): - if self._watchdog: - self._watchdog.reset() self._post_metrics() self._tr_manager.flush() @@ -208,8 +198,6 @@ class Forwarder(tornado.web.Application): flush_trs, self.flush_interval, io_loop=self.mloop) # Start everything - if self._watchdog: - self._watchdog.reset() tr_sched.start() self.mloop.start() diff --git 
a/monasca_agent/statsd/reporter.py b/monasca_agent/statsd/reporter.py index de5c8f68..35e4065f 100644 --- a/monasca_agent/statsd/reporter.py +++ b/monasca_agent/statsd/reporter.py @@ -9,7 +9,6 @@ import monasca_agent.common.util as util log = logging.getLogger(__name__) -WATCHDOG_TIMEOUT = 120 # Since we call flush more often than the metrics aggregation interval, we should # log a bunch of flushes in a row every so often. FLUSH_LOGGING_PERIOD = 70 @@ -23,7 +22,7 @@ class Reporter(threading.Thread): server. """ - def __init__(self, interval, aggregator, api_host, use_watchdog=False, event_chunk_size=None): + def __init__(self, interval, aggregator, api_host, event_chunk_size=None): threading.Thread.__init__(self) self.interval = int(interval) self.finished = threading.Event() @@ -31,11 +30,6 @@ class Reporter(threading.Thread): self.flush_count = 0 self.log_count = 0 - self.watchdog = None - if use_watchdog: - from monasca_agent.common.util import Watchdog - self.watchdog = Watchdog(WATCHDOG_TIMEOUT) - self.api_host = api_host self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE @@ -50,7 +44,6 @@ class Reporter(threading.Thread): def run(self): log.info("Reporting to %s every %ss" % (self.api_host, self.interval)) - log.debug("Watchdog enabled: %s" % bool(self.watchdog)) # Persist a start-up message. check_status.MonascaStatsdStatus().persist() @@ -58,8 +51,6 @@ class Reporter(threading.Thread): while not self.finished.isSet(): # Use camel case isSet for 2.4 support. self.finished.wait(self.interval) self.flush() - if self.watchdog: - self.watchdog.reset() # Clean up the status messages. 
log.debug("Stopped reporter") diff --git a/monasca_agent/win32/agent.py b/monasca_agent/win32/agent.py index fd046b9f..53470ab3 100644 --- a/monasca_agent/win32/agent.py +++ b/monasca_agent/win32/agent.py @@ -167,7 +167,7 @@ class MonascaForwarder(multiprocessing.Process): else: port = int(port) app_config = get_config(parse_args=False) - self.forwarder = Application(port, app_config, watchdog=False) + self.forwarder = Application(port, app_config) self.forwarder.run() def stop(self): diff --git a/tests_to_fix/test_watchdog.py b/tests_to_fix/test_watchdog.py deleted file mode 100644 index 9d434e27..00000000 --- a/tests_to_fix/test_watchdog.py +++ /dev/null @@ -1,168 +0,0 @@ -import unittest -import subprocess -import os -import sys -from random import random, randrange -import urllib as url -import time - -sys.path.append(os.path.join(os.path.dirname(__file__), '../monasca_agent')) -from monasca_agent.collector.daemon import CollectorDaemon -from monasca_agent.common.util import Watchdog - - -class TestWatchdog(unittest.TestCase): - - """Test watchdog in various conditions - """ - - JITTER_FACTOR = 2 - - def test_watchdog(self): - """Verify that watchdog kills ourselves even when spinning - Verify that watchdog kills ourselves when hanging - """ - start = time.time() - try: - subprocess.check_call( - ["python", "tests/test_watchdog.py", "busy"], stderr=subprocess.STDOUT) - raise Exception("Should have died with an error") - except subprocess.CalledProcessError: - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 5) - - # Start pseudo web server - subprocess.Popen(["nc", "-l", "31834"]) - start = time.time() - try: - subprocess.check_call(["python", "tests/test_watchdog.py", "net"]) - raise Exception("Should have died with an error") - except subprocess.CalledProcessError: - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 5) - - # Normal loop, should run 5 times - start = time.time() - try: - 
subprocess.check_call(["python", "tests/test_watchdog.py", "normal"]) - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 5) - except subprocess.CalledProcessError: - self.fail("Watchdog killed normal process after %s seconds" % int(time.time() - start)) - - # Fast tornado, not killed - start = time.time() - p = subprocess.Popen(["python", "tests/test_watchdog.py", "fast"]) - p.wait() - duration = int(time.time() - start) - # should die as soon as flush_trs has been called - self.assertTrue(duration, self.JITTER_FACTOR * 10) - - # Slow tornado, killed by the Watchdog - start = time.time() - p = subprocess.Popen(["python", "tests/test_watchdog.py", "slow"]) - p.wait() - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 4) - - # Too much memory used, killed by Watchdog - start = time.time() - p = subprocess.Popen(["python", "tests/test_watchdog.py", "memory"]) - p.wait() - duration = int(time.time() - start) - # process should be killed well before the restart interval of 30. 
- assert duration < 20 - - -class MockTxManager(object): - - @staticmethod - def flush(): - "Pretend to flush for a long time" - time.sleep(5) - sys.exit(0) - - -class MemoryHogTxManager(object): - - @staticmethod - def flush(): - rand_data = [] - while True: - rand_data.append('%030x' % randrange(256 ** 15)) - - -class PseudoAgent(object): - - """Same logic as the agent, simplified""" - @staticmethod - def busy_run(): - w = Watchdog(5) - w.reset() - x = 0 - while True: - x = random() - return x - - @staticmethod - def hanging_net(): - w = Watchdog(5) - w.reset() - x = url.urlopen("http://localhost:31834") - print("ERROR Net call returned", x) - return True - - @staticmethod - def normal_run(): - w = Watchdog(2) - w.reset() - for i in range(5): - time.sleep(1) - w.reset() - - @staticmethod - def slow_tornado(): - a = CollectorDaemon(12345, {}) - a._watchdog = Watchdog(4) - a._tr_manager = MockTxManager() - a.run() - - @staticmethod - def fast_tornado(): - a = CollectorDaemon(12345, {}) - a._watchdog = Watchdog(6) - a._tr_manager = MockTxManager() - a.run() - - @staticmethod - def use_lots_of_memory(): - # Skip this step on travis - if os.environ.get('TRAVIS', False): - return - a = CollectorDaemon(12345, {}) - a._watchdog = Watchdog(30, 50) - a._tr_manager = MemoryHogTxManager() - a.run() - -if __name__ == "__main__": - if sys.argv[1] == "busy": - a = PseudoAgent() - a.busy_run() - elif sys.argv[1] == "net": - a = PseudoAgent() - a.hanging_net() - elif sys.argv[1] == "normal": - a = PseudoAgent() - a.normal_run() - elif sys.argv[1] == "slow": - a = PseudoAgent() - a.slow_tornado() - elif sys.argv[1] == "fast": - a = PseudoAgent() - a.fast_tornado() - elif sys.argv[1] == "test": - t = TestWatchdog() - t.runTest() - elif sys.argv[1] == "memory": - a = PseudoAgent() - a.use_lots_of_memory()