Skip to content

Commit

Permalink
implement builder for KafkaSpanExporter
Browse files Browse the repository at this point in the history
  • Loading branch information
naser-ayat committed Jul 26, 2023
1 parent 4a5d04c commit 457daf9
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,33 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
@SuppressWarnings("FutureReturnValueIgnored")
public class KafkaSpanExporter implements SpanExporter {
private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class);
private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L;
private final String topicName;
private final Producer<String, Collection<SpanData>> producer;
private final ExecutorService executorService;
private final long timeoutInSeconds;
private final AtomicBoolean isShutdown = new AtomicBoolean();

public KafkaSpanExporter(
public static KafkaSpanExporterBuilder newBuilder() {
return new KafkaSpanExporterBuilder();
}

KafkaSpanExporter(
String topicName,
Producer<String, Collection<SpanData>> producer,
ExecutorService executorService,
Expand All @@ -51,53 +49,6 @@ public KafkaSpanExporter(
this.timeoutInSeconds = timeoutInSeconds;
}

public KafkaSpanExporter(
String topicName,
Producer<String, Collection<SpanData>> producer,
ExecutorService executorService) {
this(topicName, producer, executorService, DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(String topicName, Producer<String, Collection<SpanData>> producer) {
this(topicName, producer, Executors.newCachedThreadPool(), DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName, Map<String, Object> configs, ExecutorService executorService) {
this(topicName, new KafkaProducer<>(configs), executorService, DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName,
Map<String, Object> configs,
ExecutorService executorService,
Serializer<String> keySerializer,
Serializer<Collection<SpanData>> valueSerializer) {
this(
topicName,
new KafkaProducer<>(configs, keySerializer, valueSerializer),
executorService,
DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName, Properties properties, ExecutorService executorService) {
this(topicName, new KafkaProducer<>(properties), executorService, DEFAULT_TIMEOUT_IN_SECONDS);
}

public KafkaSpanExporter(
String topicName,
Properties properties,
ExecutorService executorService,
Serializer<String> keySerializer,
Serializer<Collection<SpanData>> valueSerializer) {
this(
topicName,
new KafkaProducer<>(properties, keySerializer, valueSerializer),
executorService,
DEFAULT_TIMEOUT_IN_SECONDS);
}

@Override
public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
if (isShutdown.get()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.kafka;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaSpanExporterBuilder {
private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L;
private String topicName;
private Producer<String, Collection<SpanData>> producer;
private ExecutorService executorService;
private long timeoutInSeconds = DEFAULT_TIMEOUT_IN_SECONDS;

@SuppressWarnings(value = {"NullAway"})
public KafkaSpanExporterBuilder() {}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setTopicName(String topicName) {
this.topicName = topicName;
return this;
}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setProducer(Producer<String, Collection<SpanData>> producer) {
this.producer = producer;
return this;
}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setTimeoutInSeconds(long timeoutInSeconds) {
this.timeoutInSeconds = timeoutInSeconds;
return this;
}

public KafkaSpanExporter build() {
if (isNull(topicName)) {
throw new IllegalArgumentException("topicName cannot be null");
}
if (isNull(producer)) {
throw new IllegalArgumentException("producer cannot be null");
}
if (isNull(executorService)) {
executorService = Executors.newCachedThreadPool();
}
return new KafkaSpanExporter(topicName, producer, executorService, timeoutInSeconds);
}

public static class ProducerBuilder {
private Map<String, Object> config;
private Serializer<String> keySerializer;
private Serializer<Collection<SpanData>> valueSerializer;

public static ProducerBuilder newInstance() {
return new ProducerBuilder();
}

@SuppressWarnings(value = {"NullAway"})
public ProducerBuilder() {}

@CanIgnoreReturnValue
public ProducerBuilder setConfig(Map<String, Object> config) {
this.config = config;
return this;
}

@CanIgnoreReturnValue
public ProducerBuilder setKeySerializer(Serializer<String> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

@CanIgnoreReturnValue
public ProducerBuilder setValueSerializer(Serializer<Collection<SpanData>> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

public Producer<String, Collection<SpanData>> build() {
if (isNull(config)) {
throw new IllegalArgumentException("producer configuration cannot be null");
}
boolean correctConfig =
((config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)
&& config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG))
^ (nonNull(keySerializer) && nonNull(valueSerializer)))
&& (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG) ^ nonNull(valueSerializer))
&& (config.containsKey(VALUE_SERIALIZER_CLASS_CONFIG) ^ nonNull(keySerializer));
if (!correctConfig) {
throw new IllegalArgumentException(
"Both the key and value serializers should be provided either in the configuration or by using the corresponding setters");
}
if (config.containsKey(KEY_SERIALIZER_CLASS_CONFIG)) {
return new KafkaProducer<>(config);
}
return new KafkaProducer<>(config, keySerializer, valueSerializer);
}
}
}
Loading

0 comments on commit 457daf9

Please sign in to comment.