summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java67
1 files changed, 54 insertions, 13 deletions
diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java
index ca1c30c..6200369 100644
--- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java
+++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java
@@ -167,6 +167,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
167 eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>(); 167 eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
168 // this will cause a grab-job event 168 // this will cause a grab-job event
169 functionRegistry.setUpdated(true); 169 functionRegistry.setUpdated(true);
170 // Make sure we reset the function list
171 functionMap.clear();
170 } catch (IOException e) { 172 } catch (IOException e) {
171 try { 173 try {
172 Thread.sleep(2000); 174 Thread.sleep(2000);
@@ -236,22 +238,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
236 return; 238 return;
237 } 239 }
238 240
239 functionMap.clear(); 241 HashMap<String, FunctionDefinition> newFunctionMap = new HashMap<String, FunctionDefinition>();
240 sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, 242 // If we have no previous data then reset abilities to be sure the
241 GearmanPacketType.RESET_ABILITIES, new byte[0])); 243 // gearman server has no stale data that we don't know about.
242 session.driveSessionIO(); 244 // Or if we have no functions anymore just reset everything, we don't
243 if (!isRunning()) return; 245 // need a CANT_DO per lost function.
244 246 if (functions.isEmpty() || functionMap.isEmpty()) {
245 for (GearmanFunctionFactory factory: functions) { 247 sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
246 FunctionDefinition def = new FunctionDefinition(0, factory); 248 GearmanPacketType.RESET_ABILITIES, new byte[0]));
247 functionMap.put(factory.getFunctionName(), def);
248 sendToAll(generateCanDoPacket(def));
249 session.driveSessionIO(); 249 session.driveSessionIO();
250 if (!isRunning()) return; 250 LOG.debug("---- Worker " + this + " reset functions");
251 if (!isRunning()) {
252 // Ensure we start from scratch on reconnection.
253 functionMap.clear();
254 return;
255 }
256 }
257 // Now only update if we have data to update.
258 if (!functions.isEmpty()) {
259 for (GearmanFunctionFactory factory: functions) {
260 FunctionDefinition def = new FunctionDefinition(0, factory);
261 newFunctionMap.put(factory.getFunctionName(), def);
262 if (!functionMap.containsKey(factory.getFunctionName())) {
263 sendToAll(generateCanDoPacket(def));
264 session.driveSessionIO();
265 if (!isRunning()) {
266 // Ensure we start from scratch on reconnection.
267 functionMap.clear();
268 return;
269 }
251 270
252 LOG.debug("---- Worker " + this + " registered function " + 271 LOG.debug("---- Worker " + this + " registered function " +
253 factory.getFunctionName()); 272 factory.getFunctionName());
273 }
274 functionMap.remove(factory.getFunctionName());
275 }
276 for (FunctionDefinition def: functionMap.values()) {
277 sendToAll(generateCantDoPacket(def));
278 session.driveSessionIO();
279 if (!isRunning()) {
280 // Ensure we start from scratch on reconnection.
281 functionMap.clear();
282 return;
283 }
284
285 LOG.debug("---- Worker " + this + " unregistered function " +
286 def.getFactory().getFunctionName());
287 }
254 } 288 }
289 functionMap = newFunctionMap;
255 290
256 GearmanSessionEvent nextEvent = eventList.peek(); 291 GearmanSessionEvent nextEvent = eventList.peek();
257 if (nextEvent == null || 292 if (nextEvent == null ||
@@ -515,6 +550,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
515 return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data); 550 return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
516 } 551 }
517 552
553 private GearmanPacket generateCantDoPacket(FunctionDefinition def) {
554 GearmanPacketType pt = GearmanPacketType.CANT_DO;
555 byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
556 return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
557 }
558
518 private void sendToAll(GearmanPacket p) { 559 private void sendToAll(GearmanPacket p) {
519 sendToAll(null, p); 560 sendToAll(null, p);
520 } 561 }