summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClark Boylan <clark.boylan@gmail.com>2013-01-31 10:59:53 -0800
committerClark Boylan <clark.boylan@gmail.com>2013-01-31 10:59:53 -0800
commit135d2739635415e2f633a4b70d63f803e193ab88 (patch)
tree4d0584863edeb82ef176cb1e5ed0f22b48e3aa3b
parent7d54898d4e0023d752c4f0b8bc11112643f46c74 (diff)
Use single thread to perform all zmq sends.
jeromq does not appear to be thread safe. Use a single thread to call send on the ZMQ socket to avoid contention for those resources. Have the RunListener pass events to the ZMQ sender thread with a BlockingQueue. Do not block when offering events to that queue to avoid starvation/deadlock in the Jenkins job runners. Events may potentially be lost if ZMQ cannot keep up.
-rw-r--r--README7
-rw-r--r--src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java101
-rw-r--r--src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/ZMQRunnable.java113
3 files changed, 144 insertions, 77 deletions
diff --git a/README b/README
index 22252e8..752b6c7 100644
--- a/README
+++ b/README
@@ -12,8 +12,11 @@ history and you will find the old versions of this plugin that
12depended on jzmq. 12depended on jzmq.
13 13
14TODO: 14TODO:
15Avoid reading in the global config for each event if possible. 15- Avoid reading in the global config for each event if possible.
16Cleanup config.jelly for the non global Job config. 16- Need to allow ZMQRunnable thread to die if something truly
17 unexpected happens. The RunListener should then start a new
18 DaemonThread to handle further events.
19- Cleanup config.jelly for the non global Job config.
17 20
18This plugin borrows heavily from the Jenkins Notification Plugin 21This plugin borrows heavily from the Jenkins Notification Plugin
19https://github.com/jenkinsci/notification-plugin. That plugin 22https://github.com/jenkinsci/notification-plugin. That plugin
diff --git a/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java b/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java
index c71d48e..5ac462f 100644
--- a/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java
+++ b/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/RunListenerImpl.java
@@ -18,90 +18,44 @@
18package org.jenkinsci.plugins.ZMQEventPublisher; 18package org.jenkinsci.plugins.ZMQEventPublisher;
19 19
20import hudson.Extension; 20import hudson.Extension;
21import hudson.EnvVars;
22import hudson.model.Hudson;
23import hudson.model.Result; 21import hudson.model.Result;
24import hudson.model.Run; 22import hudson.model.Run;
25import hudson.model.TaskListener; 23import hudson.model.TaskListener;
26import hudson.model.listeners.RunListener; 24import hudson.model.listeners.RunListener;
25import hudson.util.DaemonThreadFactory;
27 26
27import java.util.concurrent.LinkedBlockingQueue;
28import java.util.logging.Level; 28import java.util.logging.Level;
29import java.util.logging.Logger; 29import java.util.logging.Logger;
30 30
31import org.jeromq.ZMQ;
32import org.jeromq.ZMQException;
33
34/* 31/*
35 * Listener to publish Jenkins build events through ZMQ 32 * Listener to publish Jenkins build events through ZMQ
36 */ 33 */
37@Extension 34@Extension
38public class RunListenerImpl extends RunListener<Run> { 35public class RunListenerImpl extends RunListener<Run> {
39 public static final Logger LOGGER = Logger.getLogger(RunListenerImpl.class.getName()); 36 public static final Logger LOGGER =
40 37 Logger.getLogger(RunListenerImpl.class.getName());
41 private int port; 38 private final LinkedBlockingQueue<String> queue =
42 private String bind_addr; 39 new LinkedBlockingQueue<String>(queueLength);
43 private ZMQ.Context context; 40 // ZMQ has a high water mark of 1000 events.
44 private ZMQ.Socket publisher; 41 private static final int queueLength = 1024;
42 private static final DaemonThreadFactory threadFactory =
43 new DaemonThreadFactory();
44 private ZMQRunnable ZMQRunner;
45 private Thread thread;
45 46
46 public RunListenerImpl() { 47 public RunListenerImpl() {
47 super(Run.class); 48 super(Run.class);
48 context = ZMQ.context(1); 49 ZMQRunner = new ZMQRunnable(queue);
49 } 50 thread = threadFactory.newThread(ZMQRunner);
50 51 thread.start();
51 private int getPort(Run build) {
52 Hudson hudson = Hudson.getInstance();
53 HudsonNotificationProperty.HudsonNotificationPropertyDescriptor globalProperty =
54 (HudsonNotificationProperty.HudsonNotificationPropertyDescriptor)
55 hudson.getDescriptor(HudsonNotificationProperty.class);
56 if (globalProperty != null) {
57 return globalProperty.getPort();
58 }
59 return 8888;
60 }
61
62 private ZMQ.Socket bindSocket(Run build) {
63 int tmpPort = getPort(build);
64 if (publisher == null) {
65 port = tmpPort;
66 LOGGER.log(Level.INFO,
67 String.format("Binding ZMQ PUB to port %d", port));
68 publisher = bindSocket(port);
69 }
70 else if (tmpPort != port) {
71 LOGGER.log(Level.INFO,
72 String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort));
73 try {
74 publisher.close();
75 } catch (ZMQException e) {
76 /* Let the garbage collector sort out cleanup */
77 LOGGER.log(Level.INFO,
78 "Unable to close ZMQ PUB socket. " + e.toString(), e);
79 }
80 port = tmpPort;
81 publisher = bindSocket(port);
82 }
83 return publisher;
84 }
85
86 private ZMQ.Socket bindSocket(int port) {
87 ZMQ.Socket socket;
88 try {
89 socket = context.socket(ZMQ.PUB);
90 bind_addr = String.format("tcp://*:%d", port);
91 socket.bind(bind_addr);
92 } catch (ZMQException e) {
93 LOGGER.log(Level.SEVERE,
94 "Unable to bind ZMQ PUB socket. " + e.toString(), e);
95 socket = null;
96 }
97 return socket;
98 } 52 }
99 53
100 @Override 54 @Override
101 public void onCompleted(Run build, TaskListener listener) { 55 public void onCompleted(Run build, TaskListener listener) {
102 String event = "onCompleted"; 56 String event = "onCompleted";
103 String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener); 57 String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener);
104 sendEvent(build, event, json); 58 sendEvent(event, json);
105 } 59 }
106 60
107 /* Currently not emitting onDeleted events. This should be fixed. 61 /* Currently not emitting onDeleted events. This should be fixed.
@@ -117,28 +71,25 @@ public class RunListenerImpl extends RunListener<Run> {
117 public void onFinalized(Run build) { 71 public void onFinalized(Run build) {
118 String event = "onFinalized"; 72 String event = "onFinalized";
119 String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL); 73 String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL);
120 sendEvent(build, event, json); 74 sendEvent(event, json);
121 } 75 }
122 76
123 @Override 77 @Override
124 public void onStarted(Run build, TaskListener listener) { 78 public void onStarted(Run build, TaskListener listener) {
125 String event = "onStarted"; 79 String event = "onStarted";
126 String json = Phase.STARTED.handlePhase(build, getStatus(build), listener); 80 String json = Phase.STARTED.handlePhase(build, getStatus(build), listener);
127 sendEvent(build, event, json); 81 sendEvent(event, json);
128 } 82 }
129 83
130 private void sendEvent(Run build, String event, String json) { 84 private void sendEvent(String event, String json) {
131 ZMQ.Socket socket;
132 if (json != null) { 85 if (json != null) {
133 socket = bindSocket(build); 86 event = event + " " + json;
134 if (socket != null) { 87 // Offer the event. If the queue is full this will not block.
135 event = event + " " + json; 88 // We may drop events but this should prevent starvation in
136 try { 89 // the calling Jenkins threads.
137 socket.send(event.getBytes(), 0); 90 if (!queue.offer(event)) {
138 } catch (ZMQException e) { 91 LOGGER.log(Level.INFO,
139 LOGGER.log(Level.INFO, 92 "Unable to add event to ZMQ queue.");
140 "Unable to send event. " + e.toString(), e);
141 }
142 } 93 }
143 } 94 }
144 } 95 }
diff --git a/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/ZMQRunnable.java b/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/ZMQRunnable.java
new file mode 100644
index 0000000..9cfe64d
--- /dev/null
+++ b/src/main/java/org/jenkinsci/plugins/ZMQEventPublisher/ZMQRunnable.java
@@ -0,0 +1,113 @@
1/*
2 * Copyright 2013 Hewlett-Packard Development Company, L.P.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may
5 * not use this file except in compliance with the License. You may obtain
6 * a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16
17package org.jenkinsci.plugins.ZMQEventPublisher;
18
19import hudson.model.Hudson;
20
21import java.util.concurrent.LinkedBlockingQueue;
22import java.util.logging.Level;
23import java.util.logging.Logger;
24
25import org.jeromq.ZMQ;
26import org.jeromq.ZMQException;
27
28public class ZMQRunnable implements Runnable {
29 public static final Logger LOGGER = Logger.getLogger(ZMQRunnable.class.getName());
30
31 private static final String bind_addr = "tcp://*:%d";
32 private int port;
33
34 private final LinkedBlockingQueue<String> queue;
35 private final ZMQ.Context context;
36 private ZMQ.Socket publisher;
37
38 public ZMQRunnable(LinkedBlockingQueue<String> queue) {
39 this.queue = queue;
40 context = ZMQ.context(1);
41 bindSocket();
42 }
43
44 private int getPort() {
45 Hudson hudson = Hudson.getInstance();
46 HudsonNotificationProperty.HudsonNotificationPropertyDescriptor globalProperty =
47 (HudsonNotificationProperty.HudsonNotificationPropertyDescriptor)
48 hudson.getDescriptor(HudsonNotificationProperty.class);
49 if (globalProperty != null) {
50 return globalProperty.getPort();
51 }
52 return 8888;
53 }
54
55 private void bindSocket() {
56 int tmpPort = getPort();
57 if (publisher == null) {
58 port = tmpPort;
59 LOGGER.log(Level.INFO,
60 String.format("Binding ZMQ PUB to port %d", port));
61 publisher = bindSocket(port);
62 }
63 else if (tmpPort != port) {
64 LOGGER.log(Level.INFO,
65 String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort));
66 try {
67 publisher.close();
68 } catch (ZMQException e) {
69 /* Let the garbage collector sort out cleanup */
70 LOGGER.log(Level.INFO,
71 "Unable to close ZMQ PUB socket. " + e.toString(), e);
72 }
73 port = tmpPort;
74 publisher = bindSocket(port);
75 }
76 }
77
78 private ZMQ.Socket bindSocket(int port) {
79 ZMQ.Socket socket;
80 try {
81 socket = context.socket(ZMQ.PUB);
82 socket.bind(String.format(bind_addr, port));
83 } catch (ZMQException e) {
84 LOGGER.log(Level.SEVERE,
85 "Unable to bind ZMQ PUB socket. " + e.toString(), e);
86 socket = null;
87 }
88 return socket;
89 }
90
91 public void run() {
92 String event;
93 while(true) {
94 try {
95 event = queue.take();;
96 bindSocket();
97 if (publisher != null) {
98 try {
99 publisher.send(event.getBytes(), 0);
100 } catch (ZMQException e) {
101 LOGGER.log(Level.INFO,
102 "Unable to send event. " + e.toString(), e);
103 }
104 }
105 }
106 // Catch all exceptions so that this thread does not die.
107 catch (Exception e) {
108 LOGGER.log(Level.SEVERE,
109 "Unhandled exception publishing ZMQ events " + e.toString(), e);
110 }
111 }
112 }
113}