Don't grab a job when executors are busy.

Jobs can be triggered by non-gearman sources, sadly.  This makes the gearman
plugin aware of when executors are busy and it will refrain from grabbing a
job from gearman in those circumstances.

It's far from perfect, but should at least handle the most likely cases where
a job is already running an an executor.

Change-Id: If993c6d6bc63ed89b385d2e5bb41762ef84a429f
This commit is contained in:
James E. Blair 2013-06-11 13:32:36 -07:00
parent 18a3fa68cc
commit 858fb155fe
4 changed files with 168 additions and 16 deletions

View File

@ -21,6 +21,7 @@ package hudson.plugins.gearman;
import hudson.model.AbstractProject;
import hudson.model.Label;
import hudson.model.Node;
import hudson.model.Computer;
import java.util.HashMap;
import java.util.HashSet;
@ -198,4 +199,29 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
public synchronized Node getNode() {
return node;
}
public void onBuildStarted() {
// a build has started on this computer
Computer computer = node.toComputer();
if (computer.countIdle() == 0) {
worker.setOkayToGrabJob(false);
}
// TODO: There is a race condition here -- a worker may have
// just scheduled a build right as Jenkins started one from a
// non-gearman trigger. In that case, we should dequeue that
// build and disconnect the worker (which will cause Gearman
// to re-queue the job).
}
public void onBuildFinalized() {
// a build has completed on this executor
Computer computer = node.toComputer();
worker.setOkayToGrabJob(true);
// TODO: There could still be jobs in the queue that may or
// may not be assigned to this computer. If there are, we
// should avoid grabbing jobs until that condition has passed.
}
}

View File

@ -20,6 +20,7 @@ package hudson.plugins.gearman;
import hudson.model.Computer;
import hudson.model.Node;
import hudson.model.Run;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -256,4 +257,31 @@ public class GearmanProxy {
return gmwtHandles;
}
public void onBuildStarted(Run r) {
Computer computer = r.getExecutor().getOwner();
// find the computer in the executor workers list and stop it
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
((ExecutorWorkerThread)t).onBuildStarted();
}
}
}
}
public void onBuildFinalized(Run r) {
Computer computer = r.getExecutor().getOwner();
// find the computer in the executor workers list and stop it
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
((ExecutorWorkerThread)t).onBuildFinalized();
}
}
}
}
}

View File

@ -83,6 +83,31 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
private volatile boolean jobUniqueIdRequired = false;
private FunctionRegistry functionRegistry;
private WaitBool okayToGrabJob = new WaitBool(true);
class WaitBool {
private boolean value;
WaitBool(boolean value) {
this.value = value;
}
public synchronized void set(boolean value) {
this.value = value;
this.notifyAll();
}
public synchronized void waitUntil(boolean value)
throws InterruptedException
{
if (this.value == value)
return;
while (this.value != value) {
this.wait();
}
}
}
class GrabJobEventHandler implements GearmanServerResponseHandler {
@ -288,14 +313,15 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
// send the initial GRAB_JOB on reconnection.
LOG.info("Worker " + this + " sending initial grab job");
try {
GearmanTask sessTask = new GearmanTask(
new GrabJobEventHandler(session),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
getGrabJobPacketType(), new byte[0]));
taskMap.put(session, sessTask);
session.submitTask(sessTask);
sendGrabJob(session);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for okay to send " +
"grab job", e);
continue;
}
grabJobSent = true;
try {
session.driveSessionIO();
grabJobSent = true;
} catch (IOException io) {
LOG.warn("Receieved IOException while" +
" sending initial grab job",io);
@ -349,6 +375,21 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
shutDownWorker(true);
}
public void setOkayToGrabJob(boolean value) {
okayToGrabJob.set(value);
}
private void sendGrabJob(GearmanJobServerSession s) throws InterruptedException {
okayToGrabJob.waitUntil(true);
GearmanTask grabJobTask = new GearmanTask(
new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
getGrabJobPacketType(), new byte[0]));
taskMap.put(s, grabJobTask);
s.submitTask(grabJobTask);
}
public void handleSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException {
GearmanPacket p = event.getPacket();
@ -370,15 +411,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
case NOOP:
taskMap.remove(s);
LOG.debug("Worker " + this + " sending grab job after wakeup");
GearmanTask grabJobTask = new GearmanTask(
new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
getGrabJobPacketType(), new byte[0]));
taskMap.put(s, grabJobTask);
s.submitTask(grabJobTask);
LOG.debug("Worker: " + this + " submitted a " +
grabJobTask.getRequestPacket().getPacketType() +
" to session: " + s);
try {
sendGrabJob(s);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for okay to send " +
"grab job", e);
}
break;
case NO_JOB:
LOG.debug("Worker " + this + " sending pre sleep after no_job");

View File

@ -0,0 +1,60 @@
/*
*
* Copyright 2013 Hewlett-Packard Development Company, L.P.
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import hudson.Extension;
import hudson.model.TaskListener;
import hudson.model.Run;
import hudson.model.listeners.RunListener;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Update gearman workers when node changes
*/
@Extension
public class RunListenerImpl extends RunListener<Run> {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
@Override
public void onStarted(Run r, TaskListener listener) {
// update only when gearman-plugin is enabled
if (!GearmanPluginConfig.get().enablePlugin()) {
return;
}
GearmanProxy.getInstance().onBuildStarted(r);
}
@Override
public void onFinalized(Run r) {
// update only when gearman-plugin is enabled
if (!GearmanPluginConfig.get().enablePlugin()) {
return;
}
GearmanProxy.getInstance().onBuildFinalized(r);
}
}