diff --git a/monasca_agent/collector/daemon.py b/monasca_agent/collector/daemon.py index 25f56dc2..081becfd 100644 --- a/monasca_agent/collector/daemon.py +++ b/monasca_agent/collector/daemon.py @@ -30,7 +30,6 @@ if int(sys.version_info[1]) <= 3: # Constants PID_NAME = "monasca-agent" -WATCHDOG_MULTIPLIER = 10 RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days START_COMMANDS = ['start', 'restart', 'foreground'] @@ -92,9 +91,7 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd) - # Configure the watchdog. check_frequency = int(config['check_freq']) - watchdog = self._get_watchdog(check_frequency, config) # Initialize the auto-restarter self.restart_interval = int(config.get('restart_interval', RESTART_INTERVAL)) @@ -138,8 +135,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): # Only plan for the next loop if we will continue, # otherwise just exit quickly. if self.run_forever: - if watchdog: - watchdog.reset() time.sleep(check_frequency) # Now clean-up. @@ -153,16 +148,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon): log.info("Exiting. Bye bye.") sys.exit(0) - @staticmethod - def _get_watchdog(check_freq, agentConfig): - watchdog = None - if agentConfig.get("watchdog", True): - watchdog = util.Watchdog(check_freq * WATCHDOG_MULTIPLIER, - max_mem_mb=agentConfig.get('limit_memory_consumption', - None)) - watchdog.reset() - return watchdog - def _should_restart(self): if time.time() - self.agent_start > self.restart_interval: return True diff --git a/monasca_agent/common/config.py b/monasca_agent/common/config.py index f25e403d..56176e8a 100644 --- a/monasca_agent/common/config.py +++ b/monasca_agent/common/config.py @@ -46,7 +46,6 @@ class Config(object): 'additional_checksd': '/usr/lib/monasca/agent/custom_checks.d', 'limit_memory_consumption': None, 'skip_ssl_validation': False, - 'watchdog': True, 'autorestart': False, 'non_local_traffic': False}, 'Api': {'is_enabled': False, diff --git a/monasca_agent/common/util.py b/monasca_agent/common/util.py index 68cbb4c2..b0976c4a 100644 --- a/monasca_agent/common/util.py +++ b/monasca_agent/common/util.py @@ -8,7 +8,6 @@ import optparse import os import platform import re -import signal import socket import subprocess import sys @@ -41,50 +40,6 @@ import monasca_agent.common.config as configuration from monasca_agent.common.exceptions import PathNotFound -class Watchdog(object): - - """Simple signal-based watchdog that will scuttle the current process - if it has not been reset every N seconds, or if the processes exceeds - a specified memory threshold. - Can only be invoked once per process, so don't use with multiple threads. - If you instantiate more than one, you're also asking for trouble. - """ - - def __init__(self, duration, max_mem_mb=None): - import resource - - # Set the duration - self._duration = int(duration) - signal.signal(signal.SIGALRM, Watchdog.self_destruct) - - # cap memory usage - if max_mem_mb is not None: - self._max_mem_kb = 1024 * max_mem_mb - max_mem_bytes = 1024 * self._max_mem_kb - resource.setrlimit(resource.RLIMIT_AS, (max_mem_bytes, max_mem_bytes)) - self.memory_limit_enabled = True - else: - self.memory_limit_enabled = False - - @staticmethod - def self_destruct(signum, frame): - try: - import traceback - log.error("Self-destructing...") - log.error(traceback.format_exc()) - finally: - os.kill(os.getpid(), signal.SIGKILL) - - def reset(self): - # self destruct if using too much memory, as tornado will swallow MemoryErrors - mem_usage_kb = int(os.popen('ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')).read()) - if self.memory_limit_enabled and mem_usage_kb > (0.95 * self._max_mem_kb): - Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0)) - - log.debug("Resetting watchdog for %d" % self._duration) - signal.alarm(self._duration) - - class PidFile(object): """A small helper class for pidfiles. """ diff --git a/monasca_agent/forwarder/daemon.py b/monasca_agent/forwarder/daemon.py index ddca2137..00185571 100644 --- a/monasca_agent/forwarder/daemon.py +++ b/monasca_agent/forwarder/daemon.py @@ -38,8 +38,6 @@ import monasca_agent.forwarder.transaction as transaction log = logging.getLogger('forwarder') -WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval - # Maximum delay before replaying a transaction MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90) @@ -98,7 +96,7 @@ class AgentInputHandler(tornado.web.RequestHandler): class Forwarder(tornado.web.Application): - def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False, + def __init__(self, port, agent_config, skip_ssl_validation=False, use_simple_http_client=False): self._port = int(port) self._agent_config = agent_config @@ -112,18 +110,12 @@ class Forwarder(tornado.web.Application): agent_config) transaction.MetricTransaction.set_tr_manager(self._tr_manager) - self._watchdog = None self.skip_ssl_validation = skip_ssl_validation or agent_config.get( 'skip_ssl_validation', False) self.use_simple_http_client = use_simple_http_client if self.skip_ssl_validation: log.info("Skipping SSL hostname validation, useful when using a transparent proxy") - if watchdog: - watchdog_timeout = self.flush_interval * WATCHDOG_INTERVAL_MULTIPLIER - self._watchdog = util.Watchdog( - watchdog_timeout, max_mem_mb=agent_config.get('limit_memory_consumption', None)) - def _post_metrics(self): if len(self._metrics) > 0: @@ -199,8 +191,6 @@ class Forwarder(tornado.web.Application): logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO)) def flush_trs(): - if self._watchdog: - self._watchdog.reset() self._post_metrics() self._tr_manager.flush() @@ -208,8 +198,6 @@ class Forwarder(tornado.web.Application): flush_trs, self.flush_interval, io_loop=self.mloop) # Start everything - if self._watchdog: - self._watchdog.reset() tr_sched.start() self.mloop.start() diff --git a/monasca_agent/statsd/reporter.py b/monasca_agent/statsd/reporter.py index de5c8f68..35e4065f 100644 --- a/monasca_agent/statsd/reporter.py +++ b/monasca_agent/statsd/reporter.py @@ -9,7 +9,6 @@ import monasca_agent.common.util as util log = logging.getLogger(__name__) -WATCHDOG_TIMEOUT = 120 # Since we call flush more often than the metrics aggregation interval, we should # log a bunch of flushes in a row every so often. FLUSH_LOGGING_PERIOD = 70 @@ -23,7 +22,7 @@ class Reporter(threading.Thread): server. """ - def __init__(self, interval, aggregator, api_host, use_watchdog=False, event_chunk_size=None): + def __init__(self, interval, aggregator, api_host, event_chunk_size=None): threading.Thread.__init__(self) self.interval = int(interval) self.finished = threading.Event() @@ -31,11 +30,6 @@ class Reporter(threading.Thread): self.flush_count = 0 self.log_count = 0 - self.watchdog = None - if use_watchdog: - from monasca_agent.common.util import Watchdog - self.watchdog = Watchdog(WATCHDOG_TIMEOUT) - self.api_host = api_host self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE @@ -50,7 +44,6 @@ class Reporter(threading.Thread): def run(self): log.info("Reporting to %s every %ss" % (self.api_host, self.interval)) - log.debug("Watchdog enabled: %s" % bool(self.watchdog)) # Persist a start-up message. check_status.MonascaStatsdStatus().persist() @@ -58,8 +51,6 @@ class Reporter(threading.Thread): while not self.finished.isSet(): # Use camel case isSet for 2.4 support. self.finished.wait(self.interval) self.flush() - if self.watchdog: - self.watchdog.reset() # Clean up the status messages. log.debug("Stopped reporter") diff --git a/monasca_agent/win32/agent.py b/monasca_agent/win32/agent.py index fd046b9f..53470ab3 100644 --- a/monasca_agent/win32/agent.py +++ b/monasca_agent/win32/agent.py @@ -167,7 +167,7 @@ class MonascaForwarder(multiprocessing.Process): else: port = int(port) app_config = get_config(parse_args=False) - self.forwarder = Application(port, app_config, watchdog=False) + self.forwarder = Application(port, app_config) self.forwarder.run() def stop(self): diff --git a/tests_to_fix/test_watchdog.py b/tests_to_fix/test_watchdog.py deleted file mode 100644 index 9d434e27..00000000 --- a/tests_to_fix/test_watchdog.py +++ /dev/null @@ -1,168 +0,0 @@ -import unittest -import subprocess -import os -import sys -from random import random, randrange -import urllib as url -import time - -sys.path.append(os.path.join(os.path.dirname(__file__), '../monasca_agent')) -from monasca_agent.collector.daemon import CollectorDaemon -from monasca_agent.common.util import Watchdog - - -class TestWatchdog(unittest.TestCase): - - """Test watchdog in various conditions - """ - - JITTER_FACTOR = 2 - - def test_watchdog(self): - """Verify that watchdog kills ourselves even when spinning - Verify that watchdog kills ourselves when hanging - """ - start = time.time() - try: - subprocess.check_call( - ["python", "tests/test_watchdog.py", "busy"], stderr=subprocess.STDOUT) - raise Exception("Should have died with an error") - except subprocess.CalledProcessError: - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 5) - - # Start pseudo web server - subprocess.Popen(["nc", "-l", "31834"]) - start = time.time() - try: - subprocess.check_call(["python", "tests/test_watchdog.py", "net"]) - raise Exception("Should have died with an error") - except subprocess.CalledProcessError: - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 5) - - # Normal loop, should run 5 times - start = time.time() - try: - subprocess.check_call(["python", "tests/test_watchdog.py", "normal"]) - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 5) - except subprocess.CalledProcessError: - self.fail("Watchdog killed normal process after %s seconds" % int(time.time() - start)) - - # Fast tornado, not killed - start = time.time() - p = subprocess.Popen(["python", "tests/test_watchdog.py", "fast"]) - p.wait() - duration = int(time.time() - start) - # should die as soon as flush_trs has been called - self.assertTrue(duration, self.JITTER_FACTOR * 10) - - # Slow tornado, killed by the Watchdog - start = time.time() - p = subprocess.Popen(["python", "tests/test_watchdog.py", "slow"]) - p.wait() - duration = int(time.time() - start) - self.assertTrue(duration < self.JITTER_FACTOR * 4) - - # Too much memory used, killed by Watchdog - start = time.time() - p = subprocess.Popen(["python", "tests/test_watchdog.py", "memory"]) - p.wait() - duration = int(time.time() - start) - # process should be killed well before the restart interval of 30. - assert duration < 20 - - -class MockTxManager(object): - - @staticmethod - def flush(): - "Pretend to flush for a long time" - time.sleep(5) - sys.exit(0) - - -class MemoryHogTxManager(object): - - @staticmethod - def flush(): - rand_data = [] - while True: - rand_data.append('%030x' % randrange(256 ** 15)) - - -class PseudoAgent(object): - - """Same logic as the agent, simplified""" - @staticmethod - def busy_run(): - w = Watchdog(5) - w.reset() - x = 0 - while True: - x = random() - return x - - @staticmethod - def hanging_net(): - w = Watchdog(5) - w.reset() - x = url.urlopen("http://localhost:31834") - print("ERROR Net call returned", x) - return True - - @staticmethod - def normal_run(): - w = Watchdog(2) - w.reset() - for i in range(5): - time.sleep(1) - w.reset() - - @staticmethod - def slow_tornado(): - a = CollectorDaemon(12345, {}) - a._watchdog = Watchdog(4) - a._tr_manager = MockTxManager() - a.run() - - @staticmethod - def fast_tornado(): - a = CollectorDaemon(12345, {}) - a._watchdog = Watchdog(6) - a._tr_manager = MockTxManager() - a.run() - - @staticmethod - def use_lots_of_memory(): - # Skip this step on travis - if os.environ.get('TRAVIS', False): - return - a = CollectorDaemon(12345, {}) - a._watchdog = Watchdog(30, 50) - a._tr_manager = MemoryHogTxManager() - a.run() - -if __name__ == "__main__": - if sys.argv[1] == "busy": - a = PseudoAgent() - a.busy_run() - elif sys.argv[1] == "net": - a = PseudoAgent() - a.hanging_net() - elif sys.argv[1] == "normal": - a = PseudoAgent() - a.normal_run() - elif sys.argv[1] == "slow": - a = PseudoAgent() - a.slow_tornado() - elif sys.argv[1] == "fast": - a = PseudoAgent() - a.fast_tornado() - elif sys.argv[1] == "test": - t = TestWatchdog() - t.runTest() - elif sys.argv[1] == "memory": - a = PseudoAgent() - a.use_lots_of_memory()