Handle mutex scheduling from Gearman or Jenkins.

Every node (slave or master) gets an AvailabilityMonitor that
handles mutually exclusive access to scheduling builds on that
node.  If Jenkins wants to run a build on the node, it will only
be able to do so if we are not waiting for a response to a
GRAB_JOB packet from Gearman.  Likewise, immediately before
sending a GRAB_JOB, we lock the monitor and only unlock it if
we either get a NO_JOB response, or after the job we were just
assigned starts building.

(As an exception to the above rule, since Jenkins will apply the
same scheduling veto logic to the build that we request via Gearman,
(while we still hold the lock) we tell the monitor to expect a request
for that build from Jenkins and we permit Jenkins to build it even
if the lock is held.)

Change-Id: Iae03932aef4b503c69699b99d38a6fc2691fb02e
This commit is contained in:
James E. Blair 2013-06-12 12:50:06 -07:00
parent 76cb343b8c
commit 6041401766
17 changed files with 415 additions and 145 deletions

View File

@ -44,12 +44,12 @@ public abstract class AbstractWorkerThread implements Runnable {
protected String name;
protected MyGearmanWorkerImpl worker;
protected GearmanNIOJobServerConnection conn;
protected AvailabilityChecker availability;
protected AvailabilityMonitor availability;
private Thread thread;
private boolean running = false;
public AbstractWorkerThread(String host, int port, String name,
AvailabilityChecker availability) {
AvailabilityMonitor availability) {
setHost(host);
setPort(port);
setName(name);
@ -86,11 +86,11 @@ public abstract class AbstractWorkerThread implements Runnable {
this.name = name;
}
public AvailabilityChecker getAvailability() {
public AvailabilityMonitor getAvailability() {
return availability;
}
public void setAvailability(AvailabilityChecker availability) {
public void setAvailability(AvailabilityMonitor availability) {
this.availability = availability;
}

View File

@ -1,51 +0,0 @@
/*
*
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import jenkins.model.Jenkins;
public class AvailabilityChecker {
private boolean okayToGrabJob = true;
private final boolean checkQuietingDown;
AvailabilityChecker(boolean checkQuietingDown)
{
this.checkQuietingDown = checkQuietingDown;
}
public synchronized void setOkayToGrabJob(boolean value) {
this.okayToGrabJob = value;
this.notifyAll();
}
public void waitUntilOkayToGrabJob()
throws InterruptedException
{
synchronized(this) {
while (!okayToGrabJob) {
this.wait();
}
}
if (checkQuietingDown) {
while (Jenkins.getInstance().isQuietingDown()) {
Thread.sleep(5000);
}
}
}
}

View File

@ -0,0 +1,39 @@
/*
*
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import hudson.model.Queue;
public interface AvailabilityMonitor {
// Reserve exclusive access for this worker.
public void lock(MyGearmanWorkerImpl worker);
// Release exclusive access for this worker.
public void unlock(MyGearmanWorkerImpl worker);
// Notify waiting workers that they should try again to get the
// lock.
public void wake();
// A worker holding the lock has scheduled a build with this UUID.
public void expectUUID(String UUID);
// Called by Jenkins to decide if a build can run on this node.
public boolean canTake(Queue.BuildableItem item);
}

View File

@ -43,37 +43,38 @@ public class CustomGearmanFunctionFactory extends DefaultGearmanFunctionFactory
private final Node node;
private final String theClass;
private final String masterName;
private final MyGearmanWorkerImpl worker;
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
Constants.GEARMAN_WORKER_LOGGER_NAME);
public CustomGearmanFunctionFactory(String functionName, String className,
AbstractProject<?,?> project, Node node,
String masterName) {
String masterName,
MyGearmanWorkerImpl worker) {
super(functionName, className);
this.theClass = className;
this.project = project;
this.node = node;
this.masterName = masterName;
this.worker = worker;
}
@Override
public GearmanFunction getFunction() {
return createFunctionInstance(theClass, project, node, masterName);
return createFunctionInstance(theClass, project, node, masterName,
worker);
}
private static GearmanFunction createFunctionInstance(String className, AbstractProject<?,?> project, Node node, String masterName) {
private static GearmanFunction createFunctionInstance(String className, AbstractProject<?,?> project, Node node, String masterName, MyGearmanWorkerImpl worker) {
GearmanFunction f = null;
try {
Class<?> c = Class.forName(className);
Constructor<?> con = c.getConstructor(new Class[]{Project.class, Node.class, String.class});
Object o = con.newInstance(new Object[] {project, node, masterName});
Constructor<?> con = c.getConstructor(new Class[]{Project.class, Node.class, String.class, MyGearmanWorkerImpl.class});
Object o = con.newInstance(new Object[] {project, node, masterName, worker});
if (o instanceof GearmanFunction) {
f = (GearmanFunction) o;

View File

@ -56,13 +56,15 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
// constructor
public ExecutorWorkerThread(String host, int port, String name,
Node node, String masterName) {
super(host, port, name, new AvailabilityChecker(true));
Node node, String masterName,
AvailabilityMonitor availability) {
super(host, port, name, availability);
this.node = node;
this.masterName = masterName;
}
protected void initWorker() {
availability.unlock(worker);
super.initWorker();
this.functionMap = new HashMap<String,GearmanFunctionFactory>();
}
@ -161,7 +163,7 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
String jobFunctionName = "build:" + projectName;
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, this.node, this.masterName));
project, this.node, this.masterName, worker));
} else { // register "build:$projectName:$projectLabel" if this
// node matches a node from the project label
@ -178,12 +180,12 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
// register with label (i.e. "build:$projectName:$projectLabel")
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, this.node, this.masterName));
project, this.node, this.masterName, worker));
jobFunctionName = "build:" + projectName;
// also register without label (i.e. "build:$projectName")
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, this.node, this.masterName));
project, this.node, this.masterName, worker));
}
}
}
@ -199,29 +201,4 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
public synchronized Node getNode() {
return node;
}
public void onBuildStarted() {
// a build has started on this computer
Computer computer = node.toComputer();
if (computer.countIdle() == 0) {
getAvailability().setOkayToGrabJob(false);
}
// TODO: There is a race condition here -- a worker may have
// just scheduled a build right as Jenkins started one from a
// non-gearman trigger. In that case, we should dequeue that
// build and disconnect the worker (which will cause Gearman
// to re-queue the job).
}
public void onBuildFinalized() {
// a build has completed on this executor
Computer computer = node.toComputer();
getAvailability().setOkayToGrabJob(true);
// TODO: There could still be jobs in the queue that may or
// may not be assigned to this computer. If there are, we
// should avoid grabbing jobs until that condition has passed.
}
}

View File

@ -21,6 +21,8 @@ package hudson.plugins.gearman;
import hudson.model.Computer;
import hudson.model.Node;
import hudson.model.Run;
import hudson.model.Queue;
import hudson.model.queue.CauseOfBlockage;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -49,6 +51,7 @@ public class GearmanProxy {
// handles to gearman workers
private final List<AbstractWorkerThread> gewtHandles;
private final List<AbstractWorkerThread> gmwtHandles;
private final List<AvailabilityMonitor> availabilityMonitors;
private final String masterName;
// Singleton instance
@ -63,6 +66,7 @@ public class GearmanProxy {
private GearmanProxy() {
gewtHandles = Collections.synchronizedList(new ArrayList<AbstractWorkerThread>());
gmwtHandles = Collections.synchronizedList(new ArrayList<AbstractWorkerThread>());
availabilityMonitors = Collections.synchronizedList(new ArrayList<AvailabilityMonitor>());
Computer master = null;
String hostname = Constants.GEARMAN_DEFAULT_EXECUTOR_NAME;
@ -151,7 +155,7 @@ public class GearmanProxy {
GearmanPluginConfig.get().getHost(),
GearmanPluginConfig.get().getPort(),
masterName + "_manager",
masterName);
masterName, new NoopAvailabilityMonitor());
gwt.start();
gmwtHandles.add(gwt);
@ -165,6 +169,7 @@ public class GearmanProxy {
public void createExecutorWorkersOnNode(Computer computer) {
Node node = computer.getNode();
AvailabilityMonitor availability = getAvailabilityMonitor(node);
int executors = computer.getExecutors().size();
for (int i = 0; i < executors; i++) {
@ -175,12 +180,12 @@ public class GearmanProxy {
nodeName = masterName;
}
AbstractWorkerThread ewt = new ExecutorWorkerThread(GearmanPluginConfig.get().getHost(),
AbstractWorkerThread ewt = new ExecutorWorkerThread(
GearmanPluginConfig.get().getHost(),
GearmanPluginConfig.get().getPort(),
nodeName+"_exec-"+Integer.toString(i),
node, masterName);
node, masterName, availability);
//ewt.registerJobs();
ewt.start();
gewtHandles.add(ewt);
}
@ -201,6 +206,12 @@ public class GearmanProxy {
gewtHandles.clear();
}
synchronized (availabilityMonitors) {
// They will be recreated if/when the
// ExecutorWorkerThreads are recreated.
availabilityMonitors.clear();
}
synchronized(gmwtHandles) {
for (AbstractWorkerThread gmwtHandle : gmwtHandles) { // stop executors
gmwtHandle.stop();
@ -221,6 +232,7 @@ public class GearmanProxy {
*
*/
public void stop(Computer computer) {
Node node = computer.getNode();
// find the computer in the executor workers list and stop it
synchronized(gewtHandles) {
@ -232,6 +244,7 @@ public class GearmanProxy {
}
}
}
removeAvailabilityMonitor(node);
logger.info("---- Num of executors running = " + getNumExecutors());
}
@ -257,31 +270,70 @@ public class GearmanProxy {
return gmwtHandles;
}
public void onBuildStarted(Run r) {
Computer computer = r.getExecutor().getOwner();
// find the computer in the executor workers list and stop it
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
((ExecutorWorkerThread)t).onBuildStarted();
}
}
}
}
public void onBuildFinalized(Run r) {
Computer computer = r.getExecutor().getOwner();
// find the computer in the executor workers list and stop it
// A build just finished, so let the AvailabilityMonitor
// associated with its node wake up any workers who may be
// waiting for the lock.
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
((ExecutorWorkerThread)t).onBuildFinalized();
t.getAvailability().wake();
}
}
}
}
public AvailabilityMonitor getAvailabilityMonitor(Node node) {
AvailabilityMonitor availability;
synchronized (availabilityMonitors) {
for (Iterator<AvailabilityMonitor> it =
availabilityMonitors.iterator(); it.hasNext(); ) {
availability = it.next();
if (((NodeAvailabilityMonitor)availability).getNode() == node) {
return availability;
}
}
availability = new NodeAvailabilityMonitor(node);
availabilityMonitors.add(availability);
return availability;
}
}
public void removeAvailabilityMonitor(Node node) {
AvailabilityMonitor availability;
synchronized (availabilityMonitors) {
for (Iterator<AvailabilityMonitor> it =
availabilityMonitors.iterator(); it.hasNext(); ) {
availability = it.next();
if (((NodeAvailabilityMonitor)availability).getNode() == node) {
it.remove();
}
}
}
}
public CauseOfBlockage canTake(Node node,
Queue.BuildableItem item) {
// Ask the AvailabilityMonitor for this node if it's okay to
// run this build.
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
ExecutorWorkerThread t = ((ExecutorWorkerThread)it.next());
if (t.getNode() == node) {
if (t.getAvailability().canTake(item)) {
return null;
} else {
return new CauseOfBlockage.BecauseNodeIsBusy(node);
}
}
}
}
return null;
}
}

View File

@ -41,8 +41,8 @@ public class ManagementWorkerThread extends AbstractWorkerThread{
private boolean registered = false;
private final String masterName;
public ManagementWorkerThread(String host, int port, String name, String masterName){
super(host, port, name, new AvailabilityChecker(false));
public ManagementWorkerThread(String host, int port, String name, String masterName, AvailabilityMonitor availability){
super(host, port, name, availability);
this.masterName = masterName;
}

View File

@ -83,7 +83,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
private volatile boolean jobUniqueIdRequired = false;
private FunctionRegistry functionRegistry;
private AvailabilityChecker availability;
private AvailabilityMonitor availability;
class GrabJobEventHandler implements GearmanServerResponseHandler {
@ -154,6 +154,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
public void reconnect() {
LOG.info("Starting reconnect for " + session.toString());
// In case we held the availability lock earlier, release it.
availability.unlock(this);
try {
session.initSession(ioAvailable, this);
if (id != null) {
@ -172,12 +174,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
LOG.info("Ending reconnect for " + session.toString());
}
public MyGearmanWorkerImpl(AvailabilityChecker availability) {
public MyGearmanWorkerImpl(AvailabilityMonitor availability) {
this (null, availability);
}
public MyGearmanWorkerImpl(ExecutorService executorService,
AvailabilityChecker availability) {
AvailabilityMonitor availability) {
this.availability = availability;
functionList = new LinkedList<GearmanFunction>();
id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
@ -354,7 +356,10 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
private void sendGrabJob(GearmanJobServerSession s) throws InterruptedException {
availability.waitUntilOkayToGrabJob();
// If we can get the lock, this will prevent other workers and
// Jenkins itself from scheduling builds on this node. If we
// can not get the lock, this will wait for it.
availability.lock(this);
GearmanTask grabJobTask = new GearmanTask(
new GrabJobEventHandler(s),
@ -393,6 +398,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
break;
case NO_JOB:
// We didn't get a job, so allow other workers or
// Jenkins to schedule on this node.
availability.unlock(this);
LOG.debug("Worker " + this + " sending pre sleep after no_job");
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,

View File

@ -0,0 +1,148 @@
/*
*
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import jenkins.model.Jenkins;
import hudson.model.Queue;
import hudson.model.Node;
import hudson.model.Computer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NodeAvailabilityMonitor implements AvailabilityMonitor {
private final Queue queue;
private final Jenkins jenkins;
private final Node node;
private MyGearmanWorkerImpl workerHoldingLock = null;
private String expectedUUID = null;
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
NodeAvailabilityMonitor(Node node)
{
this.node = node;
queue = Queue.getInstance();
jenkins = Jenkins.getInstance();
}
public Node getNode() {
return node;
}
public void lock(MyGearmanWorkerImpl worker) {
Computer computer = node.toComputer();
logger.debug("AvailabilityMonitor lock request: " + worker);
while (true) {
boolean busy = false;
// Synchronize on the Jenkins queue so that Jenkins is
// unable to schedule builds while we try to acquire the
// lock.
synchronized(queue) {
if (workerHoldingLock == null) {
if (computer.countIdle() == 0) {
// If there are no idle executors, we can not
// schedule a build.
busy = true;
} else if (jenkins.isQuietingDown()) {
busy = true;
} else {
logger.debug("AvailabilityMonitor got lock: " + worker);
workerHoldingLock = worker;
return;
}
} else {
busy = true;
}
}
if (busy) {
synchronized(this) {
try {
// We get synchronous notification when a
// build finishes, but there are lots of other
// reasons circumstances could change (adding
// an executor, canceling shutdown, etc), so
// we slowly busy wait to cover all those
// reasons.
this.wait(5000);
} catch (InterruptedException e) {
}
}
}
}
}
public void unlock(MyGearmanWorkerImpl worker) {
logger.debug("AvailabilityMonitor unlock request: " + worker);
synchronized(queue) {
if (workerHoldingLock == worker) {
workerHoldingLock = null;
expectedUUID = null;
logger.debug("AvailabilityMonitor unlocked: " + worker);
} else {
logger.debug("Worker does not own AvailabilityMonitor lock: " +
worker);
}
}
wake();
}
public void wake() {
// Called when we know circumstances may have changed in a way
// that may allow someone to get the lock.
logger.debug("AvailabilityMonitor wake request");
synchronized(this) {
logger.debug("AvailabilityMonitor woken");
notifyAll();
}
}
public void expectUUID(String UUID) {
// The Gearman worker which holds the lock is about to
// schedule this build, so when Jenkins asks to run it, say
// "yes".
if (expectedUUID != null) {
logger.error("AvailabilityMonitor told to expect UUID " +
UUID + "while already expecting " + expectedUUID);
}
expectedUUID = UUID;
}
public boolean canTake(Queue.BuildableItem item)
{
// Jenkins calls this from within the scheduler maintenance
// function (while owning the queue monitor). If we are
// locked, only allow the build we are expecting to run.
logger.debug("AvailabilityMonitor canTake request for " +
workerHoldingLock);
NodeParametersAction param = item.getAction(NodeParametersAction.class);
if (param != null) {
logger.debug("AvailabilityMonitor canTake request for UUID " +
param.getUuid() + " expecting " + expectedUUID);
if (expectedUUID == param.getUuid()) {
return true;
}
}
return (workerHoldingLock == null);
}
}

View File

@ -0,0 +1,40 @@
/*
*
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import hudson.model.Queue;
public class NoopAvailabilityMonitor implements AvailabilityMonitor {
public void lock(MyGearmanWorkerImpl worker) {
}
public void unlock(MyGearmanWorkerImpl worker) {
}
public void wake() {
}
public void expectUUID(String UUID) {
}
public boolean canTake(Queue.BuildableItem item)
{
return (true);
}
}

View File

@ -0,0 +1,50 @@
/*
*
* Copyright 2013 Hewlett-Packard Development Company, L.P.
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import hudson.Extension;
import hudson.model.Node;
import hudson.model.Queue;
import hudson.model.queue.QueueTaskDispatcher;
import hudson.model.queue.CauseOfBlockage;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Extension
public class QueueTaskDispatcherImpl extends QueueTaskDispatcher {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
@Override
public CauseOfBlockage canTake(Node node,
Queue.BuildableItem item) {
// update only when gearman-plugin is enabled
if (!GearmanPluginConfig.get().enablePlugin()) {
return null;
}
return GearmanProxy.getInstance().canTake(node, item);
}
}

View File

@ -38,16 +38,6 @@ public class RunListenerImpl extends RunListener<Run> {
.getLogger(Constants.PLUGIN_LOGGER_NAME);
@Override
public void onStarted(Run r, TaskListener listener) {
// update only when gearman-plugin is enabled
if (!GearmanPluginConfig.get().enablePlugin()) {
return;
}
GearmanProxy.getInstance().onBuildStarted(r);
}
@Override
public void onFinalized(Run r) {
// update only when gearman-plugin is enabled

View File

@ -72,11 +72,14 @@ public class StartJobWorker extends AbstractGearmanFunction {
Node node;
Project<?, ?> project;
String masterName;
MyGearmanWorkerImpl worker;
public StartJobWorker(Project<?, ?> project, Node node, String masterName) {
public StartJobWorker(Project<?, ?> project, Node node, String masterName,
MyGearmanWorkerImpl worker) {
this.project = project;
this.node = node;
this.masterName = masterName;
this.worker = worker;
}
private String buildStatusData(AbstractBuild<?, ?> build) {
@ -152,6 +155,11 @@ public class StartJobWorker extends AbstractGearmanFunction {
Action params = new NodeParametersAction(buildParams, decodedUniqueId);
Action [] actions = {runNode, params};
AvailabilityMonitor availability =
GearmanProxy.getInstance().getAvailabilityMonitor(node);
availability.expectUUID(decodedUniqueId);
// schedule jenkins to build project
logger.info("---- Scheduling "+project.getName()+" build #" +
project.getNextBuildNumber()+" on " + runNodeName
@ -181,6 +189,8 @@ public class StartJobWorker extends AbstractGearmanFunction {
Queue.Executable exec = future.getStartCondition().get();
AbstractBuild<?, ?> currBuild = (AbstractBuild<?, ?>) exec;
availability.unlock(worker);
long now = new Date().getTime();
int duration = (int) (now - currBuild.getStartTimeInMillis());
int estimatedDuration = (int) currBuild.getEstimatedDuration();
@ -246,9 +256,11 @@ public class StartJobWorker extends AbstractGearmanFunction {
}
} catch (InterruptedException e) {
availability.unlock(worker);
jobFailureMsg = e.getMessage();
jobResult = false;
} catch (ExecutionException e) {
availability.unlock(worker);
jobFailureMsg = e.getMessage();
jobResult = false;
}

View File

@ -78,7 +78,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> apple = createFreeStyleProject("apple");
apple.setAssignedLabel(new LabelAtom("oneiric-10"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -98,7 +98,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -118,7 +118,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("bogus"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -135,7 +135,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -155,7 +155,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("linux"));
lemon.disable();
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -176,7 +176,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", offlineSlave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", offlineSlave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -194,7 +194,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
MavenModuleSet lemon = createMavenProject("lemon");
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -215,7 +215,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("!linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();

View File

@ -34,7 +34,7 @@ public class FakeWorkerThread extends AbstractWorkerThread{
.getLogger(Constants.PLUGIN_LOGGER_NAME);
public FakeWorkerThread(String host, int port, String name,
AvailabilityChecker availability) {
AvailabilityMonitor availability) {
super(host, port, name, availability);
}

View File

@ -56,14 +56,18 @@ public class GearmanProxyTest extends HudsonTestCase {
assertEquals(0, gp.getNumExecutors());
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-0", slave, "master"));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-1", slave, "master"));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-2", slave, "master"));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-0", slave, "master",
new NoopAvailabilityMonitor()));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-1", slave, "master",
new NoopAvailabilityMonitor()));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-2", slave, "master",
new NoopAvailabilityMonitor()));
assertEquals(3, gp.getNumExecutors());
gp.getGewtHandles().add(new ManagementWorkerThread("localhost", 4730,
"master_manage", "master"));
"master_manage", "master",
new NoopAvailabilityMonitor()));
assertEquals(4, gp.getNumExecutors());
}

View File

@ -57,7 +57,7 @@ public class ManagementWorkerThreadTest {
@Test
public void testRegisterJobs() {
AbstractWorkerThread manager = new ManagementWorkerThread("GearmanServer", 4730,
"master_manager", "master");
"master_manager", "master", new NoopAvailabilityMonitor());
manager.registerJobs();
Set<String> functions = manager.worker.getRegisteredFunctions();
assertEquals("set_description:master", functions.toArray()[0]);
@ -67,7 +67,7 @@ public class ManagementWorkerThreadTest {
@Test
public void testManagerId() {
AbstractWorkerThread manager = new ManagementWorkerThread("GearmanServer", 4730,
"master_manager", "master");
"master_manager", "master", new NoopAvailabilityMonitor());
assertEquals("master_manager", manager.getName());
}