Check for and handle ZMQ errors.
Log ZMQ errors and deal with them by creating a new ZMQ PUB socket in most situations.
This commit is contained in:
parent
f92be91eb0
commit
28d5017b4a
|
@ -24,13 +24,19 @@ import hudson.model.Run;
|
||||||
import hudson.model.TaskListener;
|
import hudson.model.TaskListener;
|
||||||
import hudson.model.listeners.RunListener;
|
import hudson.model.listeners.RunListener;
|
||||||
|
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import org.zeromq.ZMQ;
|
import org.zeromq.ZMQ;
|
||||||
|
import org.zeromq.ZMQException;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Listener to publish Jenkins build events through ZMQ
|
* Listener to publish Jenkins build events through ZMQ
|
||||||
*/
|
*/
|
||||||
@Extension
|
@Extension
|
||||||
public class RunListenerImpl extends RunListener<Run> {
|
public class RunListenerImpl extends RunListener<Run> {
|
||||||
|
public static final Logger LOGGER = Logger.getLogger(RunListenerImpl.class.getName());
|
||||||
|
|
||||||
private int port;
|
private int port;
|
||||||
private String bind_addr;
|
private String bind_addr;
|
||||||
private ZMQ.Context context;
|
private ZMQ.Context context;
|
||||||
|
@ -50,36 +56,53 @@ public class RunListenerImpl extends RunListener<Run> {
|
||||||
return 8888;
|
return 8888;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void bindSocket(Run build) {
|
private ZMQ.Socket bindSocket(Run build) {
|
||||||
int tmpPort = getPort(build);
|
int tmpPort = getPort(build);
|
||||||
if (publisher == null) {
|
if (publisher == null) {
|
||||||
port = tmpPort;
|
port = tmpPort;
|
||||||
publisher = context.socket(ZMQ.PUB);
|
LOGGER.log(Level.INFO,
|
||||||
bind_addr = String.format("tcp://*:%d", port);
|
String.format("Binding ZMQ PUB to port %d", port));
|
||||||
publisher.bind(bind_addr);
|
publisher = bindSocket(port);
|
||||||
}
|
}
|
||||||
else if (tmpPort != port) {
|
else if (tmpPort != port) {
|
||||||
publisher.unbind(bind_addr);
|
LOGGER.log(Level.INFO,
|
||||||
publisher.close();
|
String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort));
|
||||||
publisher = context.socket(ZMQ.PUB);
|
try {
|
||||||
|
publisher.unbind(bind_addr);
|
||||||
|
publisher.close();
|
||||||
|
} catch (ZMQException e) {
|
||||||
|
/* Let the garbage collector sort out cleanup */
|
||||||
|
LOGGER.log(Level.INFO,
|
||||||
|
"Unable to close ZMQ PUB socket. " + e.toString(), e);
|
||||||
|
}
|
||||||
port = tmpPort;
|
port = tmpPort;
|
||||||
bind_addr = String.format("tcp://*:%d", port);
|
publisher = bindSocket(port);
|
||||||
publisher.bind(bind_addr);
|
|
||||||
}
|
}
|
||||||
|
return publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ZMQ.Socket bindSocket(int port) {
|
||||||
|
ZMQ.Socket socket;
|
||||||
|
try {
|
||||||
|
socket = context.socket(ZMQ.PUB);
|
||||||
|
bind_addr = String.format("tcp://*:%d", port);
|
||||||
|
socket.bind(bind_addr);
|
||||||
|
} catch (ZMQException e) {
|
||||||
|
LOGGER.log(Level.SEVERE,
|
||||||
|
"Unable to bind ZMQ PUB socket. " + e.toString(), e);
|
||||||
|
socket = null;
|
||||||
|
}
|
||||||
|
return socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onCompleted(Run build, TaskListener listener) {
|
public void onCompleted(Run build, TaskListener listener) {
|
||||||
String event = "onCompleted";
|
String event = "onCompleted";
|
||||||
String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener);
|
String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener);
|
||||||
if (json != null) {
|
sendEvent(build, event, json);
|
||||||
bindSocket(build);
|
|
||||||
event = event + " " + json;
|
|
||||||
publisher.send(event.getBytes(), 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/* Currently not emitting onDeleted events. This should be fixed.
|
||||||
@Override
|
@Override
|
||||||
public void onDeleted(Run build) {
|
public void onDeleted(Run build) {
|
||||||
String update = String.format("onDeleted");
|
String update = String.format("onDeleted");
|
||||||
|
@ -92,21 +115,29 @@ public class RunListenerImpl extends RunListener<Run> {
|
||||||
public void onFinalized(Run build) {
|
public void onFinalized(Run build) {
|
||||||
String event = "onFinalized";
|
String event = "onFinalized";
|
||||||
String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL);
|
String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL);
|
||||||
if (json != null) {
|
sendEvent(build, event, json);
|
||||||
bindSocket(build);
|
|
||||||
event = event + " " + json;
|
|
||||||
publisher.send(event.getBytes(), 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStarted(Run build, TaskListener listener) {
|
public void onStarted(Run build, TaskListener listener) {
|
||||||
String event = "onStarted";
|
String event = "onStarted";
|
||||||
String json = Phase.STARTED.handlePhase(build, getStatus(build), listener);
|
String json = Phase.STARTED.handlePhase(build, getStatus(build), listener);
|
||||||
|
sendEvent(build, event, json);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendEvent(Run build, String event, String json) {
|
||||||
|
ZMQ.Socket socket;
|
||||||
if (json != null) {
|
if (json != null) {
|
||||||
bindSocket(build);
|
socket = bindSocket(build);
|
||||||
event = event + " " + json;
|
if (socket != null) {
|
||||||
publisher.send(event.getBytes(), 0);
|
event = event + " " + json;
|
||||||
|
try {
|
||||||
|
socket.send(event.getBytes(), 0);
|
||||||
|
} catch (ZMQException e) {
|
||||||
|
LOGGER.log(Level.INFO,
|
||||||
|
"Unable to send event. " + e.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue