diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java index d261927..72118ea 100644 --- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java +++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java @@ -36,7 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import org.gearman.common.Constants; @@ -70,7 +70,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { IDLE, RUNNING, SHUTTINGDOWN } private static final String DESCRIPION_PREFIX = "GearmanWorker"; - private Queue functionList = null; + private ConcurrentLinkedQueue eventList = null; private Selector ioAvailable = null; private static final org.slf4j.Logger LOG = LoggerFactory.getLogger( Constants.GEARMAN_WORKER_LOGGER_NAME); @@ -78,7 +78,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { private Map functionMap; private State state; private ExecutorService executorService; - private Map taskMap = null; private GearmanJobServerSession session = null; private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory(); private volatile boolean jobUniqueIdRequired = false; @@ -163,6 +162,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { GearmanPacketType.SET_CLIENT_ID, ByteUtils.toUTF8Bytes(id))); } + // Reset events so that we don't process events from the old + // connection. + eventList = new ConcurrentLinkedQueue(); // this will cause a grab-job event functionRegistry.setUpdated(true); } catch (IOException e) { @@ -183,12 +185,11 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { public MyGearmanWorkerImpl(ExecutorService executorService, AvailabilityMonitor availability) { this.availability = availability; - functionList = new LinkedList(); + eventList = new ConcurrentLinkedQueue(); id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId(); functionMap = new HashMap(); state = State.IDLE; this.executorService = executorService; - taskMap = new HashMap(); functionRegistry = new FunctionRegistry(); try { @@ -252,17 +253,34 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { factory.getFunctionName()); } - // Simulate a NOOP packet which will kick off a GRAB_JOB cycle - // if we're sleeping. If we get a real NOOP in the mean time, - // it should be fine because GearmanJobServerSession ignores a - // NOOP if PRE_SLEEP is not on the stack. + GearmanSessionEvent nextEvent = eventList.peek(); + if (nextEvent == null || + nextEvent.getPacket().getPacketType() != GearmanPacketType.NOOP) { + // Simulate a NOOP packet which will kick off a GRAB_JOB cycle + // if we're sleeping. If we get a real NOOP in the mean time, + // it should be fine because GearmanJobServerSession ignores a + // NOOP if PRE_SLEEP is not on the stack. + GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES, + GearmanPacketType.NOOP, new byte[0]); + GearmanSessionEvent event = new GearmanSessionEvent(p, session); + session.handleSessionEvent(event); + } + } + + public void enqueueNoopEvent() { + // Simulate a NOOP packet which will kick off a GRAB_JOB cycle. + // This unconditionally enqueues the NOOP which will send a GRAB_JOB + // and should only be used when you know you need to send a GRAB_JOB. + // Cases like worker start, post function run, post failure. GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES, GearmanPacketType.NOOP, new byte[0]); GearmanSessionEvent event = new GearmanSessionEvent(p, session); - session.handleSessionEvent(event); + enqueueEvent(event); } public void work() { + GearmanSessionEvent event = null; + GearmanFunction function = null; LOG.info("---- Worker " + this + " starting work"); if (!state.equals(State.IDLE)) { @@ -271,7 +289,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { } state = State.RUNNING; - boolean grabJobSent = false; + // When we first start working we will already be initialized so must + // enqueue a Noop event to trigger GRAB_JOB here. + enqueueNoopEvent(); while (isRunning()) { LOG.debug("---- Worker " + this + " top of run loop"); @@ -279,12 +299,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { if (!session.isInitialized()) { LOG.debug("---- Worker " + this + " run loop reconnect"); reconnect(); - grabJobSent = false; - } - - // if still disconnected, skip - if (!session.isInitialized()) { - LOG.debug("---- Worker " + this + " run loop not initialized"); + enqueueNoopEvent(); + // Restart loop to check we connected. continue; } @@ -298,55 +314,44 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { continue; } - if (!isRunning()) continue; + if (!isRunning() || !session.isInitialized()) continue; - if (functionList.isEmpty()) { - LOG.debug("---- Worker " + this + " run loop function list is empty while" + - " checking for initial grab job"); - if (!grabJobSent) { - // send the initial GRAB_JOB on reconnection. - LOG.info("---- Worker " + this + " sending initial grab job"); - try { - sendGrabJob(session); - } catch (InterruptedException e) { - LOG.warn("---- Worker " + this + - " interrupted while waiting for okay to send grab job", e); - continue; - } - grabJobSent = true; - try { - session.driveSessionIO(); - } catch (IOException io) { - LOG.warn("---- Worker " + this + " receieved IOException while" + - " sending initial grab job", io); - session.closeSession(); - continue; - } - } - } - LOG.debug("---- Worker " + this + " run loop finished initial grab job"); + event = eventList.poll(); + function = processSessionEvent(event); - if (!isRunning()) continue; + if (!isRunning() || !session.isInitialized()) continue; - if (functionList.isEmpty()) { - LOG.debug("---- Worker " + this + " function list empty; selecting"); - int interestOps = SelectionKey.OP_READ; - if (session.sessionHasDataToWrite()) { - interestOps |= SelectionKey.OP_WRITE; - } - session.getSelectionKey().interestOps(interestOps); - - try { - ioAvailable.select(); - } catch (IOException io) { - LOG.warn("---- Worker " + this + " receieved IOException while" + - " selecting for IO", io); - session.closeSession(); - continue; - } + // For the time being we will execute the jobs synchronously + // in the future, I expect to change this. + if (function != null) { + LOG.info("---- Worker " + this + " executing function"); + submitFunction(function); + // Send another grab_job on the next loop + enqueueNoopEvent(); + // Skip IO as submitFunction drives the IO for function + // running. + continue; + } + + if (!isRunning() || !session.isInitialized()) continue; + + // Run IO, select waiting for ability to read and/or write + // then read and/or write. + int interestOps = SelectionKey.OP_READ; + if (session.sessionHasDataToWrite()) { + interestOps |= SelectionKey.OP_WRITE; + } + session.getSelectionKey().interestOps(interestOps); + + try { + ioAvailable.select(); + } catch (IOException io) { + LOG.warn("---- Worker " + this + " receieved IOException while" + + " selecting for IO", io); + session.closeSession(); + continue; } - LOG.debug("---- Worker " + this + " run loop finished selecting"); if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { LOG.debug("---- Worker " + this + " received input in run loop"); if (!session.isInitialized()) { @@ -363,19 +368,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { } } LOG.debug("---- Worker " + this + " run loop finished driving session io"); - - if (!isRunning()) continue; - - //For the time being we will execute the jobs synchronously - //in the future, I expect to change this. - if (!functionList.isEmpty()) { - LOG.info("---- Worker " + this + " executing function"); - GearmanFunction fun = functionList.remove(); - submitFunction(fun); - // Send another grab_job on the next loop - grabJobSent = false; - } - LOG.debug("---- Worker " + this + " bottom of run loop"); } shutDownWorker(true); @@ -391,63 +383,71 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { new GrabJobEventHandler(s), new GearmanPacketImpl(GearmanPacketMagic.REQ, getGrabJobPacketType(), new byte[0])); - taskMap.put(s, grabJobTask); s.submitTask(grabJobTask); } public void handleSessionEvent(GearmanSessionEvent event) throws IllegalArgumentException, IllegalStateException { - GearmanPacket p = event.getPacket(); - GearmanJobServerSession s = event.getSession(); - GearmanPacketType t = p.getPacketType(); - LOG.debug("---- Worker " + this + " handling session event" + - " ( Session = " + s + " Event = " + t + " )"); - switch (t) { - case JOB_ASSIGN: - //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true - LOG.info("---- Worker " + this + " received job assignment"); - taskMap.remove(s); - addNewJob(event); - break; - case JOB_ASSIGN_UNIQ: - //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false - LOG.info("---- Worker " + this + " received unique job assignment"); - taskMap.remove(s); - addNewJob(event); - break; - case NOOP: - taskMap.remove(s); - LOG.debug("---- Worker " + this + " sending grab job after wakeup"); - try { - sendGrabJob(s); - } catch (InterruptedException e) { - LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " + - "grab job", e); - } - break; - case NO_JOB: - // We didn't get a job, so allow other workers or - // Jenkins to schedule on this node. - availability.unlock(this); - LOG.debug("---- Worker " + this + " sending pre sleep after no_job"); - GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), - new GearmanPacketImpl(GearmanPacketMagic.REQ, - GearmanPacketType.PRE_SLEEP, new byte[0])); - taskMap.put(s, preSleepTask); - s.submitTask(preSleepTask); - break; - case ECHO_RES: - break; - case OPTION_RES: - break; - case ERROR: - s.closeSession(); - break; - default: - LOG.warn("---- Worker " + this + " received unknown packet type " + t + - " from session " + s + "; closing connection"); - s.closeSession(); + enqueueEvent(event); + } + + public void enqueueEvent(GearmanSessionEvent event) { + // Enqueue in a thread safe manner. Events will + // be pulled off and processed serially in this workers + // main thread. + eventList.add(event); + } + + private GearmanFunction processSessionEvent(GearmanSessionEvent event) + throws IllegalArgumentException, IllegalStateException { + if (event != null) { + GearmanPacket p = event.getPacket(); + GearmanJobServerSession s = event.getSession(); + GearmanPacketType t = p.getPacketType(); + LOG.debug("---- Worker " + this + " handling session event" + + " ( Session = " + s + " Event = " + t + " )"); + switch (t) { + case JOB_ASSIGN: + //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true + LOG.info("---- Worker " + this + " received job assignment"); + return addNewJob(event); + case JOB_ASSIGN_UNIQ: + //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false + LOG.info("---- Worker " + this + " received unique job assignment"); + return addNewJob(event); + case NOOP: + LOG.debug("---- Worker " + this + " sending grab job after wakeup"); + try { + sendGrabJob(s); + } catch (InterruptedException e) { + LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " + + "grab job", e); + } + break; + case NO_JOB: + // We didn't get a job, so allow other workers or + // Jenkins to schedule on this node. + availability.unlock(this); + LOG.debug("---- Worker " + this + " sending pre sleep after no_job"); + GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), + new GearmanPacketImpl(GearmanPacketMagic.REQ, + GearmanPacketType.PRE_SLEEP, new byte[0])); + s.submitTask(preSleepTask); + break; + case ECHO_RES: + break; + case OPTION_RES: + break; + case ERROR: + s.closeSession(); + break; + default: + LOG.warn("---- Worker " + this + " received unknown packet type " + t + + " from session " + s + "; closing connection"); + s.closeSession(); + } } + return null; } public boolean addServer(String host, int port) { @@ -554,7 +554,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { return exceptions; } - private void addNewJob(GearmanSessionEvent event) { + private GearmanFunction addNewJob(GearmanSessionEvent event) { byte[] handle, data, functionNameBytes, unique; GearmanPacket p = event.getPacket(); String functionName; @@ -572,6 +572,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.WORK_FAIL, handle)); session.submitTask(gsr); + enqueueNoopEvent(); } else { GearmanFunction function = def.getFactory().getFunction(); function.setData(data); @@ -580,8 +581,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { if (unique != null && unique.length > 0) { function.setUniqueId(unique); } - functionList.add(function); + return function; } + return null; } private void submitFunction(GearmanFunction fun) {