From 28d5017b4aa5e05250c89ac1c6cabac3066c840c Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Fri, 25 Jan 2013 10:56:13 -0800 Subject: [PATCH] Check for and handle ZMQ errors. Log ZMQ errors and deal with them by creating a new ZMQ PUB socket in most situations. --- .../ZMQEventPublisher/RunListenerImpl.java | 77 +++++++++++++------ 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java b/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java index ca717d7..9e29dd0 100644 --- a/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java +++ b/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java @@ -24,13 +24,19 @@ import hudson.model.Run; import hudson.model.TaskListener; import hudson.model.listeners.RunListener; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.zeromq.ZMQ; +import org.zeromq.ZMQException; /* * Listener to publish Jenkins build events through ZMQ */ @Extension public class RunListenerImpl extends RunListener { + public static final Logger LOGGER = Logger.getLogger(RunListenerImpl.class.getName()); + private int port; private String bind_addr; private ZMQ.Context context; @@ -50,36 +56,53 @@ public class RunListenerImpl extends RunListener { return 8888; } - private void bindSocket(Run build) { + private ZMQ.Socket bindSocket(Run build) { int tmpPort = getPort(build); if (publisher == null) { port = tmpPort; - publisher = context.socket(ZMQ.PUB); - bind_addr = String.format("tcp://*:%d", port); - publisher.bind(bind_addr); + LOGGER.log(Level.INFO, + String.format("Binding ZMQ PUB to port %d", port)); + publisher = bindSocket(port); } else if (tmpPort != port) { - publisher.unbind(bind_addr); - publisher.close(); - publisher = context.socket(ZMQ.PUB); + LOGGER.log(Level.INFO, + String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort)); + try { + publisher.unbind(bind_addr); + publisher.close(); + } catch (ZMQException e) { + /* Let the garbage collector sort out cleanup */ + LOGGER.log(Level.INFO, + "Unable to close ZMQ PUB socket. " + e.toString(), e); + } port = tmpPort; - bind_addr = String.format("tcp://*:%d", port); - publisher.bind(bind_addr); + publisher = bindSocket(port); } + return publisher; + } + + private ZMQ.Socket bindSocket(int port) { + ZMQ.Socket socket; + try { + socket = context.socket(ZMQ.PUB); + bind_addr = String.format("tcp://*:%d", port); + socket.bind(bind_addr); + } catch (ZMQException e) { + LOGGER.log(Level.SEVERE, + "Unable to bind ZMQ PUB socket. " + e.toString(), e); + socket = null; + } + return socket; } @Override public void onCompleted(Run build, TaskListener listener) { String event = "onCompleted"; String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener); - if (json != null) { - bindSocket(build); - event = event + " " + json; - publisher.send(event.getBytes(), 0); - } + sendEvent(build, event, json); } - /* + /* Currently not emitting onDeleted events. This should be fixed. @Override public void onDeleted(Run build) { String update = String.format("onDeleted"); @@ -92,21 +115,29 @@ public class RunListenerImpl extends RunListener { public void onFinalized(Run build) { String event = "onFinalized"; String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL); - if (json != null) { - bindSocket(build); - event = event + " " + json; - publisher.send(event.getBytes(), 0); - } + sendEvent(build, event, json); } @Override public void onStarted(Run build, TaskListener listener) { String event = "onStarted"; String json = Phase.STARTED.handlePhase(build, getStatus(build), listener); + sendEvent(build, event, json); + } + + private void sendEvent(Run build, String event, String json) { + ZMQ.Socket socket; if (json != null) { - bindSocket(build); - event = event + " " + json; - publisher.send(event.getBytes(), 0); + socket = bindSocket(build); + if (socket != null) { + event = event + " " + json; + try { + socket.send(event.getBytes(), 0); + } catch (ZMQException e) { + LOGGER.log(Level.INFO, + "Unable to send event. " + e.toString(), e); + } + } } }