From 0b017fe95d670d6d64b5b29750faf4d2938f946a Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 23 May 2013 13:42:01 -0700 Subject: [PATCH] Rework when GRAB_JOB is sent. * Send GRAB_JOB after NO_OP. * Send GRAB_JOB after initial setup to start the event-state system. * Don't send GRAB_JOB after changing the functions. The gearman server will maintain the existing state of the connection if functions change, so if we are sleeping, we will still get appropriate NOOP messages even after changing functions. * Don't send GRAB_JOB when the task queue is empty. The event-state system should send it when necessary. * Send GRAB_JOB after job completion (to restart the event-state system). Fixes 1183454. Also fixes cases where we would end up grabbing multiple jobs in one cycle. Change-Id: I36a890711ecdfef62a6554bac820acf0ca8b5f5b --- .../plugins/gearman/MyGearmanWorkerImpl.java | 100 ++++++++++++------ 1 file changed, 65 insertions(+), 35 deletions(-) diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java index 9587612..6249f6c 100644 --- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java +++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java @@ -237,13 +237,14 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { factory.getFunctionName()); } - GearmanTask sessTask = new GearmanTask( - new GrabJobEventHandler(session), - new GearmanPacketImpl(GearmanPacketMagic.REQ, - getGrabJobPacketType(), new byte[0])); - taskMap.put(session, sessTask); - session.submitTask(sessTask); - session.driveSessionIO(); + // Simulate a NOOP packet which will kick off a GRAB_JOB cycle + // if we're sleeping. If we get a real NOOP in the mean time, + // it should be fine because GearmanJobServerSession ignores a + // NOOP if PRE_SLEEP is not on the stack. + GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES, + GearmanPacketType.NOOP, new byte[0]); + GearmanSessionEvent event = new GearmanSessionEvent(p, session); + session.handleSessionEvent(event); } public void work() { @@ -255,10 +256,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { } state = State.RUNNING; + boolean grabJobSent = false; while (isRunning()) { if (!session.isInitialized()) { reconnect(); + grabJobSent = false; } // if still disconnected, skip @@ -275,17 +278,43 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { continue; } - int interestOps = SelectionKey.OP_READ; - if (session.sessionHasDataToWrite()) { - interestOps |= SelectionKey.OP_WRITE; + if (functionList.isEmpty()) { + if (!grabJobSent) { + // send the initial GRAB_JOB on reconnection. + LOG.info("Worker " + this + " sending initial grab job"); + try { + GearmanTask sessTask = new GearmanTask( + new GrabJobEventHandler(session), + new GearmanPacketImpl(GearmanPacketMagic.REQ, + getGrabJobPacketType(), new byte[0])); + taskMap.put(session, sessTask); + session.submitTask(sessTask); + session.driveSessionIO(); + grabJobSent = true; + } catch (IOException io) { + LOG.warn("Receieved IOException while" + + " sending initial grab job",io); + session.closeSession(); + continue; + } + } } - session.getSelectionKey().interestOps(interestOps); - try { - ioAvailable.select(); - } catch (IOException io) { - LOG.warn("Receieved IOException while" + - " selecting for IO",io); + if (functionList.isEmpty()) { + int interestOps = SelectionKey.OP_READ; + if (session.sessionHasDataToWrite()) { + interestOps |= SelectionKey.OP_WRITE; + } + session.getSelectionKey().interestOps(interestOps); + + try { + ioAvailable.select(); + } catch (IOException io) { + LOG.warn("Receieved IOException while" + + " selecting for IO",io); + session.closeSession(); + continue; + } } if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { @@ -294,25 +323,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { } try { session.driveSessionIO(); - - GearmanTask sessTask = taskMap.get(session); - if (sessTask == null) { - sessTask = new GearmanTask( //NOPMD - new GrabJobEventHandler(session), - new GearmanPacketImpl(GearmanPacketMagic.REQ, - getGrabJobPacketType(), new byte[0])); - taskMap.put(session, sessTask); - session.submitTask(sessTask); - LOG.debug("Worker: " + this + " submitted a " + - sessTask.getRequestPacket().getPacketType() + - " to session: " + session); - } - //For the time being we will execute the jobs synchronously - //in the future, I expect to change this. - if (!functionList.isEmpty()) { - GearmanFunction fun = functionList.remove(); - submitFunction(fun); - } } catch (IOException ioe) { LOG.warn("Received IOException while driving" + " IO on session " + session, ioe); @@ -320,6 +330,15 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { continue; } } + + //For the time being we will execute the jobs synchronously + //in the future, I expect to change this. + if (!functionList.isEmpty()) { + GearmanFunction fun = functionList.remove(); + submitFunction(fun); + // Send another grab_job on the next loop + grabJobSent = false; + } } shutDownWorker(true); @@ -345,8 +364,19 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { break; case NOOP: taskMap.remove(s); + LOG.debug("Worker " + this + " sending grab job after wakeup"); + GearmanTask grabJobTask = new GearmanTask( + new GrabJobEventHandler(s), + new GearmanPacketImpl(GearmanPacketMagic.REQ, + getGrabJobPacketType(), new byte[0])); + taskMap.put(s, grabJobTask); + s.submitTask(grabJobTask); + LOG.debug("Worker: " + this + " submitted a " + + grabJobTask.getRequestPacket().getPacketType() + + " to session: " + s); break; case NO_JOB: + LOG.debug("Worker " + this + " sending pre sleep after no_job"); GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.PRE_SLEEP, new byte[0]));