Remove watchdog from the agent
The watchdog is causing problems by restarting the agent even when no restart is needed. Also, we are not gaining anything by running it. As a result, we are removing the watchdog completely from the agent. Change-Id: I45b80e28d81749c98dff7273d7756e10db23da70
This commit is contained in:
parent
22c704560c
commit
977b07a29c
|
@ -30,7 +30,6 @@ if int(sys.version_info[1]) <= 3:
|
|||
|
||||
# Constants
|
||||
PID_NAME = "monasca-agent"
|
||||
WATCHDOG_MULTIPLIER = 10
|
||||
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
|
||||
START_COMMANDS = ['start', 'restart', 'foreground']
|
||||
|
||||
|
@ -92,9 +91,7 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
|
|||
|
||||
self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd)
|
||||
|
||||
# Configure the watchdog.
|
||||
check_frequency = int(config['check_freq'])
|
||||
watchdog = self._get_watchdog(check_frequency, config)
|
||||
|
||||
# Initialize the auto-restarter
|
||||
self.restart_interval = int(config.get('restart_interval', RESTART_INTERVAL))
|
||||
|
@ -138,8 +135,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
|
|||
# Only plan for the next loop if we will continue,
|
||||
# otherwise just exit quickly.
|
||||
if self.run_forever:
|
||||
if watchdog:
|
||||
watchdog.reset()
|
||||
time.sleep(check_frequency)
|
||||
|
||||
# Now clean-up.
|
||||
|
@ -153,16 +148,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
|
|||
log.info("Exiting. Bye bye.")
|
||||
sys.exit(0)
|
||||
|
||||
@staticmethod
|
||||
def _get_watchdog(check_freq, agentConfig):
|
||||
watchdog = None
|
||||
if agentConfig.get("watchdog", True):
|
||||
watchdog = util.Watchdog(check_freq * WATCHDOG_MULTIPLIER,
|
||||
max_mem_mb=agentConfig.get('limit_memory_consumption',
|
||||
None))
|
||||
watchdog.reset()
|
||||
return watchdog
|
||||
|
||||
def _should_restart(self):
|
||||
if time.time() - self.agent_start > self.restart_interval:
|
||||
return True
|
||||
|
|
|
@ -46,7 +46,6 @@ class Config(object):
|
|||
'additional_checksd': '/usr/lib/monasca/agent/custom_checks.d',
|
||||
'limit_memory_consumption': None,
|
||||
'skip_ssl_validation': False,
|
||||
'watchdog': True,
|
||||
'autorestart': False,
|
||||
'non_local_traffic': False},
|
||||
'Api': {'is_enabled': False,
|
||||
|
|
|
@ -8,7 +8,6 @@ import optparse
|
|||
import os
|
||||
import platform
|
||||
import re
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
|
@ -41,50 +40,6 @@ import monasca_agent.common.config as configuration
|
|||
from monasca_agent.common.exceptions import PathNotFound
|
||||
|
||||
|
||||
class Watchdog(object):
|
||||
|
||||
"""Simple signal-based watchdog that will scuttle the current process
|
||||
if it has not been reset every N seconds, or if the processes exceeds
|
||||
a specified memory threshold.
|
||||
Can only be invoked once per process, so don't use with multiple threads.
|
||||
If you instantiate more than one, you're also asking for trouble.
|
||||
"""
|
||||
|
||||
def __init__(self, duration, max_mem_mb=None):
|
||||
import resource
|
||||
|
||||
# Set the duration
|
||||
self._duration = int(duration)
|
||||
signal.signal(signal.SIGALRM, Watchdog.self_destruct)
|
||||
|
||||
# cap memory usage
|
||||
if max_mem_mb is not None:
|
||||
self._max_mem_kb = 1024 * max_mem_mb
|
||||
max_mem_bytes = 1024 * self._max_mem_kb
|
||||
resource.setrlimit(resource.RLIMIT_AS, (max_mem_bytes, max_mem_bytes))
|
||||
self.memory_limit_enabled = True
|
||||
else:
|
||||
self.memory_limit_enabled = False
|
||||
|
||||
@staticmethod
|
||||
def self_destruct(signum, frame):
|
||||
try:
|
||||
import traceback
|
||||
log.error("Self-destructing...")
|
||||
log.error(traceback.format_exc())
|
||||
finally:
|
||||
os.kill(os.getpid(), signal.SIGKILL)
|
||||
|
||||
def reset(self):
|
||||
# self destruct if using too much memory, as tornado will swallow MemoryErrors
|
||||
mem_usage_kb = int(os.popen('ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')).read())
|
||||
if self.memory_limit_enabled and mem_usage_kb > (0.95 * self._max_mem_kb):
|
||||
Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0))
|
||||
|
||||
log.debug("Resetting watchdog for %d" % self._duration)
|
||||
signal.alarm(self._duration)
|
||||
|
||||
|
||||
class PidFile(object):
|
||||
"""A small helper class for pidfiles. """
|
||||
|
||||
|
|
|
@ -38,8 +38,6 @@ import monasca_agent.forwarder.transaction as transaction
|
|||
|
||||
log = logging.getLogger('forwarder')
|
||||
|
||||
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
|
||||
|
||||
# Maximum delay before replaying a transaction
|
||||
MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90)
|
||||
|
||||
|
@ -98,7 +96,7 @@ class AgentInputHandler(tornado.web.RequestHandler):
|
|||
|
||||
class Forwarder(tornado.web.Application):
|
||||
|
||||
def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False,
|
||||
def __init__(self, port, agent_config, skip_ssl_validation=False,
|
||||
use_simple_http_client=False):
|
||||
self._port = int(port)
|
||||
self._agent_config = agent_config
|
||||
|
@ -112,18 +110,12 @@ class Forwarder(tornado.web.Application):
|
|||
agent_config)
|
||||
transaction.MetricTransaction.set_tr_manager(self._tr_manager)
|
||||
|
||||
self._watchdog = None
|
||||
self.skip_ssl_validation = skip_ssl_validation or agent_config.get(
|
||||
'skip_ssl_validation', False)
|
||||
self.use_simple_http_client = use_simple_http_client
|
||||
if self.skip_ssl_validation:
|
||||
log.info("Skipping SSL hostname validation, useful when using a transparent proxy")
|
||||
|
||||
if watchdog:
|
||||
watchdog_timeout = self.flush_interval * WATCHDOG_INTERVAL_MULTIPLIER
|
||||
self._watchdog = util.Watchdog(
|
||||
watchdog_timeout, max_mem_mb=agent_config.get('limit_memory_consumption', None))
|
||||
|
||||
def _post_metrics(self):
|
||||
|
||||
if len(self._metrics) > 0:
|
||||
|
@ -199,8 +191,6 @@ class Forwarder(tornado.web.Application):
|
|||
logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO))
|
||||
|
||||
def flush_trs():
|
||||
if self._watchdog:
|
||||
self._watchdog.reset()
|
||||
self._post_metrics()
|
||||
self._tr_manager.flush()
|
||||
|
||||
|
@ -208,8 +198,6 @@ class Forwarder(tornado.web.Application):
|
|||
flush_trs, self.flush_interval, io_loop=self.mloop)
|
||||
|
||||
# Start everything
|
||||
if self._watchdog:
|
||||
self._watchdog.reset()
|
||||
tr_sched.start()
|
||||
|
||||
self.mloop.start()
|
||||
|
|
|
@ -9,7 +9,6 @@ import monasca_agent.common.util as util
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
WATCHDOG_TIMEOUT = 120
|
||||
# Since we call flush more often than the metrics aggregation interval, we should
|
||||
# log a bunch of flushes in a row every so often.
|
||||
FLUSH_LOGGING_PERIOD = 70
|
||||
|
@ -23,7 +22,7 @@ class Reporter(threading.Thread):
|
|||
server.
|
||||
"""
|
||||
|
||||
def __init__(self, interval, aggregator, api_host, use_watchdog=False, event_chunk_size=None):
|
||||
def __init__(self, interval, aggregator, api_host, event_chunk_size=None):
|
||||
threading.Thread.__init__(self)
|
||||
self.interval = int(interval)
|
||||
self.finished = threading.Event()
|
||||
|
@ -31,11 +30,6 @@ class Reporter(threading.Thread):
|
|||
self.flush_count = 0
|
||||
self.log_count = 0
|
||||
|
||||
self.watchdog = None
|
||||
if use_watchdog:
|
||||
from monasca_agent.common.util import Watchdog
|
||||
self.watchdog = Watchdog(WATCHDOG_TIMEOUT)
|
||||
|
||||
self.api_host = api_host
|
||||
self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE
|
||||
|
||||
|
@ -50,7 +44,6 @@ class Reporter(threading.Thread):
|
|||
def run(self):
|
||||
|
||||
log.info("Reporting to %s every %ss" % (self.api_host, self.interval))
|
||||
log.debug("Watchdog enabled: %s" % bool(self.watchdog))
|
||||
|
||||
# Persist a start-up message.
|
||||
check_status.MonascaStatsdStatus().persist()
|
||||
|
@ -58,8 +51,6 @@ class Reporter(threading.Thread):
|
|||
while not self.finished.isSet(): # Use camel case isSet for 2.4 support.
|
||||
self.finished.wait(self.interval)
|
||||
self.flush()
|
||||
if self.watchdog:
|
||||
self.watchdog.reset()
|
||||
|
||||
# Clean up the status messages.
|
||||
log.debug("Stopped reporter")
|
||||
|
|
|
@ -167,7 +167,7 @@ class MonascaForwarder(multiprocessing.Process):
|
|||
else:
|
||||
port = int(port)
|
||||
app_config = get_config(parse_args=False)
|
||||
self.forwarder = Application(port, app_config, watchdog=False)
|
||||
self.forwarder = Application(port, app_config)
|
||||
self.forwarder.run()
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -1,168 +0,0 @@
|
|||
import unittest
|
||||
import subprocess
|
||||
import os
|
||||
import sys
|
||||
from random import random, randrange
|
||||
import urllib as url
|
||||
import time
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '../monasca_agent'))
|
||||
from monasca_agent.collector.daemon import CollectorDaemon
|
||||
from monasca_agent.common.util import Watchdog
|
||||
|
||||
|
||||
class TestWatchdog(unittest.TestCase):
|
||||
|
||||
"""Test watchdog in various conditions
|
||||
"""
|
||||
|
||||
JITTER_FACTOR = 2
|
||||
|
||||
def test_watchdog(self):
|
||||
"""Verify that watchdog kills ourselves even when spinning
|
||||
Verify that watchdog kills ourselves when hanging
|
||||
"""
|
||||
start = time.time()
|
||||
try:
|
||||
subprocess.check_call(
|
||||
["python", "tests/test_watchdog.py", "busy"], stderr=subprocess.STDOUT)
|
||||
raise Exception("Should have died with an error")
|
||||
except subprocess.CalledProcessError:
|
||||
duration = int(time.time() - start)
|
||||
self.assertTrue(duration < self.JITTER_FACTOR * 5)
|
||||
|
||||
# Start pseudo web server
|
||||
subprocess.Popen(["nc", "-l", "31834"])
|
||||
start = time.time()
|
||||
try:
|
||||
subprocess.check_call(["python", "tests/test_watchdog.py", "net"])
|
||||
raise Exception("Should have died with an error")
|
||||
except subprocess.CalledProcessError:
|
||||
duration = int(time.time() - start)
|
||||
self.assertTrue(duration < self.JITTER_FACTOR * 5)
|
||||
|
||||
# Normal loop, should run 5 times
|
||||
start = time.time()
|
||||
try:
|
||||
subprocess.check_call(["python", "tests/test_watchdog.py", "normal"])
|
||||
duration = int(time.time() - start)
|
||||
self.assertTrue(duration < self.JITTER_FACTOR * 5)
|
||||
except subprocess.CalledProcessError:
|
||||
self.fail("Watchdog killed normal process after %s seconds" % int(time.time() - start))
|
||||
|
||||
# Fast tornado, not killed
|
||||
start = time.time()
|
||||
p = subprocess.Popen(["python", "tests/test_watchdog.py", "fast"])
|
||||
p.wait()
|
||||
duration = int(time.time() - start)
|
||||
# should die as soon as flush_trs has been called
|
||||
self.assertTrue(duration, self.JITTER_FACTOR * 10)
|
||||
|
||||
# Slow tornado, killed by the Watchdog
|
||||
start = time.time()
|
||||
p = subprocess.Popen(["python", "tests/test_watchdog.py", "slow"])
|
||||
p.wait()
|
||||
duration = int(time.time() - start)
|
||||
self.assertTrue(duration < self.JITTER_FACTOR * 4)
|
||||
|
||||
# Too much memory used, killed by Watchdog
|
||||
start = time.time()
|
||||
p = subprocess.Popen(["python", "tests/test_watchdog.py", "memory"])
|
||||
p.wait()
|
||||
duration = int(time.time() - start)
|
||||
# process should be killed well before the restart interval of 30.
|
||||
assert duration < 20
|
||||
|
||||
|
||||
class MockTxManager(object):
|
||||
|
||||
@staticmethod
|
||||
def flush():
|
||||
"Pretend to flush for a long time"
|
||||
time.sleep(5)
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
class MemoryHogTxManager(object):
|
||||
|
||||
@staticmethod
|
||||
def flush():
|
||||
rand_data = []
|
||||
while True:
|
||||
rand_data.append('%030x' % randrange(256 ** 15))
|
||||
|
||||
|
||||
class PseudoAgent(object):
|
||||
|
||||
"""Same logic as the agent, simplified"""
|
||||
@staticmethod
|
||||
def busy_run():
|
||||
w = Watchdog(5)
|
||||
w.reset()
|
||||
x = 0
|
||||
while True:
|
||||
x = random()
|
||||
return x
|
||||
|
||||
@staticmethod
|
||||
def hanging_net():
|
||||
w = Watchdog(5)
|
||||
w.reset()
|
||||
x = url.urlopen("http://localhost:31834")
|
||||
print("ERROR Net call returned", x)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def normal_run():
|
||||
w = Watchdog(2)
|
||||
w.reset()
|
||||
for i in range(5):
|
||||
time.sleep(1)
|
||||
w.reset()
|
||||
|
||||
@staticmethod
|
||||
def slow_tornado():
|
||||
a = CollectorDaemon(12345, {})
|
||||
a._watchdog = Watchdog(4)
|
||||
a._tr_manager = MockTxManager()
|
||||
a.run()
|
||||
|
||||
@staticmethod
|
||||
def fast_tornado():
|
||||
a = CollectorDaemon(12345, {})
|
||||
a._watchdog = Watchdog(6)
|
||||
a._tr_manager = MockTxManager()
|
||||
a.run()
|
||||
|
||||
@staticmethod
|
||||
def use_lots_of_memory():
|
||||
# Skip this step on travis
|
||||
if os.environ.get('TRAVIS', False):
|
||||
return
|
||||
a = CollectorDaemon(12345, {})
|
||||
a._watchdog = Watchdog(30, 50)
|
||||
a._tr_manager = MemoryHogTxManager()
|
||||
a.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
if sys.argv[1] == "busy":
|
||||
a = PseudoAgent()
|
||||
a.busy_run()
|
||||
elif sys.argv[1] == "net":
|
||||
a = PseudoAgent()
|
||||
a.hanging_net()
|
||||
elif sys.argv[1] == "normal":
|
||||
a = PseudoAgent()
|
||||
a.normal_run()
|
||||
elif sys.argv[1] == "slow":
|
||||
a = PseudoAgent()
|
||||
a.slow_tornado()
|
||||
elif sys.argv[1] == "fast":
|
||||
a = PseudoAgent()
|
||||
a.fast_tornado()
|
||||
elif sys.argv[1] == "test":
|
||||
t = TestWatchdog()
|
||||
t.runTest()
|
||||
elif sys.argv[1] == "memory":
|
||||
a = PseudoAgent()
|
||||
a.use_lots_of_memory()
|
Loading…
Reference in New Issue