Skip to content

Commit

Permalink
Put batch transmission logic in a separated class.
Browse files Browse the repository at this point in the history
  • Loading branch information
Chavjoh committed Jan 4, 2025
1 parent c6b1b59 commit d7598e6
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.chavaillaz.appender.log4j;

import static java.util.Collections.singletonList;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.chavaillaz.appender.LogConfiguration;
import com.chavaillaz.appender.LogDelivery;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

/**
* Abstract implementation of logs transmission by batches.
*
* @param <C> The configuration type
*/
@Log4j2
public abstract class AbstractBatchLogDeliveryAppender<C extends LogConfiguration> implements LogDelivery {

/**
* The list of logs documents to be sent with the next transmission
*/
private final List<Map<String, Object>> batch = new ArrayList<>();

/**
* The configuration of the logs transmission
*/
@Getter
private final C configuration;

/**
* Creates a new logs delivery handler sending them by batches.
*
* @param configuration The configuration to use
*/
protected AbstractBatchLogDeliveryAppender(C configuration) {
this.configuration = configuration;
}

@Override
public void send(Map<String, Object> document) {
send(singletonList(document));
}

@Override
public void send(List<Map<String, Object>> documents) {
if (documents != null && !documents.isEmpty()) {
stackAndSend(documents);
}
}

@Override
public synchronized void flush() {
if (!batch.isEmpty()) {
log.debug("Sending partial batch of {}/{}", batch.size(), configuration.getFlushThreshold());
sendBatch();
}
}

private synchronized void stackAndSend(List<Map<String, Object>> data) {
batch.addAll(data);
log.trace("Stacking batch {}/{}", batch.size(), configuration.getFlushThreshold());
if (batch.size() >= configuration.getFlushThreshold()) {
sendBatch();
}
}

private synchronized void sendBatch() {
if (!batch.isEmpty() && sendBulk(batch)) {
batch.clear();
}
}

/**
* Sends the given list of documents representing logs.
*
* @param documents The list of record data
* @return {@code true} if the transmission was successful, {@code false} otherwise
*/
protected abstract boolean sendBulk(List<Map<String, Object>> documents);

@Override
public void close() {
sendBatch();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,23 @@
import static com.chavaillaz.appender.CommonUtils.createSSLContext;
import static com.chavaillaz.appender.log4j.elastic.ElasticsearchUtils.createClient;
import static java.time.OffsetDateTime.now;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import com.chavaillaz.appender.LogDelivery;
import lombok.Getter;
import com.chavaillaz.appender.log4j.AbstractBatchLogDeliveryAppender;
import lombok.extern.log4j.Log4j2;

/**
* Implementation of logs transmission for Elasticsearch.
*/
@Log4j2
public class ElasticsearchLogDelivery implements LogDelivery {
public class ElasticsearchLogDelivery extends AbstractBatchLogDeliveryAppender<ElasticsearchConfiguration> {

private final List<Map<String, Object>> batch = new ArrayList<>();

@Getter
private final ElasticsearchConfiguration configuration;

@Getter
private final ElasticsearchClient client;

/**
Expand All @@ -37,7 +28,7 @@ public class ElasticsearchLogDelivery implements LogDelivery {
* @param configuration The configuration to use
*/
public ElasticsearchLogDelivery(ElasticsearchConfiguration configuration) {
this.configuration = configuration;
super(configuration);

if (isNotBlank(configuration.getApiKey())) {
client = createClient(configuration.getUrl(), createSSLContext(), configuration.getApiKey());
Expand All @@ -47,46 +38,13 @@ public ElasticsearchLogDelivery(ElasticsearchConfiguration configuration) {
}

@Override
public void send(Map<String, Object> document) {
send(singletonList(document));
}

@Override
public void send(List<Map<String, Object>> documents) {
if (documents != null && !documents.isEmpty()) {
stackAndSend(documents);
}
}

@Override
public synchronized void flush() {
if (!batch.isEmpty()) {
log.debug("Sending partial batch of {}/{}", batch.size(), configuration.getFlushThreshold());
sendBatch();
}
}

private synchronized void stackAndSend(List<Map<String, Object>> data) {
batch.addAll(data);
log.trace("Batch size {}/{}", batch.size(), configuration.getFlushThreshold());
if (batch.size() >= configuration.getFlushThreshold()) {
sendBatch();
}
}

private synchronized void sendBatch() {
if (!batch.isEmpty() && sendBulk(batch)) {
batch.clear();
}
}

private boolean sendBulk(List<Map<String, Object>> documents) {
protected boolean sendBulk(List<Map<String, Object>> documents) {
try {
BulkRequest.Builder builder = new BulkRequest.Builder();
for (Map<String, Object> document : documents) {
builder.operations(operation -> operation
.index(index -> index
.index(configuration.generateIndexName(now()))
.index(getConfiguration().generateIndexName(now()))
.document(document)));
}

Expand All @@ -104,7 +62,7 @@ private boolean sendBulk(List<Map<String, Object>> documents) {
@Override
public void close() {
if (client != null) {
sendBatch();
super.close();
}
}

Expand Down

0 comments on commit d7598e6

Please sign in to comment.