Report exceptions while running the job to the client.

Don't catch any exceptions while running the job; instead, report
them back to the client (via a catch-all exception handler in
StartJobWorker).

If the worker raises an exception, unlock the node monitor, in case
the worker didn't get to the point where it would be unlocked.

This change has the side effect that if the gearman server disconnects
while the job is running, the worker should return from watching the
job run (as soon as it notices, currently up to 5 seconds).  This is
helpful in that it will be available to register with gearman again,
including sending CAN_DO packets.  But the node monitor will still
prevent it from scheduling a new job while the one it started earlier
is still running.

Change-Id: Ie01ef0f9e706d81452b189099e36242ab9967950
This commit is contained in:
James E. Blair 2013-06-14 12:04:37 -07:00
parent 6041401766
commit 4556818799
2 changed files with 58 additions and 79 deletions

View File

@ -563,8 +563,16 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
} else {
executorService.submit(fun);
}
} catch (IOException io) {
LOG.warn("Receieved IOException while" +
" running function",io);
session.closeSession();
// The reconnect will unlock the monitor if needed.
} catch (Exception e) {
LOG.warn("Exception while executing function " + fun.getName(), e);
// Unlock the monitor for this worker in case we didn't
// make it as far as the schedule job unlock.
availability.unlock(this);
}
}

View File

@ -105,22 +105,30 @@ public class StartJobWorker extends AbstractGearmanFunction {
return gson.toJson(data);
}
/*
* The Gearman Function
* @see org.gearman.worker.AbstractGearmanFunction#executeFunction()
*/
@Override
public GearmanJobResult executeFunction() {
try {
return safeExecuteFunction();
} catch (Exception inner) {
RuntimeException outer = new RuntimeException(inner);
throw outer;
}
}
private GearmanJobResult safeExecuteFunction()
throws Exception
{
logger.info("---- Running executeFunction in " + name + " ----");
// decode the uniqueId from the client
String decodedUniqueId = null;
if (this.uniqueId != null) {
try {
decodedUniqueId = new String(this.uniqueId, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
decodedUniqueId = new String(this.uniqueId, "UTF-8");
}
// create new parameter objects to pass to jenkins build
@ -128,11 +136,7 @@ public class StartJobWorker extends AbstractGearmanFunction {
String decodedData = null;
if (this.data != null) {
// decode the data from the client
try {
decodedData = new String((byte[]) this.data, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
decodedData = new String((byte[]) this.data, "UTF-8");
// convert parameters passed in from client to hash map
Gson gson = new Gson();
Map<String, String> inParams = gson.fromJson(decodedData,
@ -184,89 +188,56 @@ public class StartJobWorker extends AbstractGearmanFunction {
}
}
try {
// wait for start of build
Queue.Executable exec = future.getStartCondition().get();
AbstractBuild<?, ?> currBuild = (AbstractBuild<?, ?>) exec;
// wait for start of build
Queue.Executable exec = future.getStartCondition().get();
AbstractBuild<?, ?> currBuild = (AbstractBuild<?, ?>) exec;
availability.unlock(worker);
availability.unlock(worker);
long now = new Date().getTime();
int duration = (int) (now - currBuild.getStartTimeInMillis());
int estimatedDuration = (int) currBuild.getEstimatedDuration();
jobData = buildStatusData(currBuild);
long now = new Date().getTime();
int duration = (int) (now - currBuild.getStartTimeInMillis());
int estimatedDuration = (int) currBuild.getEstimatedDuration();
jobData = buildStatusData(currBuild);
// If we found a session object in the hacky bit above,
// use it to send a WORK_STATUS packet indicating the
// start of the build.
if (sess != null) {
try {
sendData(jobData.getBytes());
sess.driveSessionIO();
sendData(jobData.getBytes());
sess.driveSessionIO();
sendStatus(estimatedDuration, duration);
sess.driveSessionIO();
while (!future.isDone()) {
// wait for jenkins build to complete
try {
future.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
now = new Date().getTime();
duration = (int) (now - currBuild.getStartTimeInMillis());
estimatedDuration = (int) currBuild.getEstimatedDuration();
if (sess != null) {
sendStatus(estimatedDuration, duration);
sess.driveSessionIO();
} catch (IOException e) {
sess = null;
logger.warn("IO Exception when driving session IO");
e.printStackTrace();
}
}
}
while (!future.isDone()) {
// wait for jenkins build to complete
try {
future.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
now = new Date().getTime();
duration = (int) (now - currBuild.getStartTimeInMillis());
estimatedDuration = (int) currBuild.getEstimatedDuration();
if (sess != null) {
try {
sendStatus(estimatedDuration, duration);
sess.driveSessionIO();
} catch (IOException e2) {
sess = null;
logger.warn("IO Exception when driving session IO");
e.printStackTrace();
}
}
}
}
exec = future.get();
exec = future.get();
jobData = buildStatusData(currBuild);
if (sess != null) {
sendData(jobData.getBytes());
sess.driveSessionIO();
}
jobData = buildStatusData(currBuild);
if (sess != null) {
try {
sendData(jobData.getBytes());
sess.driveSessionIO();
} catch (IOException e) {
sess = null;
logger.warn("IO Exception when driving session IO");
e.printStackTrace();
}
}
// check Jenkins build results
Result result = currBuild.getResult();
if (result == Result.SUCCESS) {
jobResult = true;
} else {
jobResult = false;
}
} catch (InterruptedException e) {
availability.unlock(worker);
jobFailureMsg = e.getMessage();
jobResult = false;
} catch (ExecutionException e) {
availability.unlock(worker);
jobFailureMsg = e.getMessage();
// check Jenkins build results
Result result = currBuild.getResult();
if (result == Result.SUCCESS) {
jobResult = true;
} else {
jobResult = false;
}
// return result to client
GearmanJobResult gjr = new GearmanJobResultImpl(this.jobHandle, jobResult,
GearmanJobResult gjr = new GearmanJobResultImpl(
this.jobHandle, jobResult,
"".getBytes(), "".getBytes(),
jobFailureMsg.getBytes(), 0, 0);
return gjr;