diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index ed8311d..33a44b5 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -5,10 +5,7 @@ import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.Locale; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.text.NumberFormat; /** @@ -35,9 +32,10 @@ * on any StatsD clients.

* * @author Tom Denley - * + * @author Mauro Franceschini */ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient { + private static final int STATS_QUEUE_MAX_SIZE = 64 * 1024; private static final Charset STATS_D_ENCODING = Charset.forName("UTF-8"); @@ -48,6 +46,9 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta private final String prefix; private final DatagramSocket clientSocket; private final StatsDClientErrorHandler handler; + private final BlockingQueue statsQueue; + private Future executorFuture; + private boolean stopping = false; private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); @@ -107,6 +108,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException { this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + "."); this.handler = errorHandler; + this.statsQueue = new ArrayBlockingQueue(STATS_QUEUE_MAX_SIZE); try { this.clientSocket = new DatagramSocket(); @@ -114,6 +116,19 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC } catch (Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); } + + this.executorFuture = executor.submit(new Runnable() { + @Override + public void run() { + while (!stopping) { + try { + blockingSend(statsQueue.take()); + } catch (InterruptedException ex) { + handler.handle(ex); + } + } + } + }); } /** @@ -123,6 +138,8 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC @Override public void stop() { try { + stopping = true; + executorFuture.cancel(true); executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); } @@ -151,7 +168,7 @@ public void stop() { */ @Override public void count(String aspect, long delta, double sampleRate) { - send(messageFor(aspect, Long.toString(delta), "c", sampleRate)); + send(messageFor(aspect, delta, "c", sampleRate)); } /** @@ -169,7 +186,6 @@ public void recordGaugeValue(String aspect, long value) { recordGaugeCommon(aspect, Long.toString(value), value < 0, false); } - @Override public void recordGaugeValue(String aspect, double value) { recordGaugeCommon(aspect, stringValueOf(value), value < 0, false); } @@ -179,7 +195,6 @@ public void recordGaugeDelta(String aspect, long value) { recordGaugeCommon(aspect, Long.toString(value), value < 0, true); } - @Override public void recordGaugeDelta(String aspect, double value) { recordGaugeCommon(aspect, stringValueOf(value), value < 0, true); } @@ -221,25 +236,21 @@ public void recordSetEvent(String aspect, String eventName) { */ @Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) { - send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate)); + send(messageFor(aspect, timeInMs, "ms", sampleRate)); } - private String messageFor(String aspect, String value, String type) { + private String messageFor(String aspect, Object value, String type) { return messageFor(aspect, value, type, 1.0); } - private String messageFor(String aspect, String value, String type, double sampleRate) { + private String messageFor(String aspect, Object value, String type, double sampleRate) { final String messageFormat = (sampleRate == 1.0) ? "%s%s:%s|%s" : "%s%s:%s|%s@%f"; - return String.format((Locale)null, messageFormat, prefix, aspect, value, type, sampleRate); + return String.format(Locale.US, messageFormat, prefix, aspect, value, type, sampleRate); } private void send(final String message) { try { - executor.execute(new Runnable() { - @Override public void run() { - blockingSend(message); - } - }); + statsQueue.add(message); } catch (Exception e) { handler.handle(e);