Finish changes to stop an engine

Added a call to stop all react scripts, by passing all known routing
keys to the react_killer function. Then stop the threadpool executor.
Finally, Raise a known exception to stop the watchdog thread. This
will throw an ugly traceback, but will shutdown the engine
gracefully.

Also made minor changes to the example react.json, to change the log
format. Knowing the time a log was printed is useful.

Change-Id: Ibed06f79547312d188feb499f937eb5390d60c3e
This commit is contained in:
Pranesh Pandurangan 2014-06-20 17:39:50 -07:00
parent 857fb52cf1
commit c79d12c645
4 changed files with 31 additions and 18 deletions

View File

@ -18,9 +18,7 @@
import argparse
import logging
import os
import psutil
import tempfile
import time
from engine import Engine
from entropy import utils
@ -152,13 +150,7 @@ def start_engine(args):
def stop_engine(args):
LOG.info("Stopping engine %s", args.name)
# Grab engine config file, set our engine to disabled
pid = utils.disable_engine(args.name, engine_cfg)
try:
p = psutil.Process(pid)
time.sleep(5)
p.terminate()
except psutil.NoSuchProcess:
LOG.exception("No running engine %s", args.name)
utils.disable_engine(args.name, engine_cfg)
def parse():

View File

@ -73,8 +73,11 @@ class Engine(object):
self.futures = []
self.run_queue = collections.deque()
# Private variables
self._watchdog_event_fn = {self.repair_cfg: self.repair_modified,
self.engine_cfg: self.engine_disabled}
self._watchdog_event_fn = {
self.repair_cfg: self.repair_modified,
self.engine_cfg: self.engine_disabled,
self.audit_cfg: self.audit_modified,
}
# Private variables to keep track of repair scripts.
self._repairs = []
self._known_routing_keys = collections.defaultdict(list)
@ -126,6 +129,7 @@ class Engine(object):
self.start_scheduler()
def start_scheduler(self):
# Start serializer
if not self._serializer:
self._serializer = self.executor.submit(self.start_serializer)
self.futures.append(self._serializer)
@ -137,6 +141,7 @@ class Engine(object):
self.futures.extend(self.start_react_scripts(
self._get_react_scripts()))
# Start scheduler
self._scheduler = self.executor.submit(self.schedule)
self.futures.append(self._scheduler)
self._scheduler.add_done_callback(
@ -257,15 +262,32 @@ class Engine(object):
def stop_engine(self):
LOG.info("Stopping engine %s", self.name)
# Set state to stop, which will stop serializers
# Set state to stop, which will stop serializer, and scheduler
self._state = states.DISABLED
# Clear run queue
LOG.info("Clearing audit run queue for %s", self.name)
self.run_queue.clear()
# Stop all repairs - not yet implemented
# Stop all repairs
LOG.info("Stopping all repairs for %s", self.name)
repairs_to_stop = self._known_routing_keys.keys()
self.stop_react_scripts(repairs_to_stop)
# Stop the executor - this is a blocking call.
LOG.info("Shutting down executor for %s", self.name)
self.executor.shutdown()
# Stop watchdog monitoring
LOG.info("Stopping watchdog for %s", self.name)
self._watchdog_thread.stop()
# NOTE(praneshp): Till the watchdog authors respond with the right way
# to stop watchdog, we'll raise something from here. That will stop
# the watchdog thread, go back to the join() in start_scheduler(), and
# quit the program
raise exceptions.EngineStoppedException(
'Fake exception to kill watchdog thread')
def audit_modified(self):
return NotImplemented
def repair_modified(self):
LOG.info('Repair configuration changed')
@ -289,7 +311,7 @@ class Engine(object):
def start_watchdog(self):
LOG.debug('Watchdog mapping is: ', self._watchdog_event_fn)
dirs_to_watch = [utils.get_filename_and_path(x)[0] for x in
self.engine_cfg, self.repair_cfg]
self.engine_cfg, self.repair_cfg, self.audit_cfg]
return utils.watch_dir_for_change(dirs_to_watch,
self._watchdog_event_fn)
@ -323,11 +345,9 @@ class Engine(object):
def stop_react_scripts(self, repairs_to_stop):
# current react scripts
LOG.info("Currently running react scripts: %s", self._repairs)
for repair in repairs_to_stop:
self.stop_react(repair)
# react scripts at the end
LOG.info("Currently running react scripts: %s", self._repairs)
def stop_react(self, repair):
LOG.info("Stopping react script %s", repair)

View File

@ -9,5 +9,5 @@
"mq_password": "guest",
"routing_key": "demo",
"log_file": "/home/praneshp/code/entropy/entropy/examples/logs/demo.log",
"log_format": "%(filename)s %(lineno)s %(message)s"
"log_format": "%(asctime)s %(lineno)s %(message)s"
}

View File

@ -110,6 +110,7 @@ def watch_dir_for_change(dirs_to_watch, event_fn):
observer = Observer()
for directory in dirs_to_watch:
observer.schedule(event_handler, path=directory)
observer.setDaemon(True)
observer.start()
return observer