From 2166b00490b0779396fd3b82ed0a4649dde36a24 Mon Sep 17 00:00:00 2001
From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com>
Date: Mon, 30 Dec 2024 08:22:55 +0000
Subject: [PATCH 1/3] deps: Update dependency
org.apache.httpcomponents.core5:httpcore5 to v5.3.1
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index bf22ff53..098d8e7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
3.4.1
1.0.8
13.5
- 5.2.5
+ 5.3.1
2.18.2
1.0
5.11.4
From 3d2f950e3a9d0bcd87e521cfba30aae587610482 Mon Sep 17 00:00:00 2001
From: Nicolas Pepin-Perreault
Date: Mon, 30 Dec 2024 10:52:32 +0100
Subject: [PATCH 2/3] build: add fast-test job for local speed up
---
justfile | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/justfile b/justfile
index 7c3aec2b..642fa300 100644
--- a/justfile
+++ b/justfile
@@ -22,7 +22,12 @@ default:
./mvnw clean -T2C {{ mvnArgs }}
test +mvnArgs='':
- ./mvnw verify -DskipChecks -T1C {{ mvnArgs }}
+ ./mvnw verify -DskipChecks -T1C {{ mvnArgs }}
+
+# use only if you have a pretty beefy machine :) if not, you can always just set a lower forkCount by calling
+# `just fast-test -DforkCount=2` (or something like that)
+fast-test +mvnArgs='':
+ ./mvnw verify -DskipChecks -T1C -Pparallel-tests -DforkCount=3
@ut +mvnArgs='': (test "-DskipITs" mvnArgs)
@it +mvnArgs='': (test "-DskipUTs" mvnArgs)
From f3cf4f695c1efc69fa87286f2a47204d4df2c0c5 Mon Sep 17 00:00:00 2001
From: Nicolas Pepin-Perreault
Date: Mon, 30 Dec 2024 10:52:56 +0100
Subject: [PATCH 3/3] refactor: route 127.0.0.1 for Apache HTTP client 5
---
.../containers/exporter/DebugReceiver.java | 15 +++-
.../containers/exporter/RecordHandler.java | 84 +++++++++++--------
.../exporter/DebugExporterTest.java | 13 ++-
.../exporter/DebugReceiverTest.java | 5 +-
.../containers/exporter/DebugExporter.java | 25 ++++--
5 files changed, 95 insertions(+), 47 deletions(-)
diff --git a/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java b/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java
index e8a1fcad..7019e0c7 100644
--- a/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java
+++ b/core/src/main/java/io/zeebe/containers/exporter/DebugReceiver.java
@@ -16,6 +16,8 @@
package io.zeebe.containers.exporter;
import io.camunda.zeebe.protocol.record.Record;
+
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
@@ -24,12 +26,21 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.jcip.annotations.ThreadSafe;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.HttpProcessors;
import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.impl.routing.RequestRouter;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncFilterChain;
+import org.apache.hc.core5.http.nio.AsyncFilterHandler;
+import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apiguardian.api.API;
@@ -257,11 +268,13 @@ private HttpAsyncServer createServer() {
return AsyncServerBootstrap.bootstrap()
.setIOReactorConfig(config)
.setCanonicalHostName("localhost")
+ .setExceptionCallback(e -> LOGGER.warn("Error occurred in DebugReceiver server", e))
.setCharCodingConfig(CharCodingConfig.custom().setCharset(StandardCharsets.UTF_8).build())
.setHttpProcessor(HttpProcessors.server("ztc-debug/1.1"))
// need to register the handler on both the primary and possibly Testcontainers' proxy for
// our local server, as otherwise the requests with hosts that do not match will be skipped
- .registerVirtual(GenericContainer.INTERNAL_HOST_HOSTNAME, "/records", recordHandler)
+ .register(GenericContainer.INTERNAL_HOST_HOSTNAME, "/records", recordHandler)
+ .register("127.0.0.1", "/records", recordHandler)
.register("/records", recordHandler)
.create();
}
diff --git a/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java b/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java
index 181d8fd8..5351079c 100644
--- a/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java
+++ b/core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java
@@ -15,16 +15,15 @@
*/
package io.zeebe.containers.exporter;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;
import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hc.core5.http.EntityDetails;
@@ -58,7 +57,7 @@
* 200 - the records were passed through, and the response body will be a singleton map with
* one key, position, the value of which is the highest acknowledged position for the
* partition ID from which the records are coming from
- * 204 - there were either no records passed, or there is no known acknowledged position yet
+ * 200 - there were either no records passed, or there is no known acknowledged position yet
* for the partition form which the records are coming from
* 400 - if there is no request body, or the request body cannot be parsed as a list of
* records
@@ -69,6 +68,7 @@ final class RecordHandler implements AsyncServerRequestHandler> recordConsumer;
private final boolean autoAcknowledge;
@@ -96,32 +96,27 @@ public void handle(
final HttpContext context)
throws HttpException, IOException {
final byte[] requestBody = requestObject.getBody();
+ final AsyncResponseProducer responseProducer = handleRequest(requestBody);
+ responseTrigger.submitResponse(responseProducer, context);
+ }
+
+ private AsyncResponseProducer handleRequest(final byte[] requestBody)
+ throws JsonProcessingException {
if (requestBody == null || requestBody.length == 0) {
- final BasicHttpResponse response =
- new BasicHttpResponse(HttpStatus.SC_BAD_REQUEST, "must send a list of records as body");
- responseTrigger.submitResponse(new BasicResponseProducer(response), context);
- return;
+ return createErrorResponse(HttpStatus.SC_BAD_REQUEST, "Must send a list of records as body");
}
final List> records;
try {
records = MAPPER.readValue(requestBody, new TypeReference>>() {});
} catch (final IOException e) {
- final BasicHttpResponse response =
- new BasicHttpResponse(
- HttpStatus.SC_BAD_REQUEST,
- "failed to deserialize records, see receiver logs for more");
- responseTrigger.submitResponse(new BasicResponseProducer(response), context);
LOGGER.warn("Failed to deserialize exported records", e);
-
- return;
+ return createErrorResponse(
+ HttpStatus.SC_BAD_REQUEST, "Failed to deserialize records, see receiver logs for more");
}
if (records.isEmpty()) {
- final BasicHttpResponse response =
- new BasicHttpResponse(HttpStatus.SC_NO_CONTENT, "no records given");
- responseTrigger.submitResponse(new BasicResponseProducer(response), context);
- return;
+ LOGGER.debug("No records given, will return a successful response regardless");
}
for (final Record> record : records) {
@@ -132,26 +127,45 @@ public void handle(
}
}
- final int partitionId = records.get(0).getPartitionId();
- final AsyncResponseProducer responseProducer = createSuccessfulResponse(partitionId);
- responseTrigger.submitResponse(responseProducer, context);
+ return createSuccessfulResponse(records);
}
- private AsyncResponseProducer createSuccessfulResponse(final int partitionId)
+ private AsyncResponseProducer createSuccessfulResponse(final List> records)
throws JsonProcessingException {
- final Long position = positions.get(partitionId);
-
- if (position == null) {
- final HttpResponse response =
- new BasicHttpResponse(
- HttpStatus.SC_NO_CONTENT, "no acknowledged position for partition " + partitionId);
- return new BasicResponseProducer(response);
- }
-
final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
- response.setHeader("Content-Type", "application/json");
final byte[] responseBody =
- MAPPER.writeValueAsBytes(Collections.singletonMap("position", position));
+ records.isEmpty()
+ ? EMPTY_BODY
+ : MAPPER.writeValueAsBytes(
+ Collections.singletonMap(
+ "position", positions.get(records.get(0).getPartitionId())));
+
+ response.setHeader("Content-Type", "application/json; charset=UTF-8");
return new BasicResponseProducer(response, new BasicAsyncEntityProducer(responseBody));
}
+
+ private AsyncResponseProducer createErrorResponse(final int status, final String message)
+ throws JsonProcessingException {
+ final ProblemDetail problem = new ProblemDetail(status, message);
+ final HttpResponse response = new BasicHttpResponse(status);
+ response.setHeader("Content-Type", "application/json; charset=UTF-8");
+
+ final byte[] responseBody = MAPPER.writeValueAsBytes(problem);
+ return new BasicResponseProducer(response, new BasicAsyncEntityProducer(responseBody));
+ }
+
+ @SuppressWarnings({"FieldCanBeLocal", "unused"})
+ @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
+ private static final class ProblemDetail {
+ private final String type = "about:blank";
+ private final String instance = "/records";
+
+ private final int status;
+ private final String detail;
+
+ private ProblemDetail(int status, String detail) {
+ this.status = status;
+ this.detail = detail;
+ }
+ }
}
diff --git a/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugExporterTest.java b/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugExporterTest.java
index ea2e7a89..a8dc7de7 100644
--- a/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugExporterTest.java
+++ b/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugExporterTest.java
@@ -30,6 +30,7 @@
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.test.broker.protocol.ProtocolFactory;
import java.net.ConnectException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
@@ -107,7 +108,13 @@ void shouldRetryOnNonSuccessfulHttpCode() {
void shouldHandleNoResponseBody() {
// given
final Record> record = recordFactory.generateRecord();
- WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
+ WireMock.stubFor(
+ WireMock.post("/records")
+ .willReturn(
+ WireMock.aResponse()
+ .withStatus(200)
+ .withResponseBody(
+ Body.fromJsonBytes("{}".getBytes(StandardCharsets.UTF_8)))));
controller.updateLastExportedRecordPosition(10L);
// when
@@ -141,7 +148,7 @@ void shouldExportRecordAsList() {
// given
final Record> record = recordFactory.generateRecord();
final String expectedRequestBody = Json.write(Collections.singletonList(record));
- WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
+ WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(200)));
// when
exporter.export(record);
@@ -158,7 +165,7 @@ void shouldExportRecordsOneAtATime() {
// given
final Record> firstRecord = recordFactory.generateRecord();
final Record> secondRecord = recordFactory.generateRecord();
- WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
+ WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(200)));
// when
exporter.export(firstRecord);
diff --git a/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugReceiverTest.java b/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugReceiverTest.java
index 6a478ea0..71c15c70 100644
--- a/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugReceiverTest.java
+++ b/exporter-test/src/test/java/io/zeebe/containers/exporter/DebugReceiverTest.java
@@ -129,7 +129,7 @@ void shouldBindToPortOnStart() throws IOException {
.body(Collections.emptyList())
.post(receiver.recordsEndpoint())
.then()
- .statusCode(204);
+ .statusCode(200);
}
}
@@ -145,7 +145,7 @@ void shouldBindToRandomPort() {
.body(Collections.emptyList())
.post(receiver.recordsEndpoint())
.then()
- .statusCode(204);
+ .statusCode(200);
}
}
@@ -209,6 +209,7 @@ void shouldAcknowledgePosition() {
RestAssured.given()
.body(Collections.singletonList(record))
.contentType("application/json")
+ .accept("application/json")
.post(receiver.recordsEndpoint())
.as(Map.class);
assertThat(response).containsEntry("position", 20);
diff --git a/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java b/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java
index 18fdce8d..0b5a61de 100644
--- a/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java
+++ b/exporter/src/main/java/io/zeebe/containers/exporter/DebugExporter.java
@@ -30,6 +30,7 @@
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
@@ -99,6 +100,13 @@ public void export(final Record> record) {
Thread.currentThread().interrupt();
LangUtil.rethrowUnchecked(e);
} catch (final Exception e) {
+ context
+ .getLogger()
+ .warn(
+ "Failed to export record {} of partition {}",
+ record.getPosition(),
+ record.getPartitionId(),
+ e);
LangUtil.rethrowUnchecked(e);
}
@@ -112,14 +120,15 @@ private void pushRecord(final Record> record) throws IOException, InterruptedE
final HttpResponse response = client.send(request, BodyHandlers.ofByteArray());
final int statusCode = response.statusCode();
if (statusCode >= 400) {
+ final String error =
+ hasResponseBody(response) ? new String(response.body(), StandardCharsets.UTF_8) : "";
throw new BadRequestException(
String.format(
- "Failed to push out record with position %d on partition %d: response code %d",
- record.getPosition(), record.getPartitionId(), statusCode));
+ "Failed to push out record with position %d on partition %d: response code=%d, error='%s'",
+ record.getPosition(), record.getPartitionId(), statusCode, error));
}
- // is there a body to read?
- if (statusCode != 204) {
+ if (hasResponseBody(response)) {
handleAcknowledgment(response.body(), record.getPartitionId());
}
@@ -129,6 +138,10 @@ private void pushRecord(final Record> record) throws IOException, InterruptedE
"Exported record {} to {} (status code: {})", record, config.endpointURI(), statusCode);
}
+ private boolean hasResponseBody(final HttpResponse response) {
+ return response.body() != null && response.body().length > 0;
+ }
+
private HttpRequest buildRequestForRecord(final Record> record) throws JsonProcessingException {
return HttpRequest.newBuilder()
.uri(config.endpointURI())
@@ -144,9 +157,9 @@ private HttpRequest buildRequestForRecord(final Record> record) throws JsonPro
private void handleAcknowledgment(final byte[] responseBody, final int partitionId)
throws IOException {
final RecordsResponse body = MAPPER.readValue(responseBody, RecordsResponse.class);
- final long position = body.position;
+ final Long position = body.position;
- if (position > -1) {
+ if (position != null && position > -1) {
controller.updateLastExportedRecordPosition(position);
context
.getLogger()