Added mon-messaging sub-project

This commit is contained in:
Jonathan Halterman 2014-02-19 16:02:05 -08:00
parent 3245488838
commit 4e65e3f30a
23 changed files with 1138 additions and 0 deletions

View File

@ -11,6 +11,8 @@
* [mon-collectd](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-collectd) - Utilities for working with Collectd data.
* [mon-dropwizard](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-dropwizard) - Utilities for building and testing dropwizard services.
* [mon-http](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-http) - HTTP related infrastructure and utilities.
* [mon-messaging](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-messaging) - [EIP](http://www.eaipatterns.com/) inspired messaging patterns and implementations.
* [mon-persistence](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-persistence) - Persistence related infrastructure and utilities.
* [mon-service](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-service) - Simple service abstractions and utilities.
* [mon-testing](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-testing) - A set of testing related dependencies.
* [mon-util](https://git.hpcloud.net/mon/mon-common/tree/master/java/mon-util) - Various utilities such as for serialization, dependency injection, date and time, invocation retries, concurrency, etc.

1
java/mon-messaging/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

View File

@ -0,0 +1,69 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.hpcloud</groupId>
<artifactId>mon-common</artifactId>
<version>${computedVersion}</version>
</parent>
<artifactId>mon-messaging</artifactId>
<packaging>jar</packaging>
<properties>
<metrics.version>3.0.1</metrics.version>
</properties>
<dependencies>
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>mon-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>mon-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-annotation</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>mon-testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,53 @@
package com.hpcloud.messaging;
/**
* Adapts messages sent and received via a channel.
*
* @param <I> inbound message data type
* @param <O> outbound message data type
* @see <a href="http://eaipatterns.com/ChannelAdapter.html">Channel Adapter</a>
* @author Jonathan Halterman
*/
public interface ChannelAdapter<I, O> {
/**
* Returns an adapted version of the inbound {@code messageBody} for the {@code topic}. Returns
* {@code messageBody} if no translator is registered for the {@code topic} or if translation
* fails.
*/
I adaptInboundMessage(I messageBody, String topic);
/**
* Returns an adapted version of the outbound {@code messageBody} for the {@code topic}. Returns
* {@code messageBody} if no translator is registered for the {@code topic} or if translation
* fails.
*/
O adaptOutboundMessage(O messageBody, String topic);
/**
* Returns the channel adapter's inbound data type.
*/
Class<I> inboundType();
/**
* Returns the channel adapter's outbound data type.
*/
Class<O> outboundType();
/**
* Sets the inbound {@code translator} for the {@code topic}.
*
* @throws NullPointerException if {@code topic} is null
* @throws IllegalArgumentException if {@code translator}'s input and output types do not match
* {@code <I>}
*/
void registerInboundTranslator(MessageTranslator<I, I> translator, String topic);
/**
* Sets the outbound {@code translator} for the {@code topic}.
*
* @throws NullPointerException if {@code topic} is null
* @throws IllegalArgumentException if {@code translator}'s input and output types do not match
* {@code <O>}
*/
void registerOutboundTranslator(MessageTranslator<O, O> translator, String topic);
}

View File

@ -0,0 +1,74 @@
package com.hpcloud.messaging;
import com.hpcloud.service.Service;
/**
* Message bus. Acts as a registry for channels.
*
* @see <a href="http://eaipatterns.com/MessageBus.html">Message Bus</a>
* @author Jonathan Halterman
*/
public interface MessageBus extends Service {
/**
* Binds the {@code channel} to the bus according to the {@link MessageChannel#name()}.
*
* @throws NullPointerException if {@code channel} is null
* @throws IllegalStateException if a local MessageChannel is already bound
*/
void bind(MessageChannel channel);
/**
* Returns the bound MessageChannel for the {@code address} else {@code null} if none has been
* bound.
*
* @throws NullPointerException if {@code address} is null
*/
MessageChannel channelFor(String address);
/**
* Publishes the {@code message} to all handlers that are registered for the {@code message}.
*
* @throws NullPointerException if {@code message} is null
* @throws IllegalStateException if the bus has not been started
*/
void publish(Object message);
/**
* Publishes the {@code message} to all registered handlers for the {@code address}.
*
* @throws NullPointerException if {@code message} or {@code address} are null
* @throws IllegalStateException if the bus has not been started
*/
void publish(Object message, String address);
/**
* Registers the {@code handler} to handle messages for handler's class name.
*
* @throws NullPointerException if {@code handler} is null
* @throws IllegalStateException if no message type can be resolved for the {@code handler}
*/
void register(MessageHandler<?> handler);
/**
* Registers the {@code handler} for the {@code address} with the bus.
*
* @throws NullPointerException if {@code handler} or {@code address} are null
* @throws IllegalStateException if no {@link ChannelAdapter} is registered for the
* {@code address}
*/
void register(MessageHandler<?> handler, String address);
/**
* Unregisters the {@code handler} for the handler's class name.
*
* @throws NullPointerException if {@code handler} is null
*/
void unregister(MessageHandler<?> handler);
/**
* Unregisters the {@code handler} for the {@code address} from the bus.
*
* @throws NullPointerException if {@code handler} or {@code address} are null
*/
void unregister(MessageHandler<?> handler, String address);
}

View File

@ -0,0 +1,7 @@
package com.hpcloud.messaging;
/**
* Message bus configuration.
*/
public class MessageBusConfiguration {
}

View File

@ -0,0 +1,45 @@
package com.hpcloud.messaging;
/**
* A messaging channel.
*
* @author Jonathan Halterman
*/
public interface MessageChannel {
/**
* Returns the channel's message adapter.
*/
ChannelAdapter<?, ?> adapter();
/**
* Binds the dispatcher to the channel.
*
* @throws NullPointerException if the {@code dispatcher} is null
*/
void bind(MessageDispatcher dispatcher);
/**
* Close the channel.
*/
void close();
/**
* Returns the name of channel.
*/
String name();
/**
* Opens the channel.
*
* @throws ChannelOpenException if the channel cannot be opened
*/
void open();
/**
* Sends the {@code message} to the {@code address}.
*
* @throws NullPointerException if the {@code message} or {@code address} are null
* @throws IllegalStateException if the channel is not open
*/
void send(Object message, String address);
}

View File

@ -0,0 +1,19 @@
package com.hpcloud.messaging;
import com.hpcloud.service.Service;
/**
* Dispatches messages to handlers.
*
* @author Jonathan Halterman
*/
public interface MessageDispatcher extends Service {
/**
* Dispatches the {@code message} to the {@code handler}.
*
* @param <T> message type
* @param message to dispatch
* @param handler to handle message
*/
<T> void dispatch(T message, MessageHandler<T> handler);
}

View File

@ -0,0 +1,14 @@
package com.hpcloud.messaging;
/**
* Handles messages of type {@code <T>}.
*
* @param <T> message type
* @author Jonathan Halterman
*/
public interface MessageHandler<T> {
/**
* Handles the {@code message}.
*/
void handle(T message);
}

View File

@ -0,0 +1,47 @@
package com.hpcloud.messaging;
import net.jodah.typetools.TypeResolver;
import net.jodah.typetools.TypeResolver.Unknown;
import com.google.common.base.Preconditions;
/**
* Translates a message, breaking the dependency between separate application's data formats.
*
* @param <I> input type
* @param <O> output type
* @see <a href="http://www.eaipatterns.com/MessageTranslator.html">Message Translator</a>
* @author Jonathan Halterman
*/
public abstract class MessageTranslator<I, O> {
protected final Class<I> inputType;
protected final Class<O> outputType;
protected MessageTranslator(Class<I> inputType, Class<O> outputType) {
this.inputType = inputType;
this.outputType = outputType;
}
@SuppressWarnings("unchecked")
public MessageTranslator() {
Class<?>[] typeArguments = TypeResolver.resolveRawArguments(MessageTranslator.class, getClass());
Preconditions.checkArgument(typeArguments[0] != Unknown.class
&& typeArguments[1] != Unknown.class,
"Must declare input type argument <I> and output type argument <O> for Translator");
inputType = (Class<I>) typeArguments[0];
outputType = (Class<O>) typeArguments[1];
}
/** Returns the translator's input type, else {@link Unknown} if one cannot be resolved. */
public Class<I> inputType() {
return inputType;
}
/** Returns the translator's output type, else {@link Unknown} if one cannot be resolved. */
public Class<O> outputType() {
return outputType;
}
/** Translates the {@code messageBody} to an output format. */
public abstract O translate(I messageBody);
}

View File

@ -0,0 +1,24 @@
package com.hpcloud.messaging;
/**
* Publishes messages to and subscribes to messages from topics.
*
* @author Jonathan Halterman
*/
public interface PublishSubscribeChannel extends MessageChannel {
/**
* Subscribes the {@code subscriber} to the {@code topic}.
*
* @throws NullPointerException if the {@code subscriber} or {@code topic} are null
* @throws IllegalStateException if the {@code subscriber} is already subscribed to the
* {@code topic}
*/
void subscribe(MessageHandler<?> subscriber, String topic);
/**
* Unsubscribes the {@code subscriber} from the {@code topic}.
*
* @throws NullPointerException if the {@code subscriber} or {@code topic} are null
*/
void unsubscribe(MessageHandler<?> subscriber, String topic);
}

View File

@ -0,0 +1,175 @@
package com.hpcloud.messaging;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import javax.inject.Singleton;
import net.jodah.typetools.TypeResolver;
import net.jodah.typetools.TypeResolver.Unknown;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.hpcloud.messaging.internal.LocalChannel;
import com.hpcloud.messaging.internal.ThreadPoolingMessageDispatcher;
import com.hpcloud.service.ManagedService;
/**
* Standard message bus implementation.
*
* @author Jonathan Halterman
*/
@Singleton
@ThreadSafe
public class StandardMessageBus extends ManagedService implements MessageBus {
private static final Logger LOG = LoggerFactory.getLogger(StandardMessageBus.class);
@SuppressWarnings("unused") private final MessageBusConfiguration config;
private final Map<String, MessageChannel> channels = new ConcurrentHashMap<String, MessageChannel>();
private final MessageDispatcher dispatcher;
private final Meter messagesSent;
/** Used to send local messages that lack a fully qualified address. */
private MessageChannel localChannel = new LocalChannel();
@Inject
public StandardMessageBus(MessageBusConfiguration config, MetricRegistry metricRegistry) {
this(config, new ThreadPoolingMessageDispatcher(), metricRegistry);
}
public StandardMessageBus(MessageBusConfiguration config, MessageDispatcher dispatcher,
MetricRegistry metricRegistry) {
super("Message Bus");
Preconditions.checkNotNull(config, "config");
Preconditions.checkNotNull(dispatcher, "dispatcher");
this.config = config;
this.dispatcher = dispatcher;
localChannel.bind(dispatcher);
messagesSent = metricRegistry.meter(getClass().getName() + ".messages.sent");
}
static String prefixFor(String address) {
int index = address.indexOf("://");
return index == -1 ? null : address.substring(0, index);
}
static String topicFor(String address) {
int index = address.indexOf("://");
String result = index == -1 ? address : address.substring(index + 3);
return result == null ? null : "".equals(result) ? null : result;
}
@Override
public void bind(MessageChannel channel) {
Preconditions.checkNotNull(channel, "channel");
if (channel.name() == null) {
Preconditions.checkState(localChannel instanceof LocalChannel,
"A local channel adapter has already been registered");
localChannel = channel;
} else {
Preconditions.checkState(!channels.containsKey(channel.name()),
"A channel adapter has already been registered for %s", channel.name());
channels.put(channel.name(), channel);
}
channel.bind(dispatcher);
}
@Override
public MessageChannel channelFor(String address) {
Preconditions.checkNotNull(address, "address");
String prefix = prefixFor(address);
return prefix == null ? localChannel : channels.get(prefix);
}
@Override
public void publish(Object message) {
checkIsRunning();
Preconditions.checkNotNull(message, "message");
localChannel.send(message, message.getClass().getName());
messagesSent.mark();
}
@Override
public void publish(Object message, String address) {
checkIsRunning();
Preconditions.checkNotNull(address, "address");
Preconditions.checkNotNull(message, "message");
MessageChannel channel = channelFor(address);
if (channel != null) {
channel.send(message, topicFor(address));
messagesSent.mark();
}
}
@Override
public void register(MessageHandler<?> handler) {
Preconditions.checkNotNull(handler, "handler");
Class<?> messageType = TypeResolver.resolveRawArgument(MessageHandler.class, handler.getClass());
Preconditions.checkArgument(!messageType.equals(Unknown.class),
"Must declare message type argument <T> for MessageHandler");
register(handler, messageType.getName());
}
@Override
public void register(MessageHandler<?> handler, String address) {
Preconditions.checkNotNull(address);
Preconditions.checkNotNull(handler);
MessageChannel channel = channelFor(address);
Preconditions.checkState(channel != null && channel instanceof PublishSubscribeChannel,
"No channel adapter has been registered for %s", address);
((PublishSubscribeChannel) channel).subscribe(handler, address);
}
@Override
public void start() throws Exception {
LOG.info("Starting message bus");
dispatcher.start();
new Thread("channel-open-thread") {
public void run() {
/**
* Decouples starting the bus from the caller's lifecycle, so that stops are not blocked by
* starts that may still be running if the caller synchronizes operations against the bus as
* dropwizard/jetty does against managed services.
*/
localChannel.open();
for (MessageChannel channel : channels.values())
channel.open();
}
}.start();
super.start();
}
@Override
public void stop() throws Exception {
if (!isRunning())
return;
LOG.info("Stopping message bus");
super.stop();
localChannel.close();
for (MessageChannel channel : channels.values())
channel.close();
dispatcher.stop();
}
@Override
public void unregister(MessageHandler<?> handler) {
Preconditions.checkNotNull(handler, "handler");
unregister(handler, handler.getClass().getName());
}
@Override
public void unregister(MessageHandler<?> handler, String address) {
Preconditions.checkNotNull(address);
Preconditions.checkNotNull(handler);
MessageChannel channel = channelFor(address);
Preconditions.checkState(channel != null && channel instanceof PublishSubscribeChannel,
"No channel has been registered for %s", address);
((PublishSubscribeChannel) channel).unsubscribe(handler, address);
}
}

View File

@ -0,0 +1,46 @@
package com.hpcloud.messaging.internal;
import com.google.common.base.Preconditions;
import com.hpcloud.messaging.ChannelAdapter;
import com.hpcloud.messaging.MessageChannel;
import com.hpcloud.messaging.MessageDispatcher;
/**
* Base message channel implementation.
*
* @param <I> inbound message data type
* @param <O> outbound message data type
* @author Jonathan Halterman
*/
public abstract class AbstractMessageChannel<I, O> implements MessageChannel {
protected final String name;
protected final String qualifiedName;
protected final ChannelAdapter<I, O> adapter;
protected MessageDispatcher dispatcher;
protected AbstractMessageChannel() {
this(null, null);
}
protected AbstractMessageChannel(String name, ChannelAdapter<I, O> adapter) {
this.name = name;
this.adapter = adapter;
qualifiedName = name == null ? "" : name + "://";
}
@Override
public ChannelAdapter<I, O> adapter() {
return adapter;
}
@Override
public void bind(MessageDispatcher dispatcher) {
Preconditions.checkNotNull(dispatcher, "dispatcher");
this.dispatcher = dispatcher;
}
@Override
public String name() {
return name;
}
}

View File

@ -0,0 +1,106 @@
package com.hpcloud.messaging.internal;
import java.util.Collection;
import javax.annotation.concurrent.ThreadSafe;
import net.jodah.typetools.TypeResolver;
import net.jodah.typetools.TypeResolver.Unknown;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.hpcloud.messaging.ChannelAdapter;
import com.hpcloud.messaging.MessageHandler;
import com.hpcloud.messaging.PublishSubscribeChannel;
import com.hpcloud.util.Serialization;
/**
* Base publish/subscribe channel adapter implementation.
*
* @author Jonathan Halterman
*/
@ThreadSafe
public abstract class AbstractPublishSubscribeChannel<I, O> extends AbstractMessageChannel<I, O>
implements PublishSubscribeChannel {
protected final Multimap<String, MessageHandler<?>> subscribers = Multimaps.synchronizedMultimap(HashMultimap.<String, MessageHandler<?>>create());
protected final Meter messagesReceived;
public AbstractPublishSubscribeChannel() {
this(null, null, new MetricRegistry());
}
public AbstractPublishSubscribeChannel(String name, ChannelAdapter<I, O> adapter,
MetricRegistry metricRegistry) {
super(name, adapter);
messagesReceived = metricRegistry.meter(getClass().getName() + ".messages.received");
metricRegistry.register(getClass().getName() + ".subscribers", new Gauge<Integer>() {
@Override
public Integer getValue() {
return subscribers.size();
}
});
}
@Override
public void close() {
}
@Override
public void open() {
}
@Override
public void subscribe(MessageHandler<?> subscriber, String topic) {
Preconditions.checkNotNull(subscriber, "subscriber");
Preconditions.checkNotNull(topic, "topic");
Class<?> messageType = TypeResolver.resolveRawArgument(MessageHandler.class,
subscriber.getClass());
if (messageType != Unknown.class && messageType != Object.class)
Serialization.registerTarget(messageType);
subscribers.put(topic, subscriber);
}
@Override
public void unsubscribe(MessageHandler<?> subscriber, String topic) {
Preconditions.checkNotNull(subscriber, "subscriber");
Preconditions.checkNotNull(topic, "topic");
subscribers.remove(topic, subscriber);
}
/**
* Handles an inbound {@code messageBody} that was consumed from a {@code topic}, adapting it
* using the bound ChannelAdapter and dispatching it to all subscribed handlers using the bound
* dispatcher.
*/
@SuppressWarnings("unchecked")
protected void handle(I messageBody, String address) {
Preconditions.checkState(dispatcher != null, "A dispatcher has not yet been bound");
Object message = adaptedMessageFor(messageBody, address);
if (message == null)
return;
Collection<MessageHandler<?>> handlers = subscribers.get(address);
if (!handlers.isEmpty()) {
// TODO separate message copies for each handler
synchronized (subscribers) {
for (MessageHandler<?> handler : handlers) {
Class<?> messageType = TypeResolver.resolveRawArgument(MessageHandler.class,
handler.getClass());
if (messageType.isAssignableFrom(message.getClass()))
dispatcher.dispatch(message, (MessageHandler<Object>) handler);
}
}
}
messagesReceived.mark();
}
private Object adaptedMessageFor(I messageBody, String address) {
return adapter == null ? messageBody : adapter.adaptInboundMessage(messageBody, address);
}
}

View File

@ -0,0 +1,19 @@
package com.hpcloud.messaging.internal;
import javax.inject.Singleton;
/**
* Channel adapter for messaging within the local process.
*
* @author Jonathan Halterman
*/
@Singleton
public class LocalChannel extends AbstractPublishSubscribeChannel<Object, Object> {
/**
* Handles outbound messages using the inbound handler.
*/
@Override
public void send(Object message, String address) {
handle(message, address);
}
}

View File

@ -0,0 +1,100 @@
package com.hpcloud.messaging.internal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import net.jodah.typetools.TypeResolver;
import net.jodah.typetools.TypeResolver.Unknown;
import com.google.common.base.Preconditions;
import com.hpcloud.messaging.ChannelAdapter;
import com.hpcloud.messaging.MessageTranslator;
/**
* Standard channel adapter implementation.
*
* @param <I> inbound message data type
* @param <O> outbound message data type
* @author Jonathan Halterman
*/
@ThreadSafe
public class StandardChannelAdapter<I, O> implements ChannelAdapter<I, O> {
private final Map<String, MessageTranslator<I, I>> inboundTranslators = new ConcurrentHashMap<String, MessageTranslator<I, I>>();
protected final Map<String, MessageTranslator<O, O>> outboundTranslators = new ConcurrentHashMap<String, MessageTranslator<O, O>>();
private final Class<I> inboundType;
private final Class<O> outboundType;
/**
* @throws NullPointerException if {@code inboundType} or {@code outboundType} are null
*/
public StandardChannelAdapter(Class<I> inboundType, Class<O> outboundType) {
Preconditions.checkNotNull(inboundType, "inboundType");
Preconditions.checkNotNull(outboundType, "outboundType");
this.inboundType = inboundType;
this.outboundType = outboundType;
}
@SuppressWarnings("unchecked")
protected StandardChannelAdapter() {
Class<?>[] typeArguments = TypeResolver.resolveRawArguments(ChannelAdapter.class, getClass());
Preconditions.checkArgument(typeArguments[0] != Unknown.class
&& typeArguments[1] != Unknown.class,
"Must declare inbound type argument <I> and outbound type argument <O> for ChannelAdapter");
inboundType = (Class<I>) typeArguments[0];
outboundType = (Class<O>) typeArguments[1];
}
@Override
public I adaptInboundMessage(I messageBody, String topic) {
MessageTranslator<I, I> translator = inboundTranslators.get(topic);
if (translator != null)
try {
messageBody = translator.translate(messageBody);
} catch (Exception ignore) {
}
return messageBody;
}
@Override
public O adaptOutboundMessage(O messageBody, String topic) {
MessageTranslator<O, O> translator = outboundTranslators.get(topic);
if (translator != null)
try {
messageBody = translator.translate(messageBody);
} catch (Exception ignore) {
}
return messageBody;
}
@Override
public void registerInboundTranslator(MessageTranslator<I, I> translator, String topic) {
Preconditions.checkNotNull(translator, "translator");
Preconditions.checkNotNull(topic, "topic");
Preconditions.checkArgument(translator.inputType().equals(inboundType)
&& translator.outputType().equals(inboundType),
"The translator's input and output types must be %s", inboundType.getClass().getName());
inboundTranslators.put(topic, translator);
}
@Override
public void registerOutboundTranslator(MessageTranslator<O, O> translator, String topic) {
Preconditions.checkNotNull(translator, "translator");
Preconditions.checkNotNull(topic, "topic");
Preconditions.checkArgument(translator.inputType().equals(outboundType)
&& translator.outputType().equals(outboundType),
"The translator's input and output types must be %s", outboundType.getClass().getName());
outboundTranslators.put(topic, translator);
}
@Override
public Class<I> inboundType() {
return inboundType;
}
@Override
public Class<O> outboundType() {
return outboundType;
}
}

View File

@ -0,0 +1,47 @@
package com.hpcloud.messaging.internal;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hpcloud.messaging.MessageDispatcher;
import com.hpcloud.messaging.MessageHandler;
import com.hpcloud.util.Types;
import com.hpcloud.util.concurrent.ThreadPools;
/**
* Dispatches messages on a thread pool.
*
* @author Jonathan Halterman
*/
public class ThreadPoolingMessageDispatcher implements MessageDispatcher {
private final Logger LOG = LoggerFactory.getLogger(ThreadPoolingMessageDispatcher.class);
private ExecutorService executor;
@Override
public <T> void dispatch(final T message, final MessageHandler<T> handler) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
if (LOG.isDebugEnabled())
LOG.debug("Delivering message to handler {}", Types.deProxy(handler.getClass()));
handler.handle(message);
} catch (Exception e) {
LOG.error("Error while handling message: {}", message, e);
}
}
});
}
@Override
public void start() throws Exception {
executor = ThreadPools.newInstrumentedCachedThreadPool("MessageBus");
}
@Override
public void stop() throws Exception {
executor.shutdown();
}
}

View File

@ -0,0 +1,89 @@
package com.hpcloud.messaging.kafka;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.hpcloud.messaging.MessageHandler;
import com.hpcloud.messaging.internal.AbstractPublishSubscribeChannel;
import com.hpcloud.messaging.internal.StandardChannelAdapter;
/**
* Kafka channel implementation.
*
* @author Jonathan Halterman
*/
@Singleton
public class KafkaChannel extends AbstractPublishSubscribeChannel<JsonNode, JsonNode> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaChannel.class);
private final KafkaConfiguration config;
// private Connection connection;
private volatile boolean open;
@Inject
public KafkaChannel(KafkaConfiguration config, MetricRegistry metricRegistry) {
super("kafka", new StandardChannelAdapter<JsonNode, JsonNode>(JsonNode.class, JsonNode.class),
metricRegistry);
this.config = config;
Preconditions.checkNotNull(config, "config");
}
@Override
public void close() {
LOG.info("Closing Kafka channel");
try {
// TODO close kafka cxn
} finally {
open = false;
}
}
@Override
public void open() {
LOG.info("Opening Kafka channel");
// TODO open kafka cxn
}
/**
* Sends the {@code message} to the {@code topic}.
*/
public void send(Object message, String topic) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkNotNull(topic, "topic");
if (!open)
throw new IllegalStateException("The message cannot be sent since the channel is not open");
// TODO send message
}
/**
* Subscribes the {@code subscriber} to the {@code address} where {@code address} should consist
* of exchangeName/routingKey.
*/
@Override
public void subscribe(MessageHandler<?> subscriber, String address) {
super.subscribe(subscriber, address);
if (!open)
return;
// TODO create topic subscription
}
/**
* Unsubscribes the {@code subscriber} for the {@code address} where {@code address} should
* consist of exchangeName/routingKey.
*/
@Override
public void unsubscribe(MessageHandler<?> subscriber, String address) {
super.unsubscribe(subscriber, address);
// TODO remove topic subscription
}
}

View File

@ -0,0 +1,16 @@
package com.hpcloud.messaging.kafka;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import org.hibernate.validator.constraints.NotEmpty;
/**
* @author Jonathan Halterman
*/
public class KafkaConfiguration {
@NotEmpty public String[] hosts;
@Min(1) @Max(65535) public int port = 5672;
@NotEmpty public String username;
@NotEmpty public String password;
}

View File

@ -0,0 +1,113 @@
package com.hpcloud.messaging;
import static org.testng.Assert.assertEquals;
import org.jodah.concurrentunit.ConcurrentTestCase;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.codahale.metrics.MetricRegistry;
import com.google.inject.AbstractModule;
import com.hpcloud.messaging.internal.AbstractPublishSubscribeChannel;
import com.hpcloud.util.Injector;
@Test
public class StandardMessageBusTest extends ConcurrentTestCase {
static final TestMessage testMessage = new TestMessage("test message");
static final TestMessage condMessage = new TestMessage("conditional message");
MessageBusConfiguration busConfig = new MessageBusConfiguration();
MessageDispatcher dispatcher = new SynchronousMessageDispatcher();
public static class TestMessage {
public String message;
public TestMessage() {
}
public TestMessage(String message) {
this.message = message;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TestMessage other = (TestMessage) obj;
if (message == null) {
if (other.message != null)
return false;
} else if (!message.equals(other.message))
return false;
return true;
}
}
class TestMessageHandler implements MessageHandler<TestMessage> {
public void handle(TestMessage message) {
threadAssertEquals(message, testMessage);
resume();
}
}
@BeforeClass
protected void beforeClass() {
Injector.registerModules(new AbstractModule() {
protected void configure() {
bind(TestMessageHandler.class).toInstance(new TestMessageHandler());
}
});
}
static class SomeMessage {
}
static class SomeMessageHandler implements MessageHandler<SomeMessage> {
@Override
public void handle(SomeMessage message) {
}
}
public void testPrefixFor() {
assertEquals(StandardMessageBus.prefixFor("test://address"), "test");
assertEquals(StandardMessageBus.prefixFor("akka://"), "akka");
assertEquals(StandardMessageBus.prefixFor("akka:/"), null);
assertEquals(StandardMessageBus.prefixFor("akka"), null);
}
public void testTopicFor() {
assertEquals(StandardMessageBus.topicFor("test://address1/address2"), "address1/address2");
assertEquals(StandardMessageBus.topicFor("test://address"), "address");
assertEquals(StandardMessageBus.topicFor("akka://"), null);
assertEquals(StandardMessageBus.topicFor("akka:/"), "akka:/");
assertEquals(StandardMessageBus.topicFor("akka"), "akka");
}
public void shouldPublishToChannel() throws Throwable {
MessageChannel channel = new AbstractPublishSubscribeChannel<Object, Object>() {
@Override
public void send(Object message, String address) {
threadAssertEquals(testMessage, message);
resume();
}
};
StandardMessageBus bus = new StandardMessageBus(busConfig, dispatcher, new MetricRegistry());
bus.bind(channel);
bus.start();
bus.publish(testMessage);
await(2000);
}
/** This behavior was removed. */
@Test(expectedExceptions = IllegalStateException.class, enabled = false)
public void shouldThrowOnDuplicateMessageHandlerRegistration() {
StandardMessageBus bus = new StandardMessageBus(busConfig, dispatcher, new MetricRegistry());
SomeMessageHandler h = new SomeMessageHandler();
bus.register(h, "a");
bus.register(h, "a");
}
}

View File

@ -0,0 +1,22 @@
package com.hpcloud.messaging;
/**
* Dispatches messages synchronously on the current thread. Useful for testing scenarios where
* correlation is not required.
*
* @author Jonathan Halterman
*/
public class SynchronousMessageDispatcher implements MessageDispatcher {
@Override
public <T> void dispatch(T message, MessageHandler<T> handler) {
handler.handle(message);
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
}

View File

@ -0,0 +1,48 @@
package com.hpcloud.messaging.functional;
import org.jodah.concurrentunit.ConcurrentTestCase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.codahale.metrics.MetricRegistry;
import com.hpcloud.messaging.MessageBus;
import com.hpcloud.messaging.MessageBusConfiguration;
import com.hpcloud.messaging.MessageHandler;
import com.hpcloud.messaging.StandardMessageBus;
import com.hpcloud.messaging.SynchronousMessageDispatcher;
@Test
public class MessageHandlerInstanceTest extends ConcurrentTestCase {
private MessageBus bus;
static class Foo {
}
@BeforeClass
protected void beforeClass() throws Exception {
MessageBusConfiguration busConfig = new MessageBusConfiguration();
bus = new StandardMessageBus(busConfig, new SynchronousMessageDispatcher(),
new MetricRegistry());
bus.start();
}
@AfterClass
protected void afterClass() throws Exception {
bus.stop();
}
public void shouldDeliverMessageToInstanceHandler() throws Throwable {
final Foo foo = new Foo();
bus.register(new MessageHandler<Foo>() {
@Override
public void handle(Foo message) {
threadAssertEquals(message, foo);
resume();
}
});
bus.publish(foo);
await(2000);
}
}

View File

@ -31,8 +31,10 @@
<module>mon-collectd</module>
<module>mon-dropwizard</module>
<module>mon-http</module>
<module>mon-messaging</module>
<module>mon-model</module>
<module>mon-persistence</module>
<module>mon-service</module>
<module>mon-testing</module>
<module>mon-util</module>
</modules>