Merge "Remove watchdog from the agent"

This commit is contained in:
Jenkins 2015-09-16 04:45:55 +00:00 committed by Gerrit Code Review
commit 9ad14a48aa
7 changed files with 3 additions and 253 deletions

View File

@ -30,7 +30,6 @@ if int(sys.version_info[1]) <= 3:
# Constants
PID_NAME = "monasca-agent"
WATCHDOG_MULTIPLIER = 10
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
START_COMMANDS = ['start', 'restart', 'foreground']
@ -92,9 +91,7 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd)
# Configure the watchdog.
check_frequency = int(config['check_freq'])
watchdog = self._get_watchdog(check_frequency, config)
# Initialize the auto-restarter
self.restart_interval = int(config.get('restart_interval', RESTART_INTERVAL))
@ -138,8 +135,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
# Only plan for the next loop if we will continue,
# otherwise just exit quickly.
if self.run_forever:
if watchdog:
watchdog.reset()
time.sleep(check_frequency)
# Now clean-up.
@ -153,16 +148,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
log.info("Exiting. Bye bye.")
sys.exit(0)
@staticmethod
def _get_watchdog(check_freq, agentConfig):
watchdog = None
if agentConfig.get("watchdog", True):
watchdog = util.Watchdog(check_freq * WATCHDOG_MULTIPLIER,
max_mem_mb=agentConfig.get('limit_memory_consumption',
None))
watchdog.reset()
return watchdog
def _should_restart(self):
if time.time() - self.agent_start > self.restart_interval:
return True

View File

@ -46,7 +46,6 @@ class Config(object):
'additional_checksd': '/usr/lib/monasca/agent/custom_checks.d',
'limit_memory_consumption': None,
'skip_ssl_validation': False,
'watchdog': True,
'autorestart': False,
'non_local_traffic': False},
'Api': {'is_enabled': False,

View File

@ -8,7 +8,6 @@ import optparse
import os
import platform
import re
import signal
import socket
import subprocess
import sys
@ -41,50 +40,6 @@ import monasca_agent.common.config as configuration
from monasca_agent.common.exceptions import PathNotFound
class Watchdog(object):
"""Simple signal-based watchdog that will scuttle the current process
if it has not been reset every N seconds, or if the processes exceeds
a specified memory threshold.
Can only be invoked once per process, so don't use with multiple threads.
If you instantiate more than one, you're also asking for trouble.
"""
def __init__(self, duration, max_mem_mb=None):
import resource
# Set the duration
self._duration = int(duration)
signal.signal(signal.SIGALRM, Watchdog.self_destruct)
# cap memory usage
if max_mem_mb is not None:
self._max_mem_kb = 1024 * max_mem_mb
max_mem_bytes = 1024 * self._max_mem_kb
resource.setrlimit(resource.RLIMIT_AS, (max_mem_bytes, max_mem_bytes))
self.memory_limit_enabled = True
else:
self.memory_limit_enabled = False
@staticmethod
def self_destruct(signum, frame):
try:
import traceback
log.error("Self-destructing...")
log.error(traceback.format_exc())
finally:
os.kill(os.getpid(), signal.SIGKILL)
def reset(self):
# self destruct if using too much memory, as tornado will swallow MemoryErrors
mem_usage_kb = int(os.popen('ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')).read())
if self.memory_limit_enabled and mem_usage_kb > (0.95 * self._max_mem_kb):
Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0))
log.debug("Resetting watchdog for %d" % self._duration)
signal.alarm(self._duration)
class PidFile(object):
"""A small helper class for pidfiles. """

View File

@ -38,8 +38,6 @@ import monasca_agent.forwarder.transaction as transaction
log = logging.getLogger('forwarder')
WATCHDOG_INTERVAL_MULTIPLIER = 10 # 10x flush interval
# Maximum delay before replaying a transaction
MAX_WAIT_FOR_REPLAY = datetime.timedelta(seconds=90)
@ -98,7 +96,7 @@ class AgentInputHandler(tornado.web.RequestHandler):
class Forwarder(tornado.web.Application):
def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False,
def __init__(self, port, agent_config, skip_ssl_validation=False,
use_simple_http_client=False):
self._port = int(port)
self._agent_config = agent_config
@ -112,18 +110,12 @@ class Forwarder(tornado.web.Application):
agent_config)
transaction.MetricTransaction.set_tr_manager(self._tr_manager)
self._watchdog = None
self.skip_ssl_validation = skip_ssl_validation or agent_config.get(
'skip_ssl_validation', False)
self.use_simple_http_client = use_simple_http_client
if self.skip_ssl_validation:
log.info("Skipping SSL hostname validation, useful when using a transparent proxy")
if watchdog:
watchdog_timeout = self.flush_interval * WATCHDOG_INTERVAL_MULTIPLIER
self._watchdog = util.Watchdog(
watchdog_timeout, max_mem_mb=agent_config.get('limit_memory_consumption', None))
def _post_metrics(self):
if len(self._metrics) > 0:
@ -199,8 +191,6 @@ class Forwarder(tornado.web.Application):
logging.getLogger().setLevel(self._agent_config.get('log_level', logging.INFO))
def flush_trs():
if self._watchdog:
self._watchdog.reset()
self._post_metrics()
self._tr_manager.flush()
@ -208,8 +198,6 @@ class Forwarder(tornado.web.Application):
flush_trs, self.flush_interval, io_loop=self.mloop)
# Start everything
if self._watchdog:
self._watchdog.reset()
tr_sched.start()
self.mloop.start()

View File

@ -9,7 +9,6 @@ import monasca_agent.common.util as util
log = logging.getLogger(__name__)
WATCHDOG_TIMEOUT = 120
# Since we call flush more often than the metrics aggregation interval, we should
# log a bunch of flushes in a row every so often.
FLUSH_LOGGING_PERIOD = 70
@ -23,7 +22,7 @@ class Reporter(threading.Thread):
server.
"""
def __init__(self, interval, aggregator, api_host, use_watchdog=False, event_chunk_size=None):
def __init__(self, interval, aggregator, api_host, event_chunk_size=None):
threading.Thread.__init__(self)
self.interval = int(interval)
self.finished = threading.Event()
@ -31,11 +30,6 @@ class Reporter(threading.Thread):
self.flush_count = 0
self.log_count = 0
self.watchdog = None
if use_watchdog:
from monasca_agent.common.util import Watchdog
self.watchdog = Watchdog(WATCHDOG_TIMEOUT)
self.api_host = api_host
self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE
@ -50,7 +44,6 @@ class Reporter(threading.Thread):
def run(self):
log.info("Reporting to %s every %ss" % (self.api_host, self.interval))
log.debug("Watchdog enabled: %s" % bool(self.watchdog))
# Persist a start-up message.
check_status.MonascaStatsdStatus().persist()
@ -58,8 +51,6 @@ class Reporter(threading.Thread):
while not self.finished.isSet(): # Use camel case isSet for 2.4 support.
self.finished.wait(self.interval)
self.flush()
if self.watchdog:
self.watchdog.reset()
# Clean up the status messages.
log.debug("Stopped reporter")

View File

@ -167,7 +167,7 @@ class MonascaForwarder(multiprocessing.Process):
else:
port = int(port)
app_config = get_config(parse_args=False)
self.forwarder = Application(port, app_config, watchdog=False)
self.forwarder = Application(port, app_config)
self.forwarder.run()
def stop(self):

View File

@ -1,168 +0,0 @@
import unittest
import subprocess
import os
import sys
from random import random, randrange
import urllib as url
import time
sys.path.append(os.path.join(os.path.dirname(__file__), '../monasca_agent'))
from monasca_agent.collector.daemon import CollectorDaemon
from monasca_agent.common.util import Watchdog
class TestWatchdog(unittest.TestCase):
"""Test watchdog in various conditions
"""
JITTER_FACTOR = 2
def test_watchdog(self):
"""Verify that watchdog kills ourselves even when spinning
Verify that watchdog kills ourselves when hanging
"""
start = time.time()
try:
subprocess.check_call(
["python", "tests/test_watchdog.py", "busy"], stderr=subprocess.STDOUT)
raise Exception("Should have died with an error")
except subprocess.CalledProcessError:
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 5)
# Start pseudo web server
subprocess.Popen(["nc", "-l", "31834"])
start = time.time()
try:
subprocess.check_call(["python", "tests/test_watchdog.py", "net"])
raise Exception("Should have died with an error")
except subprocess.CalledProcessError:
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 5)
# Normal loop, should run 5 times
start = time.time()
try:
subprocess.check_call(["python", "tests/test_watchdog.py", "normal"])
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 5)
except subprocess.CalledProcessError:
self.fail("Watchdog killed normal process after %s seconds" % int(time.time() - start))
# Fast tornado, not killed
start = time.time()
p = subprocess.Popen(["python", "tests/test_watchdog.py", "fast"])
p.wait()
duration = int(time.time() - start)
# should die as soon as flush_trs has been called
self.assertTrue(duration, self.JITTER_FACTOR * 10)
# Slow tornado, killed by the Watchdog
start = time.time()
p = subprocess.Popen(["python", "tests/test_watchdog.py", "slow"])
p.wait()
duration = int(time.time() - start)
self.assertTrue(duration < self.JITTER_FACTOR * 4)
# Too much memory used, killed by Watchdog
start = time.time()
p = subprocess.Popen(["python", "tests/test_watchdog.py", "memory"])
p.wait()
duration = int(time.time() - start)
# process should be killed well before the restart interval of 30.
assert duration < 20
class MockTxManager(object):
@staticmethod
def flush():
"Pretend to flush for a long time"
time.sleep(5)
sys.exit(0)
class MemoryHogTxManager(object):
@staticmethod
def flush():
rand_data = []
while True:
rand_data.append('%030x' % randrange(256 ** 15))
class PseudoAgent(object):
"""Same logic as the agent, simplified"""
@staticmethod
def busy_run():
w = Watchdog(5)
w.reset()
x = 0
while True:
x = random()
return x
@staticmethod
def hanging_net():
w = Watchdog(5)
w.reset()
x = url.urlopen("http://localhost:31834")
print("ERROR Net call returned", x)
return True
@staticmethod
def normal_run():
w = Watchdog(2)
w.reset()
for i in range(5):
time.sleep(1)
w.reset()
@staticmethod
def slow_tornado():
a = CollectorDaemon(12345, {})
a._watchdog = Watchdog(4)
a._tr_manager = MockTxManager()
a.run()
@staticmethod
def fast_tornado():
a = CollectorDaemon(12345, {})
a._watchdog = Watchdog(6)
a._tr_manager = MockTxManager()
a.run()
@staticmethod
def use_lots_of_memory():
# Skip this step on travis
if os.environ.get('TRAVIS', False):
return
a = CollectorDaemon(12345, {})
a._watchdog = Watchdog(30, 50)
a._tr_manager = MemoryHogTxManager()
a.run()
if __name__ == "__main__":
if sys.argv[1] == "busy":
a = PseudoAgent()
a.busy_run()
elif sys.argv[1] == "net":
a = PseudoAgent()
a.hanging_net()
elif sys.argv[1] == "normal":
a = PseudoAgent()
a.normal_run()
elif sys.argv[1] == "slow":
a = PseudoAgent()
a.slow_tornado()
elif sys.argv[1] == "fast":
a = PseudoAgent()
a.fast_tornado()
elif sys.argv[1] == "test":
t = TestWatchdog()
t.runTest()
elif sys.argv[1] == "memory":
a = PseudoAgent()
a.use_lots_of_memory()