Remove watchdog from the agent

The watchdog is causing problems by restarting the agent
even when it does not need to. We are also not gaining
anything by running it. As a result, we are removing
the watchdog completely from the agent.

Change-Id: I45b80e28d81749c98dff7273d7756e10db23da70
This commit is contained in:
Michael James Hoppal 2015-08-21 16:04:11 -06:00
parent 22c704560c
commit 977b07a29c
7 changed files with 3 additions and 253 deletions

View File

@ -30,7 +30,6 @@ if int(sys.version_info[1]) <= 3:
# Constants
PID_NAME = "monasca-agent"
WATCHDOG_MULTIPLIER = 10
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
START_COMMANDS = ['start', 'restart', 'foreground']
@ -92,9 +91,7 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd)
# Configure the watchdog.
check_frequency = int(config['check_freq'])
watchdog = self._get_watchdog(check_frequency, config)
# Initialize the auto-restarter
self.restart_interval = int(config.get('restart_interval', RESTART_INTERVAL))
@ -138,8 +135,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
# Only plan for the next loop if we will continue,
# otherwise just exit quickly.
if self.run_forever:
if watchdog:
watchdog.reset()
time.sleep(check_frequency)
# Now clean-up.
@ -153,16 +148,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
log.info("Exiting. Bye bye.")
sys.exit(0)
@staticmethod
def _get_watchdog(check_freq, agentConfig):
watchdog = None
if agentConfig.get("watchdog", True):
watchdog = util.Watchdog(check_freq * WATCHDOG_MULTIPLIER,
max_mem_mb=agentConfig.get('limit_memory_consumption',
None))
watchdog.reset()
return watchdog
def _should_restart(self):
if time.time() - self.agent_start > self.restart_interval:
return True

View File

@ -46,7 +46,6 @@ class Config(object):
'additional_checksd': '/usr/lib/monasca/agent/custom_checks.d',
'limit_memory_consumption': None,
'skip_ssl_validation': False,
'watchdog': True,
'autorestart': False,
'non_local_traffic': False},
'Api': {'is_enabled': False,

View File

@ -8,7 +8,6 @@ import optparse
import os
import platform
import re
import signal
import socket
import subprocess
import sys
@ -41,50 +40,6 @@ import monasca_agent.common.config as configuration
from monasca_agent.common.exceptions import PathNotFound
class Watchdog(object):
"""Simple signal-based watchdog that will scuttle the current process
if it has not been reset every N seconds, or if the processes exceeds
a specified memory threshold.
Can only be invoked once per process, so don't use with multiple threads.
If you instantiate more than one, you're also asking for trouble.
"""
def __init__(self, duration, max_mem_mb=None):
import resource
# Set the duration
self._duration = int(duration)
signal.signal(signal.SIGALRM, Watchdog.self_destruct)
# cap memory usage
if max_mem_mb is not None:
self._max_mem_kb = 1024 * max_mem_mb
max_mem_bytes = 1024 * self._max_mem_kb
resource.setrlimit(resource.RLIMIT_AS, (max_mem_bytes, max_mem_bytes))
self.memory_limit_enabled = True
else:
self.memory_limit_enabled = False
@staticmethod
def self_destruct(signum, frame):
try:
import traceback
log.error("Self-destructing...")
log.error(traceback.format_exc())
finally:
os.kill(os.getpid(), signal.SIGKILL)
def reset(self):
# self destruct if using too much memory, as tornado will swallow MemoryErrors
mem_usage_kb = int(os.popen('ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')).read())
if self.memory_limit_enabled and mem_usage_kb > (0.95 * self._max_mem_kb):
Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0))
log.debug("Resetting watchdog for %d" % self._duration)
signal.alarm(self._duration)
class PidFile(object):
"""A small helper class for pidfiles. """

View File

@ -38,8 +38,6 @@ import monasca_agent.forwarder.transaction as transaction
log = logging.getLogger('forwarder')
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
# Maximum delay before replaying a transaction
MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90)
@ -98,7 +96,7 @@ class AgentInputHandler(tornado.web.RequestHandler):
class Forwarder(tornado.web.Application):
def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False,
def __init__(self, port, agent_config, skip_ssl_validation=False,
use_simple_http_client=False):
self._port = int(port)
self._agent_config = agent_config
@ -112,18 +110,12 @@ class Forwarder(tornado.web.Application):
agent_config)
transaction.MetricTransaction.set_tr_manager(self._tr_manager)
self._watchdog = None
self.skip_ssl_validation = skip_ssl_validation or agent_config.get(
'skip_ssl_validation', False)
self.use_simple_http_client = use_simple_http_client
if self.skip_ssl_validation:
log.info("Skipping SSL hostname validation, useful when using a transparent proxy")
if watchdog:
watchdog_timeout = self.flush_interval * WATCHDOG_INTERVAL_MULTIPLIER
self._watchdog = util.Watchdog(
watchdog_timeout, max_mem_mb=agent_config.get('limit_memory_consumption', None))
def _post_metrics(self):
if len(self._metrics) > 0:
@ -199,8 +191,6 @@ class Forwarder(tornado.web.Application):
logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO))
def flush_trs():
if self._watchdog:
self._watchdog.reset()
self._post_metrics()
self._tr_manager.flush()
@ -208,8 +198,6 @@ class Forwarder(tornado.web.Application):
flush_trs, self.flush_interval, io_loop=self.mloop)
# Start everything
if self._watchdog:
self._watchdog.reset()
tr_sched.start()
self.mloop.start()

View File

@ -9,7 +9,6 @@ import monasca_agent.common.util as util
log = logging.getLogger(__name__)
WATCHDOG_TIMEOUT = 120
# Since we call flush more often than the metrics aggregation interval, we should
# log a bunch of flushes in a row every so often.
FLUSH_LOGGING_PERIOD = 70
@ -23,7 +22,7 @@ class Reporter(threading.Thread):
server.
"""
def __init__(self, interval, aggregator, api_host, use_watchdog=False, event_chunk_size=None):
def __init__(self, interval, aggregator, api_host, event_chunk_size=None):
threading.Thread.__init__(self)
self.interval = int(interval)
self.finished = threading.Event()
@ -31,11 +30,6 @@ class Reporter(threading.Thread):
self.flush_count = 0
self.log_count = 0
self.watchdog = None
if use_watchdog:
from monasca_agent.common.util import Watchdog
self.watchdog = Watchdog(WATCHDOG_TIMEOUT)
self.api_host = api_host
self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE
@ -50,7 +44,6 @@ class Reporter(threading.Thread):
def run(self):
log.info("Reporting to %s every %ss" % (self.api_host, self.interval))
log.debug("Watchdog enabled: %s" % bool(self.watchdog))
# Persist a start-up message.
check_status.MonascaStatsdStatus().persist()
@ -58,8 +51,6 @@ class Reporter(threading.Thread):
while not self.finished.isSet(): # Use camel case isSet for 2.4 support.
self.finished.wait(self.interval)
self.flush()
if self.watchdog:
self.watchdog.reset()
# Clean up the status messages.
log.debug("Stopped reporter")

View File

@ -167,7 +167,7 @@ class MonascaForwarder(multiprocessing.Process):
else:
port = int(port)
app_config = get_config(parse_args=False)
self.forwarder = Application(port, app_config, watchdog=False)
self.forwarder = Application(port, app_config)
self.forwarder.run()
def stop(self):

View File

@ -1,168 +0,0 @@
import unittest
import subprocess
import os
import sys
from random import random, randrange
import urllib as url
import time
sys.path.append(os.path.join(os.path.dirname(__file__), '../monasca_agent'))
from monasca_agent.collector.daemon import CollectorDaemon
from monasca_agent.common.util import Watchdog
class TestWatchdog(unittest.TestCase):
"""Test watchdog in various conditions
"""
JITTER_FACTOR = 2
def test_watchdog(self):
"""Verify that watchdog kills ourselves even when spinning
Verify that watchdog kills ourselves when hanging
"""
start = time.time()
try:
subprocess.check_call(
["python", "tests/test_watchdog.py", "busy"], stderr=subprocess.STDOUT)
raise Exception("Should have died with an error")
except subprocess.CalledProcessError:
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 5)
# Start pseudo web server
subprocess.Popen(["nc", "-l", "31834"])
start = time.time()
try:
subprocess.check_call(["python", "tests/test_watchdog.py", "net"])
raise Exception("Should have died with an error")
except subprocess.CalledProcessError:
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 5)
# Normal loop, should run 5 times
start = time.time()
try:
subprocess.check_call(["python", "tests/test_watchdog.py", "normal"])
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 5)
except subprocess.CalledProcessError:
self.fail("Watchdog killed normal process after %s seconds" % int(time.time() - start))
# Fast tornado, not killed
start = time.time()
p = subprocess.Popen(["python", "tests/test_watchdog.py", "fast"])
p.wait()
duration = int(time.time() - start)
# should die as soon as flush_trs has been called
self.assertTrue(duration, self.JITTER_FACTOR * 10)
# Slow tornado, killed by the Watchdog
start = time.time()
p = subprocess.Popen(["python", "tests/test_watchdog.py", "slow"])
p.wait()
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 4)
# Too much memory used, killed by Watchdog
start = time.time()
p = subprocess.Popen(["python", "tests/test_watchdog.py", "memory"])
p.wait()
duration = int(time.time() - start)
# process should be killed well before the restart interval of 30.
assert duration < 20
class MockTxManager(object):
@staticmethod
def flush():
"Pretend to flush for a long time"
time.sleep(5)
sys.exit(0)
class MemoryHogTxManager(object):
@staticmethod
def flush():
rand_data = []
while True:
rand_data.append('%030x' % randrange(256 ** 15))
class PseudoAgent(object):
"""Same logic as the agent, simplified"""
@staticmethod
def busy_run():
w = Watchdog(5)
w.reset()
x = 0
while True:
x = random()
return x
@staticmethod
def hanging_net():
w = Watchdog(5)
w.reset()
x = url.urlopen("http://localhost:31834")
print("ERROR Net call returned", x)
return True
@staticmethod
def normal_run():
w = Watchdog(2)
w.reset()
for i in range(5):
time.sleep(1)
w.reset()
@staticmethod
def slow_tornado():
a = CollectorDaemon(12345, {})
a._watchdog = Watchdog(4)
a._tr_manager = MockTxManager()
a.run()
@staticmethod
def fast_tornado():
a = CollectorDaemon(12345, {})
a._watchdog = Watchdog(6)
a._tr_manager = MockTxManager()
a.run()
@staticmethod
def use_lots_of_memory():
# Skip this step on travis
if os.environ.get('TRAVIS', False):
return
a = CollectorDaemon(12345, {})
a._watchdog = Watchdog(30, 50)
a._tr_manager = MemoryHogTxManager()
a.run()
if __name__ == "__main__":
if sys.argv[1] == "busy":
a = PseudoAgent()
a.busy_run()
elif sys.argv[1] == "net":
a = PseudoAgent()
a.hanging_net()
elif sys.argv[1] == "normal":
a = PseudoAgent()
a.normal_run()
elif sys.argv[1] == "slow":
a = PseudoAgent()
a.slow_tornado()
elif sys.argv[1] == "fast":
a = PseudoAgent()
a.fast_tornado()
elif sys.argv[1] == "test":
t = TestWatchdog()
t.runTest()
elif sys.argv[1] == "memory":
a = PseudoAgent()
a.use_lots_of_memory()