Merge "Register the diff of functions"

This commit is contained in:
Jenkins 2016-03-08 21:49:21 +00:00 committed by Gerrit Code Review
commit c5637cef49
1 changed files with 55 additions and 14 deletions

View File

@ -167,6 +167,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
// this will cause a grab-job event
functionRegistry.setUpdated(true);
// Make sure we reset the function list
functionMap.clear();
} catch (IOException e) {
try {
Thread.sleep(2000);
@ -236,22 +238,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return;
}
functionMap.clear();
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
functionMap.put(factory.getFunctionName(), def);
sendToAll(generateCanDoPacket(def));
HashMap<String, FunctionDefinition> newFunctionMap = new HashMap<String, FunctionDefinition>();
// If we have no previous data then reset abilities to be sure the
// gearman server has no stale data that we don't know about.
// Or if we have no functions anymore just reset everything, we don't
// need a CANT_DO per lost function.
if (functions.isEmpty() || functionMap.isEmpty()) {
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
LOG.debug("---- Worker " + this + " reset functions");
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
}
// Now only update if we have data to update.
if (!functions.isEmpty()) {
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
newFunctionMap.put(factory.getFunctionName(), def);
if (!functionMap.containsKey(factory.getFunctionName())) {
sendToAll(generateCanDoPacket(def));
session.driveSessionIO();
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
}
functionMap.remove(factory.getFunctionName());
}
for (FunctionDefinition def: functionMap.values()) {
sendToAll(generateCantDoPacket(def));
session.driveSessionIO();
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
LOG.debug("---- Worker " + this + " unregistered function " +
def.getFactory().getFunctionName());
}
}
functionMap = newFunctionMap;
GearmanSessionEvent nextEvent = eventList.peek();
if (nextEvent == null ||
@ -515,6 +550,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
}
private GearmanPacket generateCantDoPacket(FunctionDefinition def) {
GearmanPacketType pt = GearmanPacketType.CANT_DO;
byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
}
private void sendToAll(GearmanPacket p) {
sendToAll(null, p);
}