Check for and handle ZMQ errors.

Log ZMQ errors and deal with them by creating a new ZMQ PUB socket in
most situations.
This commit is contained in:
Clark Boylan 2013-01-25 10:56:13 -08:00
parent f92be91eb0
commit 28d5017b4a
1 changed files with 54 additions and 23 deletions

View File

@ -24,13 +24,19 @@ import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.model.listeners.RunListener;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
/*
* Listener to publish Jenkins build events through ZMQ
*/
@Extension
public class RunListenerImpl extends RunListener<Run> {
public static final Logger LOGGER = Logger.getLogger(RunListenerImpl.class.getName());
private int port;
private String bind_addr;
private ZMQ.Context context;
@ -50,36 +56,53 @@ public class RunListenerImpl extends RunListener<Run> {
return 8888;
}
private void bindSocket(Run build) {
private ZMQ.Socket bindSocket(Run build) {
int tmpPort = getPort(build);
if (publisher == null) {
port = tmpPort;
publisher = context.socket(ZMQ.PUB);
bind_addr = String.format("tcp://*:%d", port);
publisher.bind(bind_addr);
LOGGER.log(Level.INFO,
String.format("Binding ZMQ PUB to port %d", port));
publisher = bindSocket(port);
}
else if (tmpPort != port) {
publisher.unbind(bind_addr);
publisher.close();
publisher = context.socket(ZMQ.PUB);
LOGGER.log(Level.INFO,
String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort));
try {
publisher.unbind(bind_addr);
publisher.close();
} catch (ZMQException e) {
/* Let the garbage collector sort out cleanup */
LOGGER.log(Level.INFO,
"Unable to close ZMQ PUB socket. " + e.toString(), e);
}
port = tmpPort;
bind_addr = String.format("tcp://*:%d", port);
publisher.bind(bind_addr);
publisher = bindSocket(port);
}
return publisher;
}
private ZMQ.Socket bindSocket(int port) {
ZMQ.Socket socket;
try {
socket = context.socket(ZMQ.PUB);
bind_addr = String.format("tcp://*:%d", port);
socket.bind(bind_addr);
} catch (ZMQException e) {
LOGGER.log(Level.SEVERE,
"Unable to bind ZMQ PUB socket. " + e.toString(), e);
socket = null;
}
return socket;
}
@Override
public void onCompleted(Run build, TaskListener listener) {
String event = "onCompleted";
String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener);
if (json != null) {
bindSocket(build);
event = event + " " + json;
publisher.send(event.getBytes(), 0);
}
sendEvent(build, event, json);
}
/*
/* Currently not emitting onDeleted events. This should be fixed.
@Override
public void onDeleted(Run build) {
String update = String.format("onDeleted");
@ -92,21 +115,29 @@ public class RunListenerImpl extends RunListener<Run> {
public void onFinalized(Run build) {
String event = "onFinalized";
String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL);
if (json != null) {
bindSocket(build);
event = event + " " + json;
publisher.send(event.getBytes(), 0);
}
sendEvent(build, event, json);
}
@Override
public void onStarted(Run build, TaskListener listener) {
String event = "onStarted";
String json = Phase.STARTED.handlePhase(build, getStatus(build), listener);
sendEvent(build, event, json);
}
private void sendEvent(Run build, String event, String json) {
ZMQ.Socket socket;
if (json != null) {
bindSocket(build);
event = event + " " + json;
publisher.send(event.getBytes(), 0);
socket = bindSocket(build);
if (socket != null) {
event = event + " " + json;
try {
socket.send(event.getBytes(), 0);
} catch (ZMQException e) {
LOGGER.log(Level.INFO,
"Unable to send event. " + e.toString(), e);
}
}
}
}