More deadlock fixes.

It's possible that some InterruptedExceptions were being hidden by
the GrabJobEventHandler, which must catch those and not throw them.
So wherever we call driveSessionIO, check to make sure that we are
still supposed to be running afterwords.

Make sure all other places where InterruptedException is caught
do something reasonable with it.

Remove printStackTrace calls and replace with logging calls.

Change-Id: I0790eece8582c1ee2cd28e8866bfd4a9d5d700cd
This commit is contained in:
James E. Blair 2013-06-14 20:57:58 -07:00
parent 352664ee95
commit 2ba8b61b3a
3 changed files with 34 additions and 33 deletions

View File

@ -123,24 +123,18 @@ public abstract class AbstractWorkerThread implements Runnable {
*/
public void stop() {
running = false;
worker.stop();
// Interrupt the thread so it unblocks any blocking call
if (worker.isRunning()) {
try {
worker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
thread.interrupt();
// Wait until the thread exits
try {
thread.join();
} catch (InterruptedException ex) {
} catch (InterruptedException e) {
// Unexpected interruption
ex.printStackTrace();
logger.error("Exception while waiting for thread to join", e);
}
}
@ -159,13 +153,17 @@ public abstract class AbstractWorkerThread implements Runnable {
worker.setJobUniqueIdRequired(true);
registerJobs();
worker.work();
} catch (Exception ex) {
logger.error("Exception while running worker", ex);
} catch (Exception e) {
logger.error("Exception while running worker", e);
if (!running) continue;
worker.shutdown();
if (!running) continue;
try {
Thread.sleep(2000);
} catch (InterruptedException e2) {
logger.error("Exception while waiting to restart worker", e2);
}
if (!running) continue;
initWorker();
}
}

View File

@ -75,14 +75,14 @@ public class GearmanProxy {
master = Jenkins.getInstance().getComputer("");
hostname = master.getHostName();
} catch (Exception e) {
e.printStackTrace();
logger.warn("Exception while getting hostname", e);
}
// master node may not be enabled so get masterName from system
if (master == null) {
try {
hostname = java.net.InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
e.printStackTrace();
logger.warn("Exception while getting hostname", e);
}
}
@ -117,8 +117,7 @@ public class GearmanProxy {
} catch (NullPointerException npe) {
logger.info("---- Master is offline");
} catch (Exception e) {
logger.info("---- Can't get Master");
e.printStackTrace();
logger.error("Exception while finding master", e);
}
if (masterNode != null) {

View File

@ -169,6 +169,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
LOG.warn("Interrupted while reconnecting", e);
return;
}
}
LOG.info("Ending reconnect for " + session.toString());
@ -225,7 +227,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return ret;
}
private void registerFunctions() throws IOException{
private void registerFunctions() throws IOException {
Set<GearmanFunctionFactory> functions = functionRegistry.getFunctions();
if (functions == null) {
@ -236,12 +238,14 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
functionMap.put(factory.getFunctionName(), def);
sendToAll(generateCanDoPacket(def));
session.driveSessionIO();
if (!isRunning()) return;
LOG.debug("Worker " + this + " has registered function " +
factory.getFunctionName());
@ -283,11 +287,13 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
registerFunctions();
} catch (IOException io) {
LOG.warn("Receieved IOException while" +
" registering function",io);
" registering function", io);
session.closeSession();
continue;
}
if (!isRunning()) continue;
if (functionList.isEmpty()) {
if (!grabJobSent) {
// send the initial GRAB_JOB on reconnection.
@ -304,13 +310,15 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("Receieved IOException while" +
" sending initial grab job",io);
" sending initial grab job", io);
session.closeSession();
continue;
}
}
}
if (!isRunning()) continue;
if (functionList.isEmpty()) {
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
@ -322,7 +330,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
ioAvailable.select();
} catch (IOException io) {
LOG.warn("Receieved IOException while" +
" selecting for IO",io);
" selecting for IO", io);
session.closeSession();
continue;
}
@ -334,14 +342,16 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
try {
session.driveSessionIO();
} catch (IOException ioe) {
} catch (IOException io) {
LOG.warn("Received IOException while driving" +
" IO on session " + session, ioe);
" IO on session " + session, io);
session.closeSession();
continue;
}
}
if (!isRunning()) continue;
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
if (!functionList.isEmpty()) {
@ -563,6 +573,10 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
} else {
executorService.submit(fun);
}
// We should have submitted either a WORK_EXCEPTION, COMPLETE,
// or FAIL; make sure it gets sent.
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("Receieved IOException while" +
" running function",io);
@ -574,16 +588,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
// make it as far as the schedule job unlock.
availability.unlock(this);
}
// We should have submitted either a WORK_EXCEPTION, COMPLETE,
// or FAIL; make sure it gets sent.
try {
session.driveSessionIO();
} catch (IOException ioe) {
LOG.warn("Received IOException while driving" +
" IO on session " + session, ioe);
session.closeSession();
}
}
private GearmanPacketType getGrabJobPacketType() {