Register the diff of functions

Instead of registering all functions for every node each time the
functions change register the delta of the functions each time. This
should cut down on the amount of CAN_DO updates we were doing in the
past.

Note that we handle the loss of all functions with RESET_ABILITIES
rather than sending a CANT_DO for each function that is no longer
available. Also, starting a new connection will always begin with
RESET_ABILITIES to clear any potentially stale state from the gearman
server.

Change-Id: I2b16117fce30ddb3e11b338043204cf726c7f1d4
This commit is contained in:
Clark Boylan 2015-05-05 18:05:59 -07:00 committed by James E. Blair
parent 0314ab1ea2
commit 73359c0456
1 changed files with 55 additions and 14 deletions

View File

@ -167,6 +167,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
// this will cause a grab-job event
functionRegistry.setUpdated(true);
// Make sure we reset the function list
functionMap.clear();
} catch (IOException e) {
try {
Thread.sleep(2000);
@ -236,22 +238,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return;
}
functionMap.clear();
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
functionMap.put(factory.getFunctionName(), def);
sendToAll(generateCanDoPacket(def));
HashMap<String, FunctionDefinition> newFunctionMap = new HashMap<String, FunctionDefinition>();
// If we have no previous data then reset abilities to be sure the
// gearman server has no stale data that we don't know about.
// Or if we have no functions anymore just reset everything, we don't
// need a CANT_DO per lost function.
if (functions.isEmpty() || functionMap.isEmpty()) {
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
LOG.debug("---- Worker " + this + " reset functions");
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
}
// Now only update if we have data to update.
if (!functions.isEmpty()) {
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
newFunctionMap.put(factory.getFunctionName(), def);
if (!functionMap.containsKey(factory.getFunctionName())) {
sendToAll(generateCanDoPacket(def));
session.driveSessionIO();
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
}
functionMap.remove(factory.getFunctionName());
}
for (FunctionDefinition def: functionMap.values()) {
sendToAll(generateCantDoPacket(def));
session.driveSessionIO();
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
LOG.debug("---- Worker " + this + " unregistered function " +
def.getFactory().getFunctionName());
}
}
functionMap = newFunctionMap;
GearmanSessionEvent nextEvent = eventList.peek();
if (nextEvent == null ||
@ -515,6 +550,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
}
private GearmanPacket generateCantDoPacket(FunctionDefinition def) {
GearmanPacketType pt = GearmanPacketType.CANT_DO;
byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
}
private void sendToAll(GearmanPacket p) {
sendToAll(null, p);
}