Change to spawn a thread for each jenkins executor instead of just thread per jenkins nodes.

Also added functionality to wait until a StartJobWorker can service a build request.  This change eliminates
putting builds on the jenkins queue.  Now jobs are either running or it's not.  The only cancel that
makes sense is an abort (currently running jobs).
AbstractWorkerThread.java - add comments, set worker id to name instead of random uuid
ExectorWorkerThread.java - create thread of each jenkins executor
GearmanPlugin.java - refactor to spawn a thread for every executor
NodeAssignmentAction.java - provide access to label name StartJobWorker.java - make thread block execution until there is an available
jenkins executor to run the job. Also set the gearman job return parameters.
StopJobWorker.java - Set gearman job return parameters.

Change-Id: I30cec8ca3900eb7976c38077383505ea73e744dd
This commit is contained in:
Khai Do 2013-01-28 16:59:21 -08:00
parent ef3df2039a
commit e812a72d7d
5 changed files with 64 additions and 33 deletions

View File

@ -19,7 +19,6 @@
package hudson.plugins.gearman;
import java.util.Date;
import java.util.UUID;
import org.gearman.common.GearmanNIOJobServerConnection;
import org.gearman.worker.GearmanWorker;
@ -28,8 +27,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Thread to run gearman worker
/**
* Base object for gearman worker threads
*
*
* @author Khai Do
*/
public abstract class AbstractWorkerThread implements Runnable {
@ -110,7 +112,7 @@ public abstract class AbstractWorkerThread implements Runnable {
if (!worker.isRunning()) {
logger.info("Starting Worker "+ name +" ("+new Date().toString()+")");
worker.setWorkerID(UUID.randomUUID().toString());
worker.setWorkerID(name);
worker.addServer(conn);
worker.work();
}

View File

@ -44,13 +44,12 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
private final Node node;
public ExecutorWorkerThread(String host, int port, String nodeName){
super(host, port, nodeName);
this.node = findNode(nodeName);
// constructor
public ExecutorWorkerThread(String host, int port, String name, Node node) {
super(host, port, name);
this.node = node;
}
/**
* This function finds the node with the corresponding node name Returns the
* node if found, otherwise returns null
@ -75,11 +74,6 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
return myNode;
}
public ExecutorWorkerThread(String host, int port, String name, Node node){
super(host, port, name);
this.node = node;
}
/**
* This function tokenizes the labels in a label string
@ -183,7 +177,7 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
// "build:projectName" on all nodes
String jobFunctionName = "build:" + projectName;
logger.info("Registering job " + jobFunctionName + " on "
+ this.node.getNodeName());
+ name);
worker.registerFunctionFactory(new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, this.node));

View File

@ -165,8 +165,8 @@ public class GearmanPlugin extends Builder {
/*
* Purpose here is to create a 1:1 mapping of 'gearman
* worker':'jenkins node' then use the gearman worker to execute
* builds on that jenkins node
* worker':'jenkins executor' then use the gearman worker to execute
* builds on that jenkins nodes
*/
List<Node> nodes = jenkins.getNodes();
@ -177,12 +177,16 @@ public class GearmanPlugin extends Builder {
for (Node node : nodes) {
Computer c = node.toComputer();
if (c.isOnline()) {
// create a gearman executor for every node
gwt = new ExecutorWorkerThread(host, port,
node.getNodeName());
gwt.registerJobs();
gwt.start();
gewtHandles.push(gwt);
int numExecutors = c.getExecutors().size();
for (int i=0; i<numExecutors; i++) {
// create a gearman executor for every jenkins executor
gwt = new ExecutorWorkerThread(host, port,
node.getNodeName()+"-exec"+Integer.toString(i), node);
gwt.registerJobs();
gwt.start();
gewtHandles.push(gwt);
}
}
}

View File

@ -23,6 +23,10 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import hudson.model.Cause;
import hudson.model.Node;
import hudson.model.Action;
@ -106,19 +110,38 @@ public class StartJobWorker extends AbstractGearmanFunction {
}
// schedule the jenkins build
logger.info("Scheduling build on " + node.getNodeName()
+ " with UUID " + uuid + " and build params " + inParams);
logger.info("Scheduling "+project.getName()+" build #" +
project.getNextBuildNumber()+" on " + node.getNodeName()
+ " with UUID " + uuid + " and build params " + buildParams);
// create action to run on a specified node
Action runNode = new NodeAssignmentAction(node.getNodeName());
// create action for parameters
Action params = new NodeParametersAction(buildParams, uuid);
Action [] actions = {runNode, params};
project.scheduleBuild2(0, new Cause.UserIdCause(), actions);
Future<?> future = project.scheduleBuild2(0, new Cause.UserIdCause(), actions);
GearmanJobResult gjr = new GearmanJobResultImpl(this.jobHandle, true,
decoded.toString().getBytes(), new byte[0], new byte[0], 0, 0);
String jobResultMsg = "";
boolean jobResult = false;
try {
future.get();
jobResult = true;
jobResultMsg = "Build completed on " + node.getNodeName()
+ " with UUID " + uuid + " and build params " + buildParams;
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
jobResultMsg = "Build interrupted on " + node.getNodeName();
e1.printStackTrace();
} catch (ExecutionException e1) {
// TODO Auto-generated catch block
jobResultMsg = "Build failed on " + node.getNodeName();
}
GearmanJobResult gjr = new GearmanJobResultImpl(this.jobHandle, jobResult,
jobResultMsg.getBytes(), new byte[0], new byte[0], 0, 0);
return gjr;
}
}

View File

@ -19,14 +19,12 @@
package hudson.plugins.gearman;
import hudson.model.Action;
import hudson.model.Computer;
import hudson.model.Executor;
import hudson.model.Label;
import hudson.model.Node;
import hudson.model.Queue;
import hudson.model.Queue.Executable;
import hudson.model.Queue.Task;
import hudson.model.queue.SubTask;
import java.io.UnsupportedEncodingException;
@ -92,12 +90,22 @@ public class StopJobWorker extends AbstractGearmanFunction {
}
// Cancel jenkins jobs that contain matching uuid from client
boolean canceled = cancelBuild(inUuid);
boolean jobResult = cancelBuild(inUuid);
String jobResultMsg = null;
if (jobResult){
jobResultMsg = "Canceled jenkins build " + inUuid;
} else {
jobResultMsg = "Could not cancel build " + inUuid;
}
GearmanJobResult gjr = new GearmanJobResultImpl(this.jobHandle, true,
decoded.toString().getBytes(), new byte[0], new byte[0], 0, 0);
GearmanJobResult gjr = new GearmanJobResultImpl(this.jobHandle, jobResult,
jobResultMsg.getBytes(), new byte[0], new byte[0], 0, 0);
return gjr;
}