Merge "Register the diff of functions"
This commit is contained in:
commit
c5637cef49
|
@ -167,6 +167,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
|
||||
// this will cause a grab-job event
|
||||
functionRegistry.setUpdated(true);
|
||||
// Make sure we reset the function list
|
||||
functionMap.clear();
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
|
@ -236,22 +238,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
functionMap.clear();
|
||||
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
||||
GearmanPacketType.RESET_ABILITIES, new byte[0]));
|
||||
session.driveSessionIO();
|
||||
if (!isRunning()) return;
|
||||
|
||||
for (GearmanFunctionFactory factory: functions) {
|
||||
FunctionDefinition def = new FunctionDefinition(0, factory);
|
||||
functionMap.put(factory.getFunctionName(), def);
|
||||
sendToAll(generateCanDoPacket(def));
|
||||
HashMap<String, FunctionDefinition> newFunctionMap = new HashMap<String, FunctionDefinition>();
|
||||
// If we have no previous data then reset abilities to be sure the
|
||||
// gearman server has no stale data that we don't know about.
|
||||
// Or if we have no functions anymore just reset everything, we don't
|
||||
// need a CANT_DO per lost function.
|
||||
if (functions.isEmpty() || functionMap.isEmpty()) {
|
||||
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
||||
GearmanPacketType.RESET_ABILITIES, new byte[0]));
|
||||
session.driveSessionIO();
|
||||
if (!isRunning()) return;
|
||||
|
||||
LOG.debug("---- Worker " + this + " registered function " +
|
||||
factory.getFunctionName());
|
||||
LOG.debug("---- Worker " + this + " reset functions");
|
||||
if (!isRunning()) {
|
||||
// Ensure we start from scratch on reconnection.
|
||||
functionMap.clear();
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Now only update if we have data to update.
|
||||
if (!functions.isEmpty()) {
|
||||
for (GearmanFunctionFactory factory: functions) {
|
||||
FunctionDefinition def = new FunctionDefinition(0, factory);
|
||||
newFunctionMap.put(factory.getFunctionName(), def);
|
||||
if (!functionMap.containsKey(factory.getFunctionName())) {
|
||||
sendToAll(generateCanDoPacket(def));
|
||||
session.driveSessionIO();
|
||||
if (!isRunning()) {
|
||||
// Ensure we start from scratch on reconnection.
|
||||
functionMap.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.debug("---- Worker " + this + " registered function " +
|
||||
factory.getFunctionName());
|
||||
}
|
||||
functionMap.remove(factory.getFunctionName());
|
||||
}
|
||||
for (FunctionDefinition def: functionMap.values()) {
|
||||
sendToAll(generateCantDoPacket(def));
|
||||
session.driveSessionIO();
|
||||
if (!isRunning()) {
|
||||
// Ensure we start from scratch on reconnection.
|
||||
functionMap.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.debug("---- Worker " + this + " unregistered function " +
|
||||
def.getFactory().getFunctionName());
|
||||
}
|
||||
}
|
||||
functionMap = newFunctionMap;
|
||||
|
||||
GearmanSessionEvent nextEvent = eventList.peek();
|
||||
if (nextEvent == null ||
|
||||
|
@ -515,6 +550,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
|
||||
}
|
||||
|
||||
private GearmanPacket generateCantDoPacket(FunctionDefinition def) {
|
||||
GearmanPacketType pt = GearmanPacketType.CANT_DO;
|
||||
byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
|
||||
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
|
||||
}
|
||||
|
||||
private void sendToAll(GearmanPacket p) {
|
||||
sendToAll(null, p);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue