Don't grab a job when executors are busy.
Jobs can be triggered by non-gearman sources, sadly. This makes the gearman plugin aware of when executors are busy and it will refrain from grabbing a job from gearman in those circumstances. It's far from perfect, but should at least handle the most likely cases where a job is already running an an executor. Change-Id: If993c6d6bc63ed89b385d2e5bb41762ef84a429f
This commit is contained in:
parent
18a3fa68cc
commit
858fb155fe
|
@ -21,6 +21,7 @@ package hudson.plugins.gearman;
|
|||
import hudson.model.AbstractProject;
|
||||
import hudson.model.Label;
|
||||
import hudson.model.Node;
|
||||
import hudson.model.Computer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -198,4 +199,29 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
|
|||
public synchronized Node getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public void onBuildStarted() {
|
||||
// a build has started on this computer
|
||||
Computer computer = node.toComputer();
|
||||
if (computer.countIdle() == 0) {
|
||||
worker.setOkayToGrabJob(false);
|
||||
}
|
||||
|
||||
// TODO: There is a race condition here -- a worker may have
|
||||
// just scheduled a build right as Jenkins started one from a
|
||||
// non-gearman trigger. In that case, we should dequeue that
|
||||
// build and disconnect the worker (which will cause Gearman
|
||||
// to re-queue the job).
|
||||
}
|
||||
|
||||
public void onBuildFinalized() {
|
||||
// a build has completed on this executor
|
||||
Computer computer = node.toComputer();
|
||||
|
||||
worker.setOkayToGrabJob(true);
|
||||
|
||||
// TODO: There could still be jobs in the queue that may or
|
||||
// may not be assigned to this computer. If there are, we
|
||||
// should avoid grabbing jobs until that condition has passed.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package hudson.plugins.gearman;
|
|||
|
||||
import hudson.model.Computer;
|
||||
import hudson.model.Node;
|
||||
import hudson.model.Run;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -256,4 +257,31 @@ public class GearmanProxy {
|
|||
return gmwtHandles;
|
||||
}
|
||||
|
||||
public void onBuildStarted(Run r) {
|
||||
Computer computer = r.getExecutor().getOwner();
|
||||
// find the computer in the executor workers list and stop it
|
||||
|
||||
synchronized(gewtHandles) {
|
||||
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
|
||||
AbstractWorkerThread t = it.next();
|
||||
if (t.name.contains(computer.getName())) {
|
||||
((ExecutorWorkerThread)t).onBuildStarted();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void onBuildFinalized(Run r) {
|
||||
Computer computer = r.getExecutor().getOwner();
|
||||
// find the computer in the executor workers list and stop it
|
||||
|
||||
synchronized(gewtHandles) {
|
||||
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
|
||||
AbstractWorkerThread t = it.next();
|
||||
if (t.name.contains(computer.getName())) {
|
||||
((ExecutorWorkerThread)t).onBuildFinalized();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,6 +83,31 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
|
||||
private volatile boolean jobUniqueIdRequired = false;
|
||||
private FunctionRegistry functionRegistry;
|
||||
private WaitBool okayToGrabJob = new WaitBool(true);
|
||||
|
||||
class WaitBool {
|
||||
private boolean value;
|
||||
|
||||
WaitBool(boolean value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public synchronized void set(boolean value) {
|
||||
this.value = value;
|
||||
this.notifyAll();
|
||||
}
|
||||
|
||||
public synchronized void waitUntil(boolean value)
|
||||
throws InterruptedException
|
||||
{
|
||||
if (this.value == value)
|
||||
return;
|
||||
|
||||
while (this.value != value) {
|
||||
this.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class GrabJobEventHandler implements GearmanServerResponseHandler {
|
||||
|
||||
|
@ -288,14 +313,15 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
// send the initial GRAB_JOB on reconnection.
|
||||
LOG.info("Worker " + this + " sending initial grab job");
|
||||
try {
|
||||
GearmanTask sessTask = new GearmanTask(
|
||||
new GrabJobEventHandler(session),
|
||||
new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
||||
getGrabJobPacketType(), new byte[0]));
|
||||
taskMap.put(session, sessTask);
|
||||
session.submitTask(sessTask);
|
||||
sendGrabJob(session);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted while waiting for okay to send " +
|
||||
"grab job", e);
|
||||
continue;
|
||||
}
|
||||
grabJobSent = true;
|
||||
try {
|
||||
session.driveSessionIO();
|
||||
grabJobSent = true;
|
||||
} catch (IOException io) {
|
||||
LOG.warn("Receieved IOException while" +
|
||||
" sending initial grab job",io);
|
||||
|
@ -349,6 +375,21 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
shutDownWorker(true);
|
||||
}
|
||||
|
||||
public void setOkayToGrabJob(boolean value) {
|
||||
okayToGrabJob.set(value);
|
||||
}
|
||||
|
||||
private void sendGrabJob(GearmanJobServerSession s) throws InterruptedException {
|
||||
okayToGrabJob.waitUntil(true);
|
||||
|
||||
GearmanTask grabJobTask = new GearmanTask(
|
||||
new GrabJobEventHandler(s),
|
||||
new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
||||
getGrabJobPacketType(), new byte[0]));
|
||||
taskMap.put(s, grabJobTask);
|
||||
s.submitTask(grabJobTask);
|
||||
}
|
||||
|
||||
public void handleSessionEvent(GearmanSessionEvent event)
|
||||
throws IllegalArgumentException, IllegalStateException {
|
||||
GearmanPacket p = event.getPacket();
|
||||
|
@ -370,15 +411,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
case NOOP:
|
||||
taskMap.remove(s);
|
||||
LOG.debug("Worker " + this + " sending grab job after wakeup");
|
||||
GearmanTask grabJobTask = new GearmanTask(
|
||||
new GrabJobEventHandler(s),
|
||||
new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
||||
getGrabJobPacketType(), new byte[0]));
|
||||
taskMap.put(s, grabJobTask);
|
||||
s.submitTask(grabJobTask);
|
||||
LOG.debug("Worker: " + this + " submitted a " +
|
||||
grabJobTask.getRequestPacket().getPacketType() +
|
||||
" to session: " + s);
|
||||
try {
|
||||
sendGrabJob(s);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted while waiting for okay to send " +
|
||||
"grab job", e);
|
||||
}
|
||||
break;
|
||||
case NO_JOB:
|
||||
LOG.debug("Worker " + this + " sending pre sleep after no_job");
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
* Copyright 2013 OpenStack Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
package hudson.plugins.gearman;
|
||||
|
||||
import hudson.Extension;
|
||||
import hudson.model.TaskListener;
|
||||
import hudson.model.Run;
|
||||
import hudson.model.listeners.RunListener;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Update gearman workers when node changes
|
||||
*/
|
||||
@Extension
|
||||
public class RunListenerImpl extends RunListener<Run> {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(Constants.PLUGIN_LOGGER_NAME);
|
||||
|
||||
|
||||
@Override
|
||||
public void onStarted(Run r, TaskListener listener) {
|
||||
// update only when gearman-plugin is enabled
|
||||
if (!GearmanPluginConfig.get().enablePlugin()) {
|
||||
return;
|
||||
}
|
||||
|
||||
GearmanProxy.getInstance().onBuildStarted(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinalized(Run r) {
|
||||
// update only when gearman-plugin is enabled
|
||||
if (!GearmanPluginConfig.get().enablePlugin()) {
|
||||
return;
|
||||
}
|
||||
|
||||
GearmanProxy.getInstance().onBuildFinalized(r);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue