diff --git a/src/main/java/com/chavaillaz/appender/log4j/AbstractBatchLogDeliveryAppender.java b/src/main/java/com/chavaillaz/appender/log4j/AbstractBatchLogDeliveryAppender.java new file mode 100644 index 0000000..93dd1cd --- /dev/null +++ b/src/main/java/com/chavaillaz/appender/log4j/AbstractBatchLogDeliveryAppender.java @@ -0,0 +1,89 @@ +package com.chavaillaz.appender.log4j; + +import static java.util.Collections.singletonList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.chavaillaz.appender.LogConfiguration; +import com.chavaillaz.appender.LogDelivery; +import lombok.Getter; +import lombok.extern.log4j.Log4j2; + +/** + * Abstract implementation of logs transmission by batches. + * + * @param The configuration type + */ +@Log4j2 +public abstract class AbstractBatchLogDeliveryAppender implements LogDelivery { + + /** + * The list of logs documents to be sent with the next transmission + */ + private final List> batch = new ArrayList<>(); + + /** + * The configuration of the logs transmission + */ + @Getter + private final C configuration; + + /** + * Creates a new logs delivery handler sending them by batches. + * + * @param configuration The configuration to use + */ + protected AbstractBatchLogDeliveryAppender(C configuration) { + this.configuration = configuration; + } + + @Override + public void send(Map document) { + send(singletonList(document)); + } + + @Override + public void send(List> documents) { + if (documents != null && !documents.isEmpty()) { + stackAndSend(documents); + } + } + + @Override + public synchronized void flush() { + if (!batch.isEmpty()) { + log.debug("Sending partial batch of {}/{}", batch.size(), configuration.getFlushThreshold()); + sendBatch(); + } + } + + private synchronized void stackAndSend(List> data) { + batch.addAll(data); + log.trace("Stacking batch {}/{}", batch.size(), configuration.getFlushThreshold()); + if (batch.size() >= configuration.getFlushThreshold()) { + sendBatch(); + } + } + + private synchronized void sendBatch() { + if (!batch.isEmpty() && sendBulk(batch)) { + batch.clear(); + } + } + + /** + * Sends the given list of documents representing logs. + * + * @param documents The list of record data + * @return {@code true} if the transmission was successful, {@code false} otherwise + */ + protected abstract boolean sendBulk(List> documents); + + @Override + public void close() { + sendBatch(); + } + +} diff --git a/src/main/java/com/chavaillaz/appender/log4j/elastic/ElasticsearchLogDelivery.java b/src/main/java/com/chavaillaz/appender/log4j/elastic/ElasticsearchLogDelivery.java index 6576111..7d6a7c9 100644 --- a/src/main/java/com/chavaillaz/appender/log4j/elastic/ElasticsearchLogDelivery.java +++ b/src/main/java/com/chavaillaz/appender/log4j/elastic/ElasticsearchLogDelivery.java @@ -3,32 +3,23 @@ import static com.chavaillaz.appender.CommonUtils.createSSLContext; import static com.chavaillaz.appender.log4j.elastic.ElasticsearchUtils.createClient; import static java.time.OffsetDateTime.now; -import static java.util.Collections.singletonList; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import java.util.ArrayList; import java.util.List; import java.util.Map; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; -import com.chavaillaz.appender.LogDelivery; -import lombok.Getter; +import com.chavaillaz.appender.log4j.AbstractBatchLogDeliveryAppender; import lombok.extern.log4j.Log4j2; /** * Implementation of logs transmission for Elasticsearch. */ @Log4j2 -public class ElasticsearchLogDelivery implements LogDelivery { +public class ElasticsearchLogDelivery extends AbstractBatchLogDeliveryAppender { - private final List> batch = new ArrayList<>(); - - @Getter - private final ElasticsearchConfiguration configuration; - - @Getter private final ElasticsearchClient client; /** @@ -37,7 +28,7 @@ public class ElasticsearchLogDelivery implements LogDelivery { * @param configuration The configuration to use */ public ElasticsearchLogDelivery(ElasticsearchConfiguration configuration) { - this.configuration = configuration; + super(configuration); if (isNotBlank(configuration.getApiKey())) { client = createClient(configuration.getUrl(), createSSLContext(), configuration.getApiKey()); @@ -47,46 +38,13 @@ public ElasticsearchLogDelivery(ElasticsearchConfiguration configuration) { } @Override - public void send(Map document) { - send(singletonList(document)); - } - - @Override - public void send(List> documents) { - if (documents != null && !documents.isEmpty()) { - stackAndSend(documents); - } - } - - @Override - public synchronized void flush() { - if (!batch.isEmpty()) { - log.debug("Sending partial batch of {}/{}", batch.size(), configuration.getFlushThreshold()); - sendBatch(); - } - } - - private synchronized void stackAndSend(List> data) { - batch.addAll(data); - log.trace("Batch size {}/{}", batch.size(), configuration.getFlushThreshold()); - if (batch.size() >= configuration.getFlushThreshold()) { - sendBatch(); - } - } - - private synchronized void sendBatch() { - if (!batch.isEmpty() && sendBulk(batch)) { - batch.clear(); - } - } - - private boolean sendBulk(List> documents) { + protected boolean sendBulk(List> documents) { try { BulkRequest.Builder builder = new BulkRequest.Builder(); for (Map document : documents) { builder.operations(operation -> operation .index(index -> index - .index(configuration.generateIndexName(now())) + .index(getConfiguration().generateIndexName(now())) .document(document))); } @@ -104,7 +62,7 @@ private boolean sendBulk(List> documents) { @Override public void close() { if (client != null) { - sendBatch(); + super.close(); } }