summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClark Boylan <clark.boylan@gmail.com>2015-05-05 18:05:59 -0700
committerJames E. Blair <jeblair@linux.vnet.ibm.com>2016-03-08 10:32:26 -0800
commit73359c04563f90fb2c85583183a86b2b6ba92acc (patch)
treeef5403998eeb82081b7b7066bce651256e4a7226
parent0314ab1ea2f959eca0a89214483cb4f16fa92829 (diff)
Register the diff of functions
Instead of registering all functions for every node each time the functions change register the delta of the functions each time. This should cut down on the amount of CAN_DO updates we were doing in the past. Note that we handle the loss of all functions with RESET_ABILITIES rather than sending a CANT_DO for each function that is no longer available. Also, starting a new connection will always begin with RESET_ABILITIES to clear any potentially stale state from the gearman server. Change-Id: I2b16117fce30ddb3e11b338043204cf726c7f1d4
Notes
Notes (review): Code-Review+2: Clark Boylan <cboylan@sapwetik.org> Code-Review+2: James E. Blair <corvus@inaugust.com> Workflow+1: James E. Blair <corvus@inaugust.com> Verified+2: Jenkins Submitted-by: Jenkins Submitted-at: Tue, 08 Mar 2016 21:49:21 +0000 Reviewed-on: https://review.openstack.org/180371 Project: openstack-infra/gearman-plugin Branch: refs/heads/master
-rw-r--r--src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java67
1 files changed, 54 insertions, 13 deletions
diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java
index 72118ea..798861e 100644
--- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java
+++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java
@@ -167,6 +167,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
167 eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>(); 167 eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
168 // this will cause a grab-job event 168 // this will cause a grab-job event
169 functionRegistry.setUpdated(true); 169 functionRegistry.setUpdated(true);
170 // Make sure we reset the function list
171 functionMap.clear();
170 } catch (IOException e) { 172 } catch (IOException e) {
171 try { 173 try {
172 Thread.sleep(2000); 174 Thread.sleep(2000);
@@ -236,22 +238,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
236 return; 238 return;
237 } 239 }
238 240
239 functionMap.clear(); 241 HashMap<String, FunctionDefinition> newFunctionMap = new HashMap<String, FunctionDefinition>();
240 sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, 242 // If we have no previous data then reset abilities to be sure the
241 GearmanPacketType.RESET_ABILITIES, new byte[0])); 243 // gearman server has no stale data that we don't know about.
242 session.driveSessionIO(); 244 // Or if we have no functions anymore just reset everything, we don't
243 if (!isRunning()) return; 245 // need a CANT_DO per lost function.
244 246 if (functions.isEmpty() || functionMap.isEmpty()) {
245 for (GearmanFunctionFactory factory: functions) { 247 sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
246 FunctionDefinition def = new FunctionDefinition(0, factory); 248 GearmanPacketType.RESET_ABILITIES, new byte[0]));
247 functionMap.put(factory.getFunctionName(), def);
248 sendToAll(generateCanDoPacket(def));
249 session.driveSessionIO(); 249 session.driveSessionIO();
250 if (!isRunning()) return; 250 LOG.debug("---- Worker " + this + " reset functions");
251 if (!isRunning()) {
252 // Ensure we start from scratch on reconnection.
253 functionMap.clear();
254 return;
255 }
256 }
257 // Now only update if we have data to update.
258 if (!functions.isEmpty()) {
259 for (GearmanFunctionFactory factory: functions) {
260 FunctionDefinition def = new FunctionDefinition(0, factory);
261 newFunctionMap.put(factory.getFunctionName(), def);
262 if (!functionMap.containsKey(factory.getFunctionName())) {
263 sendToAll(generateCanDoPacket(def));
264 session.driveSessionIO();
265 if (!isRunning()) {
266 // Ensure we start from scratch on reconnection.
267 functionMap.clear();
268 return;
269 }
251 270
252 LOG.debug("---- Worker " + this + " registered function " + 271 LOG.debug("---- Worker " + this + " registered function " +
253 factory.getFunctionName()); 272 factory.getFunctionName());
273 }
274 functionMap.remove(factory.getFunctionName());
275 }
276 for (FunctionDefinition def: functionMap.values()) {
277 sendToAll(generateCantDoPacket(def));
278 session.driveSessionIO();
279 if (!isRunning()) {
280 // Ensure we start from scratch on reconnection.
281 functionMap.clear();
282 return;
283 }
284
285 LOG.debug("---- Worker " + this + " unregistered function " +
286 def.getFactory().getFunctionName());
287 }
254 } 288 }
289 functionMap = newFunctionMap;
255 290
256 GearmanSessionEvent nextEvent = eventList.peek(); 291 GearmanSessionEvent nextEvent = eventList.peek();
257 if (nextEvent == null || 292 if (nextEvent == null ||
@@ -515,6 +550,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
515 return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data); 550 return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
516 } 551 }
517 552
553 private GearmanPacket generateCantDoPacket(FunctionDefinition def) {
554 GearmanPacketType pt = GearmanPacketType.CANT_DO;
555 byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
556 return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
557 }
558
518 private void sendToAll(GearmanPacket p) { 559 private void sendToAll(GearmanPacket p) {
519 sendToAll(null, p); 560 sendToAll(null, p);
520 } 561 }