Use more fine-grained synchronization in GearmanProxy

This undoes some of the previous change.  In particular, calling
something that could affect a NodeAvailabilityMonitor's lock
(eg in canTake or the init methods in createExecutorWorkersOnNode)
can deadlock on trying to obtain the Queue lock while holding the
GearmanProxy lock.

Instead, lock on just the handle lists where necessary, and try
to do serious work outside of that lock.

Change-Id: I4c1ed6c7a4f5586c1034651a649bbbd420eacdd7
This commit is contained in:
James E. Blair 2013-08-14 13:05:34 -07:00
parent c97253eff5
commit 2cb540bf51
6 changed files with 120 additions and 66 deletions

View File

@ -54,12 +54,21 @@ public abstract class AbstractWorkerThread implements Runnable {
setPort(port);
setName(name);
setAvailability(availability);
initWorker();
}
protected void initWorker() {
worker = new MyGearmanWorkerImpl(getAvailability());
conn = new GearmanNIOJobServerConnection(host, port);
synchronized(this) {
if (running) {
worker = new MyGearmanWorkerImpl(getAvailability());
conn = new GearmanNIOJobServerConnection(host, port);
}
}
}
// Only for unit tests:
protected void testInitWorker() {
running = true;
initWorker();
}
public String getHost() {
@ -122,12 +131,16 @@ public abstract class AbstractWorkerThread implements Runnable {
* Stop the thread
*/
public void stop() {
running = false;
logger.info("---- " + getName() + " Request to stop AWT: " + this);
logger.info("---- " + getName() + " Thread: " + thread + " name: " + thread.getName());
logger.info("---- " + getName() + " Worker: " + worker);
worker.stop();
synchronized(this) {
running = false;
if (worker != null) {
worker.stop();
}
}
logger.info("---- " + getName() + " Interrupting worker");
// Interrupt the thread so it unblocks any blocking call
@ -150,7 +163,7 @@ public abstract class AbstractWorkerThread implements Runnable {
*/
@Override
public void run() {
initWorker();
while (running) {
try {
logger.info("---- Starting Worker "+ getName() +" ("+new Date().toString()+")");

View File

@ -145,6 +145,11 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
*/
@Override
public void registerJobs() {
if (worker == null) {
// We haven't been initialized yet; the run method will call this again
return;
}
HashMap<String,GearmanFunctionFactory> newFunctionMap = new HashMap<String,GearmanFunctionFactory>();
if (!computer.isOffline()) {

View File

@ -98,7 +98,7 @@ public class GearmanProxy {
/*
* This method initializes the gearman workers.
*/
public synchronized void initWorkers() {
public void initWorkers() {
/*
* Purpose here is to create a 1:1 mapping of 'gearman worker':'jenkins
* executor' then use the gearman worker to execute builds on that
@ -154,19 +154,22 @@ public class GearmanProxy {
* Spawn management executor workers. This worker does not need any
* executors. It only needs to connect to gearman.
*/
public synchronized void createManagementWorker() {
public void createManagementWorker() {
ManagementWorkerThread gwt;
if (!gmwtHandles.isEmpty()) {
return;
}
synchronized (gmwtHandles) {
if (!gmwtHandles.isEmpty()) {
return;
}
ManagementWorkerThread gwt = new ManagementWorkerThread(
gwt = new ManagementWorkerThread(
GearmanPluginConfig.get().getHost(),
GearmanPluginConfig.get().getPort(),
masterName + "_manager",
masterName, new NoopAvailabilityMonitor());
gwt.start();
gmwtHandles.add(gwt);
gmwtHandles.add(gwt);
gwt.start();
}
logger.info("---- Num of executors running = " + getNumExecutors());
}
@ -174,36 +177,37 @@ public class GearmanProxy {
/*
* Spawn workers for each executor on a node.
*/
public synchronized void createExecutorWorkersOnNode(Computer computer) {
public void createExecutorWorkersOnNode(Computer computer) {
ExecutorWorkerThread workerThread = null;
// find the computer in the executor workers list
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
logger.info("---- Executor thread already running for " + computer.getName());
return;
}
}
AvailabilityMonitor availability = new NodeAvailabilityMonitor(computer);
int executors = computer.getExecutors().size();
for (int i = 0; i < executors; i++) {
String nodeName = null;
nodeName = GearmanPluginUtil.getRealName(computer);
if (nodeName == "master") {
nodeName = masterName;
synchronized(gewtHandles) {
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
logger.info("---- Executor thread already running for " + computer.getName());
return;
}
}
ExecutorWorkerThread ewt = new ExecutorWorkerThread(
AvailabilityMonitor availability = new NodeAvailabilityMonitor(computer);
int executors = computer.getExecutors().size();
for (int i = 0; i < executors; i++) {
String nodeName = null;
nodeName = GearmanPluginUtil.getRealName(computer);
if (nodeName == "master") {
nodeName = masterName;
}
ExecutorWorkerThread ewt = new ExecutorWorkerThread(
GearmanPluginConfig.get().getHost(),
GearmanPluginConfig.get().getPort(),
nodeName+"_exec-"+Integer.toString(i),
computer, masterName, availability);
ewt.start();
gewtHandles.add(ewt);
ewt.start();
gewtHandles.add(ewt);
}
}
logger.info("---- Num of executors running = " + getNumExecutors());
@ -213,20 +217,23 @@ public class GearmanProxy {
/*
* This method stops all gearman workers
*/
public synchronized void stopAll() {
public void stopAll() {
// stop gearman executors
List<AbstractWorkerThread> stopHandles;
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
gewtHandles.clear();
synchronized(gewtHandles) {
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
gewtHandles.clear();
}
for (AbstractWorkerThread wt : stopHandles) { // stop executors
wt.stop();
}
stopHandles = new ArrayList<AbstractWorkerThread>();
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
gmwtHandles.clear();
synchronized(gmwtHandles) {
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
gmwtHandles.clear();
}
for (AbstractWorkerThread wt : stopHandles) { // stop executors
wt.stop();
@ -244,15 +251,18 @@ public class GearmanProxy {
* The Computer to stop
*
*/
public synchronized void stop(Computer computer) {
public void stop(Computer computer) {
logger.info("---- Stop computer " + computer);
List<ExecutorWorkerThread> workers = new ArrayList<ExecutorWorkerThread>();
// find the computer in the executor workers list and stop it
for (Iterator<ExecutorWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
ExecutorWorkerThread t = it.next();
if (t.getComputer() == computer) {
workers.add(t);
it.remove();
synchronized(gewtHandles) {
// find the computer in the executor workers list and stop it
for (Iterator<ExecutorWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
ExecutorWorkerThread t = it.next();
if (t.getComputer() == computer) {
workers.add(t);
it.remove();
}
}
}
@ -270,39 +280,49 @@ public class GearmanProxy {
return gmwtHandles.size() + gewtHandles.size();
}
public synchronized void onBuildFinalized(Run r) {
public void onBuildFinalized(Run r) {
Computer computer = r.getExecutor().getOwner();
// A build just finished, so let the AvailabilityMonitor
// associated with its node wake up any workers who may be
// waiting for the lock.
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
t.getAvailability().wake();
AvailabilityMonitor availability = null;
synchronized(gewtHandles) {
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
availability = t.getAvailability();
}
}
}
if (availability != null) {
availability.wake();
}
}
public synchronized AvailabilityMonitor getAvailabilityMonitor(Computer computer) {
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
return t.getAvailability();
public AvailabilityMonitor getAvailabilityMonitor(Computer computer) {
synchronized (gewtHandles) {
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
return t.getAvailability();
}
}
}
return null;
}
public synchronized CauseOfBlockage canTake(Node node,
Queue.BuildableItem item) {
public CauseOfBlockage canTake(Node node,
Queue.BuildableItem item) {
// Ask the AvailabilityMonitor for this node if it's okay to
// run this build.
ExecutorWorkerThread workerThread = null;
Computer computer = node.toComputer();
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
workerThread = t;
break;
synchronized(gewtHandles) {
Computer computer = node.toComputer();
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
workerThread = t;
break;
}
}
}
@ -316,9 +336,11 @@ public class GearmanProxy {
return null;
}
public synchronized void registerJobs() {
for (ExecutorWorkerThread worker : gewtHandles) {
worker.registerJobs();
public void registerJobs() {
synchronized(gewtHandles) {
for (ExecutorWorkerThread worker : gewtHandles) {
worker.registerJobs();
}
}
}
}

View File

@ -55,6 +55,11 @@ public class ManagementWorkerThread extends AbstractWorkerThread{
*/
@Override
public void registerJobs(){
if (worker == null) {
// We haven't been initialized yet; the run method will call this again
return;
}
if (!registered) {
Set<GearmanFunctionFactory> functionSet = new HashSet<GearmanFunctionFactory>();

View File

@ -79,6 +79,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
apple.setAssignedLabel(new LabelAtom("oneiric-10"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -99,6 +100,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -119,6 +121,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("bogus"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -136,6 +139,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -156,6 +160,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.disable();
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -177,6 +182,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", offlineSlave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -195,6 +201,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -216,6 +223,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("!linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.testInitWorker();
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();

View File

@ -58,6 +58,7 @@ public class ManagementWorkerThreadTest {
public void testRegisterJobs() {
AbstractWorkerThread manager = new ManagementWorkerThread("GearmanServer", 4730,
"master_manager", "master", new NoopAvailabilityMonitor());
manager.testInitWorker();
manager.registerJobs();
Set<String> functions = manager.worker.getRegisteredFunctions();
assertEquals("set_description:master", functions.toArray()[0]);