Rework starting/stopping executors

There was at least one error, likely a race condition, with the
previous code which could cause more than one ExecutorWorkerThread for
a node to be spawned.  In particular, I think the bogus comparison in
ComputerListenerImpl may have a large part in that (it checked to see
if a _Computer_ object was in a list of _Thread_ objects).

To improve reliability around adding and removing nodes, all related
functionality is moved to the GearmanProxy class.  Any methods (most
of them) that have to do with starting or stopping worker threads are
synchronized on the GearmanProxy monitor (the important parts of most
threads were already synchronized on the worker list before, so this
should not be much of a performance change).

The methods that start management and executor threads now do their
own checking to verify that such threads do not already exist, making
it so that calling them is more idempotent.  Existing checks external
to the class have been removed (these were likely somewhat racy).

To avoid keeping redundant data structures, the node availability list
is removed, and instead if we need to find an availability object, we
walk the list of worker threads and compare to their nodes.  Because
we do this so much, the list of worker and management threads are
changed to use those explicit classes instead of
AbstractWorkerThreads.

The accessor methods for the internal lists of worker threads is
removed to ensure that they are only managed through GearmanProxy.
This changed some unit tests and required the removal of one complete
test (which was not doing much more than verifying the addition
operator).

Also, when stopping ExecutorWorkerThreads, stop all of the ones
associated with a node.

When a computer goes offline, Computer.getNode() returns null, so we
can't know which workers should be deleted.  Instead of using Nodes as
keys for our workers, use Computers instead and change everything to
use Computer (most functions needed Computer rather than Node anyway),
and in the few remaining places where a Node is needed, convert the
other direction.

Change-Id: Ia5084579317f972400069cc3e84db4e0b6560a80
This commit is contained in:
James E. Blair 2013-08-12 11:06:49 -07:00 committed by Clark Boylan
parent e45ffe249d
commit c97253eff5
12 changed files with 198 additions and 279 deletions

View File

@ -124,16 +124,16 @@ public abstract class AbstractWorkerThread implements Runnable {
public void stop() {
running = false;
logger.info("=== " + getName() + " Request to stop AWT: " + this);
logger.info("=== " + getName() + " Thread: " + thread + " name: " + thread.getName());
logger.info("=== " + getName() + " Worker: " + worker);
logger.info("---- " + getName() + " Request to stop AWT: " + this);
logger.info("---- " + getName() + " Thread: " + thread + " name: " + thread.getName());
logger.info("---- " + getName() + " Worker: " + worker);
worker.stop();
logger.info("=== " + getName() + " Interrupting worker");
logger.info("---- " + getName() + " Interrupting worker");
// Interrupt the thread so it unblocks any blocking call
thread.interrupt();
logger.info("=== " + getName() + " Joining thread");
logger.info("---- " + getName() + " Joining thread");
// Wait until the thread exits
try {
thread.join();
@ -141,7 +141,7 @@ public abstract class AbstractWorkerThread implements Runnable {
// Unexpected interruption
logger.error("Exception while waiting for thread to join", e);
}
logger.info("=== " + getName() + " Stop request done");
logger.info("---- " + getName() + " Stop request done");
}
/*

View File

@ -59,7 +59,7 @@ public class ComputerListenerImpl extends ComputerListener {
// gets called when existing slave dis-connects
// gets called when slave is deleted.
logger.info("---- " + ComputerListenerImpl.class.getName() + ":"
+ " onOffline");
+ " onOffline computer" + c);
// update functions only when gearman-plugin is enabled
if (!GearmanPluginConfig.get().enablePlugin()) {
@ -77,7 +77,7 @@ public class ComputerListenerImpl extends ComputerListener {
// gets called when existing slave re-connects
// gets called when new slave goes into online state
logger.info("---- " + ComputerListenerImpl.class.getName() + ":"
+ " onOnline");
+ " onOnline computer " + c);
// update functions only when gearman-plugin is enabled
if (!GearmanPluginConfig.get().enablePlugin()) {
@ -89,25 +89,14 @@ public class ComputerListenerImpl extends ComputerListener {
* the master is not the same as a slave
*/
GearmanProxy gp = GearmanProxy.getInstance();
if (gp.getGmwtHandles().isEmpty()) {
/*
* Spawn management executor worker if one doesn't exist yet.
* This worker does not need any executors. It only needs
* to work with gearman.
*/
gp.createManagementWorker();
// Spawn executors for the jenkins master
if (Computer.currentComputer() == c) { //check to see if this is master
gp.createExecutorWorkersOnNode(c);
}
}
/*
* Spawn management executor worker if one doesn't exist yet.
* This worker does not need any executors. It only needs
* to work with gearman.
*/
gp.createManagementWorker();
// on creation of new slave
if (Computer.currentComputer() != c
&& !gp.getGewtHandles().contains(c)) {
gp.createExecutorWorkersOnNode(c);
}
gp.createExecutorWorkersOnNode(c);
}
}

View File

@ -27,7 +27,7 @@
package hudson.plugins.gearman;
import hudson.model.AbstractProject;
import hudson.model.Node;
import hudson.model.Computer;
import hudson.model.Project;
import java.lang.reflect.Constructor;
@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class CustomGearmanFunctionFactory extends DefaultGearmanFunctionFactory {
private final AbstractProject<?,?> project;
private final Node node;
private final Computer computer;
private final String theClass;
private final String masterName;
private final MyGearmanWorkerImpl worker;
@ -49,13 +49,13 @@ public class CustomGearmanFunctionFactory extends DefaultGearmanFunctionFactory
Constants.GEARMAN_WORKER_LOGGER_NAME);
public CustomGearmanFunctionFactory(String functionName, String className,
AbstractProject<?,?> project, Node node,
AbstractProject<?,?> project, Computer computer,
String masterName,
MyGearmanWorkerImpl worker) {
super(functionName, className);
this.theClass = className;
this.project = project;
this.node = node;
this.computer = computer;
this.masterName = masterName;
this.worker = worker;
}
@ -63,18 +63,18 @@ public class CustomGearmanFunctionFactory extends DefaultGearmanFunctionFactory
@Override
public GearmanFunction getFunction() {
return createFunctionInstance(theClass, project, node, masterName,
return createFunctionInstance(theClass, project, computer, masterName,
worker);
}
private static GearmanFunction createFunctionInstance(String className, AbstractProject<?,?> project, Node node, String masterName, MyGearmanWorkerImpl worker) {
private static GearmanFunction createFunctionInstance(String className, AbstractProject<?,?> project, Computer computer, String masterName, MyGearmanWorkerImpl worker) {
GearmanFunction f = null;
try {
Class<?> c = Class.forName(className);
Constructor<?> con = c.getConstructor(new Class[]{AbstractProject.class, Node.class, String.class, MyGearmanWorkerImpl.class});
Object o = con.newInstance(new Object[] {project, node, masterName, worker});
Constructor<?> con = c.getConstructor(new Class[]{AbstractProject.class, Computer.class, String.class, MyGearmanWorkerImpl.class});
Object o = con.newInstance(new Object[] {project, computer, masterName, worker});
if (o instanceof GearmanFunction) {
f = (GearmanFunction) o;

View File

@ -48,17 +48,17 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
private final Node node;
private final Computer computer;
private final String masterName;
private HashMap<String,GearmanFunctionFactory> functionMap;
// constructor
public ExecutorWorkerThread(String host, int port, String name,
Node node, String masterName,
Computer computer, String masterName,
AvailabilityMonitor availability) {
super(host, port, name, availability);
this.node = node;
this.computer = computer;
this.masterName = masterName;
}
@ -110,18 +110,18 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
}
/**
* Register gearman functions on this node. This will unregister all
* Register gearman functions on this computer. This will unregister all
* functions before registering new functions. Works for free-style
* and maven projects but does not work for multi-config projects
*
* How functions are registered:
* - If the project has no label then we register the project with all
* nodes
* computers
*
* build:pep8 on precise-123
* build:pep8 on oneiric-456
*
* - If the project contains one label then we register with the node
* - If the project contains one label then we register with the computer
* that contains the corresponding label. Labels with '&&' is
* considered just one label
*
@ -131,7 +131,7 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
* build:pep8 on precise-129
*
* - If the project contains multiple labels separated by '||' then
* we register with the nodes that contain the corresponding labels
* we register with the computers that contain the corresponding labels
*
* build:pep8:precise on precise-123
* build:pep8 on precise-123
@ -147,65 +147,60 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
public void registerJobs() {
HashMap<String,GearmanFunctionFactory> newFunctionMap = new HashMap<String,GearmanFunctionFactory>();
try {
Node n = getNode();
Computer c = n.toComputer();
if (!computer.isOffline()) {
Node node = computer.getNode();
if (!c.isOffline()) {
List<AbstractProject> allProjects = Jenkins.getInstance().getAllItems(AbstractProject.class);
for (AbstractProject<?, ?> project : allProjects) {
List<AbstractProject> allProjects = Jenkins.getInstance().getAllItems(AbstractProject.class);
for (AbstractProject<?, ?> project : allProjects) {
if (project.isDisabled()) { // ignore all disabled projects
continue;
}
if (project.isDisabled()) { // ignore all disabled projects
continue;
}
String projectName = project.getName();
Label label = project.getAssignedLabel();
String projectName = project.getName();
Label label = project.getAssignedLabel();
if (label == null) { // project has no label -> so register
// "build:projectName" on all nodes
String jobFunctionName = "build:" + projectName;
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, n, this.masterName, worker));
} else { // register "build:$projectName:$projectLabel" if this
// node matches a node from the project label
if (label == null) { // project has no label -> so register
// "build:projectName" on all nodes
String jobFunctionName = "build:" + projectName;
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, computer, this.masterName, worker));
} else { // register "build:$projectName:$projectLabel" if this
// node matches a node from the project label
Set<Node> projectLabelNodes = label.getNodes();
String projectLabelString = label.getExpression();
Set<String> projectLabels = tokenizeLabelString(
projectLabelString, "\\|\\|");
Set<Node> projectLabelNodes = label.getNodes();
String projectLabelString = label.getExpression();
Set<String> projectLabels = tokenizeLabelString(
projectLabelString, "\\|\\|");
// iterate thru all project labels and find matching nodes
for (String projectLabel : projectLabels) {
if (projectLabelNodes.contains(n)) {
String jobFunctionName = "build:" + projectName
+ ":" + projectLabel;
// register with label (i.e. "build:$projectName:$projectLabel")
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, n, this.masterName, worker));
jobFunctionName = "build:" + projectName;
// also register without label (i.e. "build:$projectName")
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, n, this.masterName, worker));
}
// iterate thru all project labels and find matching nodes
for (String projectLabel : projectLabels) {
if (projectLabelNodes.contains(node)) {
String jobFunctionName = "build:" + projectName
+ ":" + projectLabel;
// register with label (i.e. "build:$projectName:$projectLabel")
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, computer, this.masterName, worker));
jobFunctionName = "build:" + projectName;
// also register without label (i.e. "build:$projectName")
newFunctionMap.put(jobFunctionName, new CustomGearmanFunctionFactory(
jobFunctionName, StartJobWorker.class.getName(),
project, computer, this.masterName, worker));
}
}
}
}
if (!newFunctionMap.keySet().equals(functionMap.keySet())) {
functionMap = newFunctionMap;
Set<GearmanFunctionFactory> functionSet = new HashSet<GearmanFunctionFactory>(functionMap.values());
updateJobs(functionSet);
}
} catch (NullPointerException npe) {
logger.warn("Failed to register jobs on worker thread: "+getName()+" with worker: "+worker.getWorkerID(), npe);
}
if (!newFunctionMap.keySet().equals(functionMap.keySet())) {
functionMap = newFunctionMap;
Set<GearmanFunctionFactory> functionSet = new HashSet<GearmanFunctionFactory>(functionMap.values());
updateJobs(functionSet);
}
}
public synchronized Node getNode() {
return node;
public synchronized Computer getComputer() {
return computer;
}
}

View File

@ -20,7 +20,6 @@ package hudson.plugins.gearman;
import hudson.model.AbstractProject;
import hudson.model.Computer;
import hudson.model.Node;
import hudson.model.Run;
import jenkins.model.Jenkins;
@ -39,23 +38,23 @@ public class GearmanPluginUtil {
.getLogger(Constants.PLUGIN_LOGGER_NAME);
/*
* This method returns the real node name. Master node
* This method returns the real computer name. Master computer
* by default has an empty string for the name. But you
* need to use "master" to tell jenkins to do stuff,
* namely like schedule a build.
*
* @param Node
* The node to lookup
* @param Computer
* The computer to lookup
*
* @return
* "master" for the master node or assigned name of the slave node
* "master" for the master computer or assigned name of the slave computer
*/
public static String getRealName(Node node) {
public static String getRealName(Computer computer) {
if (Computer.currentComputer() == node.toComputer()) {
if (Computer.currentComputer() == computer) {
return "master";
} else {
return node.getNodeName();
return computer.getName();
}
}

View File

@ -49,9 +49,8 @@ public class GearmanProxy {
.getLogger(Constants.PLUGIN_LOGGER_NAME);
// handles to gearman workers
private final List<AbstractWorkerThread> gewtHandles;
private final List<AbstractWorkerThread> gmwtHandles;
private final List<AvailabilityMonitor> availabilityMonitors;
private final List<ExecutorWorkerThread> gewtHandles;
private final List<ManagementWorkerThread> gmwtHandles;
private final String masterName;
// Singleton instance
@ -64,9 +63,8 @@ public class GearmanProxy {
// constructor
private GearmanProxy() {
gewtHandles = Collections.synchronizedList(new ArrayList<AbstractWorkerThread>());
gmwtHandles = Collections.synchronizedList(new ArrayList<AbstractWorkerThread>());
availabilityMonitors = Collections.synchronizedList(new ArrayList<AvailabilityMonitor>());
gewtHandles = Collections.synchronizedList(new ArrayList<ExecutorWorkerThread>());
gmwtHandles = Collections.synchronizedList(new ArrayList<ManagementWorkerThread>());
Computer master = null;
String hostname = Constants.GEARMAN_DEFAULT_EXECUTOR_NAME;
@ -89,10 +87,18 @@ public class GearmanProxy {
masterName = hostname;
}
/*
* This method is for unit tests only.
*/
protected void testResetHandles() {
gmwtHandles.clear();
gewtHandles.clear();
}
/*
* This method initializes the gearman workers.
*/
public void initWorkers() {
public synchronized void initWorkers() {
/*
* Purpose here is to create a 1:1 mapping of 'gearman worker':'jenkins
* executor' then use the gearman worker to execute builds on that
@ -148,9 +154,13 @@ public class GearmanProxy {
* Spawn management executor workers. This worker does not need any
* executors. It only needs to connect to gearman.
*/
public void createManagementWorker() {
public synchronized void createManagementWorker() {
AbstractWorkerThread gwt = new ManagementWorkerThread(
if (!gmwtHandles.isEmpty()) {
return;
}
ManagementWorkerThread gwt = new ManagementWorkerThread(
GearmanPluginConfig.get().getHost(),
GearmanPluginConfig.get().getPort(),
masterName + "_manager",
@ -159,31 +169,38 @@ public class GearmanProxy {
gmwtHandles.add(gwt);
logger.info("---- Num of executors running = " + getNumExecutors());
}
/*
* Spawn workers for each executor on a node.
*/
public void createExecutorWorkersOnNode(Computer computer) {
public synchronized void createExecutorWorkersOnNode(Computer computer) {
Node node = computer.getNode();
AvailabilityMonitor availability = getAvailabilityMonitor(node);
ExecutorWorkerThread workerThread = null;
// find the computer in the executor workers list
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
logger.info("---- Executor thread already running for " + computer.getName());
return;
}
}
AvailabilityMonitor availability = new NodeAvailabilityMonitor(computer);
int executors = computer.getExecutors().size();
for (int i = 0; i < executors; i++) {
String nodeName = null;
nodeName = GearmanPluginUtil.getRealName(node);
nodeName = GearmanPluginUtil.getRealName(computer);
if (nodeName == "master") {
nodeName = masterName;
}
AbstractWorkerThread ewt = new ExecutorWorkerThread(
ExecutorWorkerThread ewt = new ExecutorWorkerThread(
GearmanPluginConfig.get().getHost(),
GearmanPluginConfig.get().getPort(),
nodeName+"_exec-"+Integer.toString(i),
node, masterName, availability);
computer, masterName, availability);
ewt.start();
gewtHandles.add(ewt);
@ -196,30 +213,20 @@ public class GearmanProxy {
/*
* This method stops all gearman workers
*/
public void stopAll() {
public synchronized void stopAll() {
// stop gearman executors
List<AbstractWorkerThread> stopHandles;
synchronized (gewtHandles) {
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
gewtHandles.clear();
}
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
gewtHandles.clear();
for (AbstractWorkerThread wt : stopHandles) { // stop executors
wt.stop();
}
synchronized (availabilityMonitors) {
// They will be recreated if/when the
// ExecutorWorkerThreads are recreated.
availabilityMonitors.clear();
}
stopHandles = new ArrayList<AbstractWorkerThread>();
synchronized (gmwtHandles) {
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
gmwtHandles.clear();
}
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
gmwtHandles.clear();
for (AbstractWorkerThread wt : stopHandles) { // stop executors
wt.stop();
@ -237,25 +244,21 @@ public class GearmanProxy {
* The Computer to stop
*
*/
public void stop(Computer computer) {
Node node = computer.getNode();
AbstractWorkerThread workerThread = null;
public synchronized void stop(Computer computer) {
logger.info("---- Stop computer " + computer);
List<ExecutorWorkerThread> workers = new ArrayList<ExecutorWorkerThread>();
// find the computer in the executor workers list and stop it
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
workerThread = t;
it.remove();
break;
}
}
for (Iterator<ExecutorWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
ExecutorWorkerThread t = it.next();
if (t.getComputer() == computer) {
workers.add(t);
it.remove();
}
}
if (workerThread != null) {
workerThread.stop();
for (ExecutorWorkerThread t : workers) {
t.stop();
}
removeAvailabilityMonitor(node);
logger.info("---- Num of executors running = " + getNumExecutors());
}
@ -267,83 +270,42 @@ public class GearmanProxy {
return gmwtHandles.size() + gewtHandles.size();
}
/*
* This method returns the list of gearman executor workers
*/
public synchronized List<AbstractWorkerThread> getGewtHandles() {
return gewtHandles;
}
/*
* This method returns the list of gearman management workers
*/
public synchronized List<AbstractWorkerThread> getGmwtHandles() {
return gmwtHandles;
}
public void onBuildFinalized(Run r) {
public synchronized void onBuildFinalized(Run r) {
Computer computer = r.getExecutor().getOwner();
// A build just finished, so let the AvailabilityMonitor
// associated with its node wake up any workers who may be
// waiting for the lock.
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
t.getAvailability().wake();
}
}
}
}
public AvailabilityMonitor getAvailabilityMonitor(Node node) {
AvailabilityMonitor availability;
synchronized (availabilityMonitors) {
for (Iterator<AvailabilityMonitor> it =
availabilityMonitors.iterator(); it.hasNext(); ) {
availability = it.next();
if (((NodeAvailabilityMonitor)availability).getNode() == node) {
return availability;
}
}
availability = new NodeAvailabilityMonitor(node);
availabilityMonitors.add(availability);
return availability;
}
}
public void removeAvailabilityMonitor(Node node) {
AvailabilityMonitor availability;
synchronized (availabilityMonitors) {
for (Iterator<AvailabilityMonitor> it =
availabilityMonitors.iterator(); it.hasNext(); ) {
availability = it.next();
if (((NodeAvailabilityMonitor)availability).getNode() == node) {
it.remove();
}
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
t.getAvailability().wake();
}
}
}
public CauseOfBlockage canTake(Node node,
Queue.BuildableItem item) {
public synchronized AvailabilityMonitor getAvailabilityMonitor(Computer computer) {
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
return t.getAvailability();
}
}
return null;
}
public synchronized CauseOfBlockage canTake(Node node,
Queue.BuildableItem item) {
// Ask the AvailabilityMonitor for this node if it's okay to
// run this build.
ExecutorWorkerThread workerThread = null;
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
ExecutorWorkerThread t = ((ExecutorWorkerThread)it.next());
if (t.getNode() == node) {
workerThread = t;
break;
}
}
Computer computer = node.toComputer();
for (ExecutorWorkerThread t : gewtHandles) {
if (t.getComputer() == computer) {
workerThread = t;
break;
}
}
if (workerThread != null) {
if (workerThread.getAvailability().canTake(item)) {
return null;
@ -353,4 +315,10 @@ public class GearmanProxy {
}
return null;
}
public synchronized void registerJobs() {
for (ExecutorWorkerThread worker : gewtHandles) {
worker.registerJobs();
}
}
}

View File

@ -19,7 +19,6 @@ package hudson.plugins.gearman;
import jenkins.model.Jenkins;
import hudson.model.Queue;
import hudson.model.Node;
import hudson.model.Computer;
import org.slf4j.Logger;
@ -28,29 +27,27 @@ import org.slf4j.LoggerFactory;
public class NodeAvailabilityMonitor implements AvailabilityMonitor {
private final Queue queue;
private final Jenkins jenkins;
private final Node node;
private final Computer computer;
private MyGearmanWorkerImpl workerHoldingLock = null;
private String expectedUUID = null;
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
NodeAvailabilityMonitor(Node node)
NodeAvailabilityMonitor(Computer computer)
{
this.node = node;
this.computer = computer;
queue = Queue.getInstance();
jenkins = Jenkins.getInstance();
}
public Node getNode() {
return node;
public Computer getComputer() {
return computer;
}
public void lock(MyGearmanWorkerImpl worker)
throws InterruptedException
{
Computer computer = node.toComputer();
logger.debug("AvailabilityMonitor lock request: " + worker);
while (true) {
boolean busy = false;

View File

@ -56,16 +56,8 @@ public class SaveableListenerImpl extends SaveableListener {
}
// update for when any changes are applied to a project or node
boolean doUpdate = o instanceof Node || o instanceof AbstractProject;
if (doUpdate) {
List<AbstractWorkerThread> workers = GearmanProxy.getInstance().getGewtHandles();
synchronized(workers) {
if (!workers.isEmpty()) {
for (AbstractWorkerThread worker : workers) {
worker.registerJobs();
}
}
}
if (o instanceof Node || o instanceof AbstractProject) {
GearmanProxy.getInstance().registerJobs();
}
}
}

View File

@ -60,8 +60,8 @@ import com.google.gson.reflect.TypeToken;
* This is a gearman function that will start jenkins builds
*
* Assumptions: When this function is created it has an associated
* node and project. The build will start a jenkins build
* on its assigned assigned project and node and pass along
* computer and project. The build will start a jenkins build
* on its assigned assigned project and computer and pass along
* all of the parameters from the client.
*
* @author Khai Do
@ -71,15 +71,15 @@ public class StartJobWorker extends AbstractGearmanFunction {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
Node node;
Computer computer;
AbstractProject<?, ?> project;
String masterName;
MyGearmanWorkerImpl worker;
public StartJobWorker(AbstractProject<?, ?> project, Node node, String masterName,
public StartJobWorker(AbstractProject<?, ?> project, Computer computer, String masterName,
MyGearmanWorkerImpl worker) {
this.project = project;
this.node = node;
this.computer = computer;
this.masterName = masterName;
this.worker = worker;
}
@ -160,16 +160,16 @@ public class StartJobWorker extends AbstractGearmanFunction {
/*
* make this node build this project with unique id and build params from the client
*/
String runNodeName = GearmanPluginUtil.getRealName(node);
String runNodeName = GearmanPluginUtil.getRealName(computer);
// create action to run on a specified node
// create action to run on a specified computer
Action runNode = new NodeAssignmentAction(runNodeName);
// create action for parameters
Action params = new NodeParametersAction(buildParams, decodedUniqueId);
Action [] actions = {runNode, params};
AvailabilityMonitor availability =
GearmanProxy.getInstance().getAvailabilityMonitor(node);
GearmanProxy.getInstance().getAvailabilityMonitor(computer);
availability.expectUUID(decodedUniqueId);
@ -236,7 +236,6 @@ public class StartJobWorker extends AbstractGearmanFunction {
}
if (offlineWhenComplete) {
Computer computer = node.toComputer();
if (computer == null) {
logger.error("---- Worker " + this.worker + " has no " +
"computer while trying to take node offline.");

View File

@ -78,7 +78,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> apple = createFreeStyleProject("apple");
apple.setAssignedLabel(new LabelAtom("oneiric-10"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -98,7 +98,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -118,7 +118,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("bogus"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -135,7 +135,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -155,7 +155,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
lemon.setAssignedLabel(new LabelAtom("linux"));
lemon.disable();
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -176,7 +176,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", offlineSlave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", offlineSlave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -194,7 +194,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
MavenModuleSet lemon = createMavenProject("lemon");
lemon.setAssignedLabel(new LabelAtom("linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();
@ -215,7 +215,7 @@ public class ExecutorWorkerThreadTest extends HudsonTestCase {
Project<?, ?> lemon = createFreeStyleProject("lemon");
lemon.setAssignedLabel(new LabelAtom("!linux"));
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave, "master", new NoopAvailabilityMonitor());
AbstractWorkerThread oneiric = new ExecutorWorkerThread("GearmanServer", 4730, "MyWorker", slave.toComputer(), "master", new NoopAvailabilityMonitor());
oneiric.registerJobs();
Set<String> functions = oneiric.worker.getRegisteredFunctions();

View File

@ -31,22 +31,21 @@ import org.jvnet.hudson.test.HudsonTestCase;
*/
public class GearmanPluginUtilTest extends HudsonTestCase {
@Test
public void testGetRealNameSlave() throws Exception {
DumbSlave slave = createOnlineSlave();
DumbSlave slave = createSlave();
slave.setNodeName("oneiric-10");
// createOnlineSlave sets the slave name to slave0. Do not change
// this with setNodeName as the name is supposed to be immutable
// except when cloning a preexisting slave.
assertEquals("slave0", GearmanPluginUtil.getRealName(slave.toComputer()));
hudson.removeNode(slave);
assertEquals("oneiric-10", GearmanPluginUtil.getRealName(slave));
}
@Test
public void testGetRealNameMaster() throws Exception {
assertEquals("master", GearmanPluginUtil.getRealName(Computer.currentComputer().getNode()));
assertEquals("master", GearmanPluginUtil.getRealName(Computer.currentComputer()));
}
}

View File

@ -44,43 +44,20 @@ public class GearmanProxyTest extends HudsonTestCase {
@Override
@After
public void tearDown() throws Exception {
gp.getGmwtHandles().clear();
gp.getGewtHandles().clear();
gp.testResetHandles();
super.tearDown();
}
@Test
public void testGetNumExecutors() throws Exception {
DumbSlave slave = createSlave();
assertEquals(0, gp.getNumExecutors());
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-0", slave, "master",
new NoopAvailabilityMonitor()));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-1", slave, "master",
new NoopAvailabilityMonitor()));
gp.getGewtHandles().add(new ExecutorWorkerThread("localhost", 4730, "test_exec-2", slave, "master",
new NoopAvailabilityMonitor()));
assertEquals(3, gp.getNumExecutors());
gp.getGewtHandles().add(new ManagementWorkerThread("localhost", 4730,
"master_manage", "master",
new NoopAvailabilityMonitor()));
assertEquals(4, gp.getNumExecutors());
}
@Test
public void testCreateManagementWorker() {
assertEquals(0, gp.getGmwtHandles().size());
assertEquals(0, gp.getNumExecutors());
gp.createManagementWorker();
assertEquals(1, gp.getGmwtHandles().size());
assertTrue(gp.getGmwtHandles().get(0).isAlive());
// mgmt: 1 master
assertEquals(1, gp.getNumExecutors());
// assertTrue(gp.getGmwtHandles().get(0).isAlive());
}
@Test
@ -88,10 +65,12 @@ public class GearmanProxyTest extends HudsonTestCase {
DumbSlave slave = createSlave();
assertEquals(0, gp.getGewtHandles().size());
assertEquals(0, gp.getNumExecutors());
gp.createExecutorWorkersOnNode(slave.toComputer());
assertEquals(1, gp.getGewtHandles().size());
// exec: 1 master
assertEquals(1, gp.getNumExecutors());
}
@Test
@ -99,7 +78,8 @@ public class GearmanProxyTest extends HudsonTestCase {
gp.initWorkers();
assertEquals(2, gp.getGewtHandles().size());
// exec: 1 slave, 1 master + mgmnt: 1
assertEquals(3, gp.getNumExecutors());
}
@Test
@ -108,6 +88,7 @@ public class GearmanProxyTest extends HudsonTestCase {
DumbSlave slave = createSlave();
gp.initWorkers();
assertEquals(3, gp.getGewtHandles().size());
// exec: 2 slaves, 1 master + mgmnt: 1
assertEquals(4, gp.getNumExecutors());
}
}