Merge "testenv-worker: use signals to enable/disable jobs"

This commit is contained in:
Jenkins 2014-06-03 11:18:17 +00:00 committed by Gerrit Code Review
commit 47ec2a992c
1 changed files with 122 additions and 69 deletions

View File

@ -21,6 +21,7 @@
import argparse
import json
import logging
import signal
import sys
import threading
import time
@ -62,76 +63,113 @@ class CallbackClient(gear.Client):
return self.event.is_set()
def add_servers(client, servers):
for server in servers.split(','):
server = server.rsplit(':', 1)
if len(server) == 1:
server.append('4730')
client.addServer(server[0], int(server[1]))
class TEWorkerThread(threading.Thread):
def __init__(self, geard, num, timeout, te_data):
super(TEWorkerThread, self).__init__()
self.geard = geard
self.timeout = timeout
self.te_data = te_data
self.running = True
self.process_new_jobs = True
self.num = num
self.worker = None
def stop(self):
self.running = False
self.stopJobProcessing()
def run_callback(geard, timeout, callback_name, te_data):
client = CallbackClient()
add_servers(client, geard)
client.waitForServer()
def startJobProcessing(self):
self.process_new_jobs = True
cb_job = gear.Job(callback_name, te_data)
client.submitJob(cb_job)
def stopJobProcessing(self):
self.process_new_jobs = False
if self.worker:
self.worker.stopWaitingForJobs()
# Wait for 30 seconds, then test the status of the job
if not client.wait(30):
# Request the job status from the broker
cb_job.connection.sendPacket(gear.Packet(gear.constants.REQ,
gear.constants.GET_STATUS,
cb_job.handle))
# Let a little time pass for the STATUS_RES to return, If we're in
# here we've already waited 30 seconds so another 10 wont make much
# difference
time.sleep(10)
if not cb_job.running:
logger.error("No sign of the Callback job starting,"
"assuming its no longer present")
client.shutdown()
return
def run(self):
while self.running:
try:
if self.process_new_jobs:
logger.info('running TE worker')
self.runJob()
except gear.InterruptedError:
logger.info('getJob interrupted...')
except:
logger.exception('Error while run_te_worker worker')
time.sleep(2)
# We timeout after the configured timeout - the 40 second sleep that we
# perform during initial handshaking. Note that after this timeout we offer
# the environment for other test clients, but the prior client's
# credentials are still valid, so very confusing errors can occur if we
# were ever to timeout without the client timing out first.
client.wait(timeout - 40)
if cb_job.failure:
logger.error("The Job appears to have failed")
elif not cb_job.complete:
logger.error("No sign of Job completing, Freeing up the environment")
else:
logger.info('Returned from Job : %s', cb_job.data)
client.shutdown()
def runJob(self):
self.worker = gear.Worker('testenv-worker-%s' % self.num)
try:
self._add_servers(self.worker, self.geard)
self.worker.waitForServer()
self.worker.registerFunction('lockenv')
def run_teworker(geard, num, timeout, te_data):
worker = gear.Worker('testenv-worker-%s' % num)
add_servers(worker, geard)
worker.waitForServer()
logger.info('Getting new job...')
job = self.worker.getJob()
logger.info('Received job : %s', job.arguments)
if job.arguments.startswith("{"):
arguments = json.loads(job.arguments)
call_back = arguments["callback_name"]
job_timeout = int(arguments.get("timeout", self.timeout))
else:
# legacy support, for clients that just send the callback name
call_back = job.arguments
job_timeout = self.timeout
worker.registerFunction('lockenv')
while True:
job = worker.getJob()
logger.info('Received job : %s', job.arguments)
if job.arguments.startswith("{"):
arguments = json.loads(job.arguments)
call_back = arguments["callback_name"]
job_timeout = int(arguments.get("timeout", timeout))
# Once this Job is called we call back to the client to run its
# commands while this environment is locked
self._run_callback(job_timeout, call_back)
job.sendWorkComplete("")
finally:
self.worker.shutdown()
def _add_servers(self, client, servers):
for server in servers.split(','):
server = server.rsplit(':', 1)
if len(server) == 1:
server.append('4730')
client.addServer(server[0], int(server[1]))
def _run_callback(self, timeout, callback_name):
client = CallbackClient()
self._add_servers(client, self.geard)
client.waitForServer()
cb_job = gear.Job(callback_name, self.te_data)
client.submitJob(cb_job)
# Wait for 30 seconds, then test the status of the job
if not client.wait(30):
# Request the job status from the broker
cb_job.connection.sendPacket(gear.Packet(gear.constants.REQ,
gear.constants.GET_STATUS,
cb_job.handle))
# Let a little time pass for the STATUS_RES to return, If we're in
# here we've already waited 30 seconds so another 10 wont make much
# difference
time.sleep(10)
if not cb_job.running:
logger.error("No sign of the Callback job starting,"
"assuming its no longer present")
client.shutdown()
return
# We timeout after the configured timeout - the 40 second sleep that we
# perform during initial handshaking. Note that after this timeout we
# offer the environment for other test clients, but the prior client's
# credentials are still valid, so very confusing errors can occur if we
# were ever to timeout without the client timing out first.
client.wait(timeout - 40)
if cb_job.failure:
logger.error("The Job appears to have failed.")
elif not cb_job.complete:
logger.error("No sign of Job completing, Freeing environment.")
else:
# legacy support, for clients that just send the callback name
call_back = job.arguments
job_timeout = timeout
# Once this Job is called we call back to the client to run its
# commands while this environment is locked
run_callback(geard, job_timeout, call_back, te_data)
job.sendWorkComplete("")
logger.info('Returned from Job : %s', cb_job.data)
client.shutdown()
def main(args=sys.argv[1:]):
@ -162,14 +200,29 @@ def main(args=sys.argv[1:]):
te_data = fp.read()
logger.info('Starting test-env worker with data %r', te_data.strip())
# If this job dies we loose the test environment, so catch everything and
# retry after a short sleep.
while True:
try:
run_teworker(opts.geard, opts.tenum, opts.timeout, te_data)
except:
logger.exception('Error while running worker')
time.sleep(5)
# run worker in thread so signal handling is responsive
te_worker = TEWorkerThread(opts.geard, opts.tenum, opts.timeout, te_data)
def usr2_signal_handler(signal, frame):
te_worker.stopJobProcessing()
logger.info('SIGUSR2 recieved: Processing of new jobs is disabled.')
signal.signal(signal.SIGUSR2, usr2_signal_handler)
def usr1_signal_handler(signal, frame):
te_worker.startJobProcessing()
logger.info('SIGUSR1 recieved: Processing of new jobs is enabled.')
signal.signal(signal.SIGUSR1, usr1_signal_handler)
def int_signal_handler(signal, frame):
te_worker.stop()
logger.info('SIGINT recieved: Exiting...')
sys.exit(0)
signal.signal(signal.SIGINT, int_signal_handler)
te_worker.start()
while te_worker.running:
time.sleep(1)
if __name__ == '__main__':
main()