Merge "Add more steps to stop an engine"

This commit is contained in:
Jenkins 2014-06-18 22:30:21 +00:00 committed by Gerrit Code Review
commit e9cb9ab547
1 changed files with 19 additions and 8 deletions

View File

@ -128,7 +128,7 @@ class Engine(object):
self._watchdog_thread.join()
def schedule(self):
while True:
while self._state == self.ENABLED:
(next_time, next_jobs) = self.wait_next(self.engine_timeout)
# NOTE(praneshp): here, call a function that will wait till next
# time and call next_jobs,
@ -146,8 +146,8 @@ class Engine(object):
if not self.run_queue:
if watch and watch.expired():
raise exceptions.TimeoutException(
"Died after waiting for audits to arrive for %s" %
watch.elapsed())
"Died at %s after waiting for audits to arrive "
"for %s" % (utils.wallclock(), watch.elapsed()))
else:
# Grab all the jobs for the next time.
next_jobs.append(self.run_queue.popleft())
@ -169,12 +169,13 @@ class Engine(object):
now = datetime.datetime.now()
cron = croniter.croniter(schedule, now)
next_iteration = cron.get_next(datetime.datetime)
while True:
while self._state == self.ENABLED:
LOG.info('It is %s, next serializer at %s', now, next_iteration)
pause.until(next_iteration)
now = datetime.datetime.now()
next_iteration = cron.get_next(datetime.datetime)
self.run_serializer(next_iteration, now)
if self._state == self.ENABLED:
self.run_serializer(next_iteration, now)
def run_serializer(self, next_iteration, current_time):
LOG.info("Running serializer for %s at %s", self.name, current_time)
@ -201,7 +202,10 @@ class Engine(object):
new_additions.sort(key=operator.itemgetter('time'))
self.run_queue.extend(new_additions)
# NOTE(praneshp): Protect this operation with a state check, so in
# case of race conditions no extra audit scripts are added.
if self._state == self.ENABLED:
self.run_queue.extend(new_additions)
LOG.info("Run queue till %s is %s", next_iteration, self.run_queue)
LOG.info("Repair scripts at %s: %s", next_iteration, self._repairs)
except Exception:
@ -216,8 +220,10 @@ class Engine(object):
def stop_engine(self):
LOG.info("Stopping engine %s", self.name)
# Set state to stop, which will stop serializers
LOG.info("Setting %s to state: %s", self.name, states.DISABLED)
self._state = states.DISABLED
# Clear run queue
LOG.info("Clearing audit run queue for %s", self.name)
self.run_queue.clear()
# Stop all repairs - not yet implemented
# Stop watchdog monitoring
LOG.info("Stopping watchdog for %s", self.name)
self._watchdog_thread.stop()
@ -236,6 +242,11 @@ class Engine(object):
def setup_audit(self, execution_time, audit_list):
try:
pause.until(execution_time)
# Only proceed if engine is running, i.e in enabled state.
if self._state != self.ENABLED:
LOG.info("%s is disabled, so not running audits at %s",
self.name, execution_time)
return
LOG.info("Time: %s, Starting %s", execution_time, audit_list)
audit_futures = []
for audit in audit_list: