From 73359c04563f90fb2c85583183a86b2b6ba92acc Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Tue, 5 May 2015 18:05:59 -0700 Subject: [PATCH] Register the diff of functions Instead of registering all functions for every node each time the functions change register the delta of the functions each time. This should cut down on the amount of CAN_DO updates we were doing in the past. Note that we handle the loss of all functions with RESET_ABILITIES rather than sending a CANT_DO for each function that is no longer available. Also, starting a new connection will always begin with RESET_ABILITIES to clear any potentially stale state from the gearman server. Change-Id: I2b16117fce30ddb3e11b338043204cf726c7f1d4 --- .../plugins/gearman/MyGearmanWorkerImpl.java | 69 +++++++++++++++---- 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java index 72118ea..798861e 100644 --- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java +++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java @@ -167,6 +167,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { eventList = new ConcurrentLinkedQueue(); // this will cause a grab-job event functionRegistry.setUpdated(true); + // Make sure we reset the function list + functionMap.clear(); } catch (IOException e) { try { Thread.sleep(2000); @@ -236,22 +238,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { return; } - functionMap.clear(); - sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, - GearmanPacketType.RESET_ABILITIES, new byte[0])); - session.driveSessionIO(); - if (!isRunning()) return; - - for (GearmanFunctionFactory factory: functions) { - FunctionDefinition def = new FunctionDefinition(0, factory); - functionMap.put(factory.getFunctionName(), def); - sendToAll(generateCanDoPacket(def)); + HashMap newFunctionMap = new HashMap(); + // If we have no previous data then reset abilities to be sure the + // gearman server has no stale data that we don't know about. + // Or if we have no functions anymore just reset everything, we don't + // need a CANT_DO per lost function. + if (functions.isEmpty() || functionMap.isEmpty()) { + sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, + GearmanPacketType.RESET_ABILITIES, new byte[0])); session.driveSessionIO(); - if (!isRunning()) return; - - LOG.debug("---- Worker " + this + " registered function " + - factory.getFunctionName()); + LOG.debug("---- Worker " + this + " reset functions"); + if (!isRunning()) { + // Ensure we start from scratch on reconnection. + functionMap.clear(); + return; + } } + // Now only update if we have data to update. + if (!functions.isEmpty()) { + for (GearmanFunctionFactory factory: functions) { + FunctionDefinition def = new FunctionDefinition(0, factory); + newFunctionMap.put(factory.getFunctionName(), def); + if (!functionMap.containsKey(factory.getFunctionName())) { + sendToAll(generateCanDoPacket(def)); + session.driveSessionIO(); + if (!isRunning()) { + // Ensure we start from scratch on reconnection. + functionMap.clear(); + return; + } + + LOG.debug("---- Worker " + this + " registered function " + + factory.getFunctionName()); + } + functionMap.remove(factory.getFunctionName()); + } + for (FunctionDefinition def: functionMap.values()) { + sendToAll(generateCantDoPacket(def)); + session.driveSessionIO(); + if (!isRunning()) { + // Ensure we start from scratch on reconnection. + functionMap.clear(); + return; + } + + LOG.debug("---- Worker " + this + " unregistered function " + + def.getFactory().getFunctionName()); + } + } + functionMap = newFunctionMap; GearmanSessionEvent nextEvent = eventList.peek(); if (nextEvent == null || @@ -515,6 +550,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data); } + private GearmanPacket generateCantDoPacket(FunctionDefinition def) { + GearmanPacketType pt = GearmanPacketType.CANT_DO; + byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName()); + return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data); + } + private void sendToAll(GearmanPacket p) { sendToAll(null, p); }