Use more fine-grained synchronization in GearmanProxy
This undoes some of the previous change. In particular, calling something that could affect a NodeAvailabilityMonitor's lock (eg in canTake or the init methods in createExecutorWorkersOnNode) can deadlock on trying to obtain the Queue lock while holding the GearmanProxy lock. Instead, lock on just the handle lists where necessary, and try to do serious work outside of that lock. Change-Id: I4c1ed6c7a4f5586c1034651a649bbbd420eacdd7
This commit is contained in:
parent
c97253eff5
commit
2cb540bf51
|
@ -54,12 +54,21 @@ public abstract class AbstractWorkerThread implements Runnable {
|
|||
setPort(port);
|
||||
setName(name);
|
||||
setAvailability(availability);
|
||||
initWorker();
|
||||
}
|
||||
|
||||
protected void initWorker() {
|
||||
worker = new MyGearmanWorkerImpl(getAvailability());
|
||||
conn = new GearmanNIOJobServerConnection(host, port);
|
||||
synchronized(this) {
|
||||
if (running) {
|
||||
worker = new MyGearmanWorkerImpl(getAvailability());
|
||||
conn = new GearmanNIOJobServerConnection(host, port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only for unit tests:
|
||||
protected void testInitWorker() {
|
||||
running = true;
|
||||
initWorker();
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
|
@ -122,12 +131,16 @@ public abstract class AbstractWorkerThread implements Runnable {
|
|||
* Stop the thread
|
||||
*/
|
||||
public void stop() {
|
||||
running = false;
|
||||
|
||||
logger.info("---- " + getName() + " Request to stop AWT: " + this);
|
||||
logger.info("---- " + getName() + " Thread: " + thread + " name: " + thread.getName());
|
||||
logger.info("---- " + getName() + " Worker: " + worker);
|
||||
worker.stop();
|
||||
synchronized(this) {
|
||||
running = false;
|
||||
if (worker != null) {
|
||||
worker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("---- " + getName() + " Interrupting worker");
|
||||
// Interrupt the thread so it unblocks any blocking call
|
||||
|
@ -150,7 +163,7 @@ public abstract class AbstractWorkerThread implements Runnable {
|
|||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
initWorker();
|
||||
while (running) {
|
||||
try {
|
||||
logger.info("---- Starting Worker "+ getName() +" ("+new Date().toString()+")");
|
||||
|
|
|
@ -145,6 +145,11 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
|
|||
*/
|
||||
@Override
|
||||
public void registerJobs() {
|
||||
if (worker == null) {
|
||||
// We haven't been initialized yet; the run method will call this again
|
||||
return;
|
||||
}
|
||||
|
||||
HashMap<String,GearmanFunctionFactory> newFunctionMap = new HashMap<String,GearmanFunctionFactory>();
|
||||
|
||||
if (!computer.isOffline()) {
|
||||
|
|
|
@ -98,7 +98,7 @@ public class GearmanProxy {
|
|||
/*
|
||||
* This method initializes the gearman workers.
|
||||
*/
|
||||
public synchronized void initWorkers() {
|
||||
public void initWorkers() {
|
||||
/*
|
||||
* Purpose here is to create a 1:1 mapping of 'gearman worker':'jenkins
|
||||
* executor' then use the gearman worker to execute builds on that
|
||||
|
@ -154,19 +154,22 @@ public class GearmanProxy {
|
|||
* Spawn management executor workers. This worker does not need any
|
||||
* executors. It only needs to connect to gearman.
|
||||
*/
|
||||
public synchronized void createManagementWorker() {
|
||||
public void createManagementWorker() {
|
||||
ManagementWorkerThread gwt;
|
||||
|
||||
if (!gmwtHandles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
synchronized (gmwtHandles) {
|
||||
if (!gmwtHandles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ManagementWorkerThread gwt = new ManagementWorkerThread(
|
||||
gwt = new ManagementWorkerThread(
|
||||
GearmanPluginConfig.get().getHost(),
|
||||
GearmanPluginConfig.get().getPort(),
|
||||
masterName + "_manager",
|
||||
masterName, new NoopAvailabilityMonitor());
|
||||
gwt.start();
|
||||
gmwtHandles.add(gwt);
|
||||
gmwtHandles.add(gwt);
|
||||
gwt.start();
|
||||
}
|
||||
|
||||
logger.info("---- Num of executors running = " + getNumExecutors());
|
||||
}
|
||||
|
@ -174,36 +177,37 @@ public class GearmanProxy {
|
|||
/*
|
||||
* Spawn workers for each executor on a node.
|
||||
*/
|
||||
public synchronized void createExecutorWorkersOnNode(Computer computer) {
|
||||
public void createExecutorWorkersOnNode(Computer computer) {
|
||||
|
||||
ExecutorWorkerThread workerThread = null;
|
||||
// find the computer in the executor workers list
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
logger.info("---- Executor thread already running for " + computer.getName());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
AvailabilityMonitor availability = new NodeAvailabilityMonitor(computer);
|
||||
|
||||
int executors = computer.getExecutors().size();
|
||||
for (int i = 0; i < executors; i++) {
|
||||
String nodeName = null;
|
||||
|
||||
nodeName = GearmanPluginUtil.getRealName(computer);
|
||||
if (nodeName == "master") {
|
||||
nodeName = masterName;
|
||||
synchronized(gewtHandles) {
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
logger.info("---- Executor thread already running for " + computer.getName());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ExecutorWorkerThread ewt = new ExecutorWorkerThread(
|
||||
AvailabilityMonitor availability = new NodeAvailabilityMonitor(computer);
|
||||
|
||||
int executors = computer.getExecutors().size();
|
||||
for (int i = 0; i < executors; i++) {
|
||||
String nodeName = null;
|
||||
|
||||
nodeName = GearmanPluginUtil.getRealName(computer);
|
||||
if (nodeName == "master") {
|
||||
nodeName = masterName;
|
||||
}
|
||||
|
||||
ExecutorWorkerThread ewt = new ExecutorWorkerThread(
|
||||
GearmanPluginConfig.get().getHost(),
|
||||
GearmanPluginConfig.get().getPort(),
|
||||
nodeName+"_exec-"+Integer.toString(i),
|
||||
computer, masterName, availability);
|
||||
|
||||
ewt.start();
|
||||
gewtHandles.add(ewt);
|
||||
ewt.start();
|
||||
gewtHandles.add(ewt);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("---- Num of executors running = " + getNumExecutors());
|
||||
|
@ -213,20 +217,23 @@ public class GearmanProxy {
|
|||
/*
|
||||
* This method stops all gearman workers
|
||||
*/
|
||||
public synchronized void stopAll() {
|
||||
public void stopAll() {
|
||||
// stop gearman executors
|
||||
List<AbstractWorkerThread> stopHandles;
|
||||
|
||||
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
|
||||
gewtHandles.clear();
|
||||
synchronized(gewtHandles) {
|
||||
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
|
||||
gewtHandles.clear();
|
||||
}
|
||||
|
||||
for (AbstractWorkerThread wt : stopHandles) { // stop executors
|
||||
wt.stop();
|
||||
}
|
||||
|
||||
stopHandles = new ArrayList<AbstractWorkerThread>();
|
||||
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
|
||||
gmwtHandles.clear();
|
||||
synchronized(gmwtHandles) {
|
||||
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
|
||||
gmwtHandles.clear();
|
||||
}
|
||||
|
||||
for (AbstractWorkerThread wt : stopHandles) { // stop executors
|
||||
wt.stop();
|
||||
|
@ -244,15 +251,18 @@ public class GearmanProxy {
|
|||
* The Computer to stop
|
||||
*
|
||||
*/
|
||||
public synchronized void stop(Computer computer) {
|
||||
public void stop(Computer computer) {
|
||||
logger.info("---- Stop computer " + computer);
|
||||
List<ExecutorWorkerThread> workers = new ArrayList<ExecutorWorkerThread>();
|
||||
// find the computer in the executor workers list and stop it
|
||||
for (Iterator<ExecutorWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
|
||||
ExecutorWorkerThread t = it.next();
|
||||
if (t.getComputer() == computer) {
|
||||
workers.add(t);
|
||||
it.remove();
|
||||
|
||||
synchronized(gewtHandles) {
|
||||
// find the computer in the executor workers list and stop it
|
||||
for (Iterator<ExecutorWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
|
||||
ExecutorWorkerThread t = it.next();
|
||||
if (t.getComputer() == computer) {
|
||||
workers.add(t);
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -270,39 +280,49 @@ public class GearmanProxy {
|
|||
return gmwtHandles.size() + gewtHandles.size();
|
||||
}
|
||||
|
||||
public synchronized void onBuildFinalized(Run r) {
|
||||
public void onBuildFinalized(Run r) {
|
||||
Computer computer = r.getExecutor().getOwner();
|
||||
// A build just finished, so let the AvailabilityMonitor
|
||||
// associated with its node wake up any workers who may be
|
||||
// waiting for the lock.
|
||||
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
t.getAvailability().wake();
|
||||
AvailabilityMonitor availability = null;
|
||||
synchronized(gewtHandles) {
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
availability = t.getAvailability();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (availability != null) {
|
||||
availability.wake();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized AvailabilityMonitor getAvailabilityMonitor(Computer computer) {
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
return t.getAvailability();
|
||||
public AvailabilityMonitor getAvailabilityMonitor(Computer computer) {
|
||||
synchronized (gewtHandles) {
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
return t.getAvailability();
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public synchronized CauseOfBlockage canTake(Node node,
|
||||
Queue.BuildableItem item) {
|
||||
public CauseOfBlockage canTake(Node node,
|
||||
Queue.BuildableItem item) {
|
||||
// Ask the AvailabilityMonitor for this node if it's okay to
|
||||
// run this build.
|
||||
ExecutorWorkerThread workerThread = null;
|
||||
|
||||
Computer computer = node.toComputer();
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
workerThread = t;
|
||||
break;
|
||||
synchronized(gewtHandles) {
|
||||
Computer computer = node.toComputer();
|
||||
for (ExecutorWorkerThread t : gewtHandles) {
|
||||
if (t.getComputer() == computer) {
|
||||
workerThread = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -316,9 +336,11 @@ public class GearmanProxy {
|
|||
return null;
|
||||
}
|
||||
|
||||
public synchronized void registerJobs() {
|
||||
for (ExecutorWorkerThread worker : gewtHandles) {
|
||||
worker.registerJobs();
|
||||
public void registerJobs() {
|
||||
synchronized(gewtHandles) {
|
||||
for (ExecutorWorkerThread worker : gewtHandles) {
|
||||
worker.registerJobs();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,11 @@ public class ManagementWorkerThread extends AbstractWorkerThread{
|
|||
*/
|
||||
@Override
|
||||
public void registerJobs(){
|
||||
if (worker == null) {
|
||||
// We haven't been initialized yet; the run method will call this again
|
||||
return;
|
||||
}
|
||||
|
||||
if (!registered) {
|
||||
Set<GearmanFunctionFactory> functionSet = new HashSet<GearmanFunctionFactory>();
|
||||
|
||||
|
|
|
@ -79,6 +79,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
apple.setAssignedLabel(new LabelAtom("oneiric-10"));
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -99,6 +100,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
lemon.setAssignedLabel(new LabelAtom("linux"));
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -119,6 +121,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
lemon.setAssignedLabel(new LabelAtom("bogus"));
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -136,6 +139,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
Project<?, ?> lemon = createFreeStyleProject("lemon");
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -156,6 +160,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
lemon.disable();
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -177,6 +182,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
lemon.setAssignedLabel(new LabelAtom("linux"));
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", offlineSlave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -195,6 +201,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
lemon.setAssignedLabel(new LabelAtom("linux"));
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
@ -216,6 +223,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
|
|||
lemon.setAssignedLabel(new LabelAtom("!linux"));
|
||||
|
||||
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
|
||||
oneiric.testInitWorker();
|
||||
oneiric.registerJobs();
|
||||
Set<String> functions = oneiric.worker.getRegisteredFunctions();
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ public class ManagementWorkerThreadTest {
|
|||
public void testRegisterJobs() {
|
||||
AbstractWorkerThread manager = new ManagementWorkerThread("GearmanServer", 4730,
|
||||
"master_manager", "master", new NoopAvailabilityMonitor());
|
||||
manager.testInitWorker();
|
||||
manager.registerJobs();
|
||||
Set<String> functions = manager.worker.getRegisteredFunctions();
|
||||
assertEquals("set_description:master", functions.toArray()[0]);
|
||||
|
|
Loading…
Reference in New Issue