From 75a4942cc59a55849b81514f9bb8b81d6b2a7190 Mon Sep 17 00:00:00 2001 From: Dan Prince Date: Wed, 16 Apr 2014 09:34:18 -0400 Subject: [PATCH] testenv-worker: use signals to enable/disable jobs Updates the testenv-worker script to handle signals so that we can stop/start the processing of new jobs without killing the existing testenv-worker process. This is important in that it will give us better control of testenv-worker node upgrades without disrupting the job queue. The following signal handling is now supported: USR1: Enable job processing. USR2: Disable job processing. The current job will finish processing and no new jobs will be accepted. The testenv-worker process will however stay running. INT: Exit after any currently running job finishes. The testenv-worker process must be restarted after this signal is received in order to process new jobs. Change-Id: I465f34393627f6b6fd6584824531d01287b523d7 --- elements/testenv-worker/testenv-worker | 191 ++++++++++++++++--------- 1 file changed, 122 insertions(+), 69 deletions(-) diff --git a/elements/testenv-worker/testenv-worker b/elements/testenv-worker/testenv-worker index 1f363a7ea..ea57f8ab7 100755 --- a/elements/testenv-worker/testenv-worker +++ b/elements/testenv-worker/testenv-worker @@ -21,6 +21,7 @@ import argparse import json import logging +import signal import sys import threading import time @@ -62,76 +63,113 @@ class CallbackClient(gear.Client): return self.event.is_set() -def add_servers(client, servers): - for server in servers.split(','): - server = server.rsplit(':', 1) - if len(server) == 1: - server.append('4730') - client.addServer(server[0], int(server[1])) +class TEWorkerThread(threading.Thread): + def __init__(self, geard, num, timeout, te_data): + super(TEWorkerThread, self).__init__() + self.geard = geard + self.timeout = timeout + self.te_data = te_data + self.running = True + self.process_new_jobs = True + self.num = num + self.worker = None + def stop(self): + self.running = False + self.stopJobProcessing() -def run_callback(geard, timeout, callback_name, te_data): - client = CallbackClient() - add_servers(client, geard) - client.waitForServer() + def startJobProcessing(self): + self.process_new_jobs = True - cb_job = gear.Job(callback_name, te_data) - client.submitJob(cb_job) + def stopJobProcessing(self): + self.process_new_jobs = False + if self.worker: + self.worker.stopWaitingForJobs() - # Wait for 30 seconds, then test the status of the job - if not client.wait(30): - # Request the job status from the broker - cb_job.connection.sendPacket(gear.Packet(gear.constants.REQ, - gear.constants.GET_STATUS, - cb_job.handle)) - # Let a little time pass for the STATUS_RES to return, If we're in - # here we've already waited 30 seconds so another 10 wont make much - # difference - time.sleep(10) - if not cb_job.running: - logger.error("No sign of the Callback job starting," - "assuming its no longer present") - client.shutdown() - return + def run(self): + while self.running: + try: + if self.process_new_jobs: + logger.info('running TE worker') + self.runJob() + except gear.InterruptedError: + logger.info('getJob interrupted...') + except: + logger.exception('Error while run_te_worker worker') + time.sleep(2) - # We timeout after the configured timeout - the 40 second sleep that we - # perform during initial handshaking. Note that after this timeout we offer - # the environment for other test clients, but the prior client's - # credentials are still valid, so very confusing errors can occur if we - # were ever to timeout without the client timing out first. - client.wait(timeout - 40) - if cb_job.failure: - logger.error("The Job appears to have failed") - elif not cb_job.complete: - logger.error("No sign of Job completing, Freeing up the environment") - else: - logger.info('Returned from Job : %s', cb_job.data) - client.shutdown() + def runJob(self): + self.worker = gear.Worker('testenv-worker-%s' % self.num) + try: + self._add_servers(self.worker, self.geard) + self.worker.waitForServer() + self.worker.registerFunction('lockenv') -def run_teworker(geard, num, timeout, te_data): - worker = gear.Worker('testenv-worker-%s' % num) - add_servers(worker, geard) - worker.waitForServer() + logger.info('Getting new job...') + job = self.worker.getJob() + logger.info('Received job : %s', job.arguments) + if job.arguments.startswith("{"): + arguments = json.loads(job.arguments) + call_back = arguments["callback_name"] + job_timeout = int(arguments.get("timeout", self.timeout)) + else: + # legacy support, for clients that just send the callback name + call_back = job.arguments + job_timeout = self.timeout - worker.registerFunction('lockenv') - while True: - job = worker.getJob() - logger.info('Received job : %s', job.arguments) - if job.arguments.startswith("{"): - arguments = json.loads(job.arguments) - call_back = arguments["callback_name"] - job_timeout = int(arguments.get("timeout", timeout)) + # Once this Job is called we call back to the client to run its + # commands while this environment is locked + self._run_callback(job_timeout, call_back) + + job.sendWorkComplete("") + finally: + self.worker.shutdown() + + def _add_servers(self, client, servers): + for server in servers.split(','): + server = server.rsplit(':', 1) + if len(server) == 1: + server.append('4730') + client.addServer(server[0], int(server[1])) + + def _run_callback(self, timeout, callback_name): + client = CallbackClient() + self._add_servers(client, self.geard) + client.waitForServer() + + cb_job = gear.Job(callback_name, self.te_data) + client.submitJob(cb_job) + + # Wait for 30 seconds, then test the status of the job + if not client.wait(30): + # Request the job status from the broker + cb_job.connection.sendPacket(gear.Packet(gear.constants.REQ, + gear.constants.GET_STATUS, + cb_job.handle)) + # Let a little time pass for the STATUS_RES to return, If we're in + # here we've already waited 30 seconds so another 10 wont make much + # difference + time.sleep(10) + if not cb_job.running: + logger.error("No sign of the Callback job starting," + "assuming its no longer present") + client.shutdown() + return + + # We timeout after the configured timeout - the 40 second sleep that we + # perform during initial handshaking. Note that after this timeout we + # offer the environment for other test clients, but the prior client's + # credentials are still valid, so very confusing errors can occur if we + # were ever to timeout without the client timing out first. + client.wait(timeout - 40) + if cb_job.failure: + logger.error("The Job appears to have failed.") + elif not cb_job.complete: + logger.error("No sign of Job completing, Freeing environment.") else: - # legacy support, for clients that just send the callback name - call_back = job.arguments - job_timeout = timeout - - # Once this Job is called we call back to the client to run its - # commands while this environment is locked - run_callback(geard, job_timeout, call_back, te_data) - - job.sendWorkComplete("") + logger.info('Returned from Job : %s', cb_job.data) + client.shutdown() def main(args=sys.argv[1:]): @@ -162,14 +200,29 @@ def main(args=sys.argv[1:]): te_data = fp.read() logger.info('Starting test-env worker with data %r', te_data.strip()) - # If this job dies we loose the test environment, so catch everything and - # retry after a short sleep. - while True: - try: - run_teworker(opts.geard, opts.tenum, opts.timeout, te_data) - except: - logger.exception('Error while running worker') - time.sleep(5) + # run worker in thread so signal handling is responsive + te_worker = TEWorkerThread(opts.geard, opts.tenum, opts.timeout, te_data) + + def usr2_signal_handler(signal, frame): + te_worker.stopJobProcessing() + logger.info('SIGUSR2 recieved: Processing of new jobs is disabled.') + signal.signal(signal.SIGUSR2, usr2_signal_handler) + + def usr1_signal_handler(signal, frame): + te_worker.startJobProcessing() + logger.info('SIGUSR1 recieved: Processing of new jobs is enabled.') + signal.signal(signal.SIGUSR1, usr1_signal_handler) + + def int_signal_handler(signal, frame): + te_worker.stop() + logger.info('SIGINT recieved: Exiting...') + sys.exit(0) + signal.signal(signal.SIGINT, int_signal_handler) + + te_worker.start() + while te_worker.running: + time.sleep(1) + if __name__ == '__main__': main()