Skip to content

Commit

Permalink
Merge pull request #805 from camunda-community-hub/renovate/version.h…
Browse files Browse the repository at this point in the history
…ttpcore5

deps: Update dependency org.apache.httpcomponents.core5:httpcore5 to v5.3.1
  • Loading branch information
renovate[bot] authored Dec 30, 2024
2 parents e77763f + f3cf4f6 commit 4da65ab
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.zeebe.containers.exporter;

import io.camunda.zeebe.protocol.record.Record;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
Expand All @@ -24,12 +26,21 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.jcip.annotations.ThreadSafe;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.HttpProcessors;
import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
import org.apache.hc.core5.http.impl.routing.RequestRouter;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncFilterChain;
import org.apache.hc.core5.http.nio.AsyncFilterHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apiguardian.api.API;
Expand Down Expand Up @@ -257,11 +268,13 @@ private HttpAsyncServer createServer() {
return AsyncServerBootstrap.bootstrap()
.setIOReactorConfig(config)
.setCanonicalHostName("localhost")
.setExceptionCallback(e -> LOGGER.warn("Error occurred in DebugReceiver server", e))
.setCharCodingConfig(CharCodingConfig.custom().setCharset(StandardCharsets.UTF_8).build())
.setHttpProcessor(HttpProcessors.server("ztc-debug/1.1"))
// need to register the handler on both the primary and possibly Testcontainers' proxy for
// our local server, as otherwise the requests with hosts that do not match will be skipped
.registerVirtual(GenericContainer.INTERNAL_HOST_HOSTNAME, "/records", recordHandler)
.register(GenericContainer.INTERNAL_HOST_HOSTNAME, "/records", recordHandler)
.register("127.0.0.1", "/records", recordHandler)
.register("/records", recordHandler)
.create();
}
Expand Down
84 changes: 49 additions & 35 deletions core/src/main/java/io/zeebe/containers/exporter/RecordHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
*/
package io.zeebe.containers.exporter;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hc.core5.http.EntityDetails;
Expand Down Expand Up @@ -58,7 +57,7 @@
* <li>200 - the records were passed through, and the response body will be a singleton map with
* one key, <em>position</em>, the value of which is the highest acknowledged position for the
* partition ID from which the records are coming from
* <li>204 - there were either no records passed, or there is no known acknowledged position yet
* <li>200 - there were either no records passed, or there is no known acknowledged position yet
* for the partition form which the records are coming from
* <li>400 - if there is no request body, or the request body cannot be parsed as a list of
* records
Expand All @@ -69,6 +68,7 @@ final class RecordHandler implements AsyncServerRequestHandler<Message<HttpReque
private static final Logger LOGGER = LoggerFactory.getLogger(RecordHandler.class);
private static final ObjectMapper MAPPER =
new ObjectMapper().registerModule(new ZeebeProtocolModule());
private static final byte[] EMPTY_BODY = "{}".getBytes(StandardCharsets.UTF_8);

private final Consumer<Record<?>> recordConsumer;
private final boolean autoAcknowledge;
Expand Down Expand Up @@ -96,32 +96,27 @@ public void handle(
final HttpContext context)
throws HttpException, IOException {
final byte[] requestBody = requestObject.getBody();
final AsyncResponseProducer responseProducer = handleRequest(requestBody);
responseTrigger.submitResponse(responseProducer, context);
}

private AsyncResponseProducer handleRequest(final byte[] requestBody)
throws JsonProcessingException {
if (requestBody == null || requestBody.length == 0) {
final BasicHttpResponse response =
new BasicHttpResponse(HttpStatus.SC_BAD_REQUEST, "must send a list of records as body");
responseTrigger.submitResponse(new BasicResponseProducer(response), context);
return;
return createErrorResponse(HttpStatus.SC_BAD_REQUEST, "Must send a list of records as body");
}

final List<Record<?>> records;
try {
records = MAPPER.readValue(requestBody, new TypeReference<List<Record<?>>>() {});
} catch (final IOException e) {
final BasicHttpResponse response =
new BasicHttpResponse(
HttpStatus.SC_BAD_REQUEST,
"failed to deserialize records, see receiver logs for more");
responseTrigger.submitResponse(new BasicResponseProducer(response), context);
LOGGER.warn("Failed to deserialize exported records", e);

return;
return createErrorResponse(
HttpStatus.SC_BAD_REQUEST, "Failed to deserialize records, see receiver logs for more");
}

if (records.isEmpty()) {
final BasicHttpResponse response =
new BasicHttpResponse(HttpStatus.SC_NO_CONTENT, "no records given");
responseTrigger.submitResponse(new BasicResponseProducer(response), context);
return;
LOGGER.debug("No records given, will return a successful response regardless");
}

for (final Record<?> record : records) {
Expand All @@ -132,26 +127,45 @@ public void handle(
}
}

final int partitionId = records.get(0).getPartitionId();
final AsyncResponseProducer responseProducer = createSuccessfulResponse(partitionId);
responseTrigger.submitResponse(responseProducer, context);
return createSuccessfulResponse(records);
}

private AsyncResponseProducer createSuccessfulResponse(final int partitionId)
private AsyncResponseProducer createSuccessfulResponse(final List<Record<?>> records)
throws JsonProcessingException {
final Long position = positions.get(partitionId);

if (position == null) {
final HttpResponse response =
new BasicHttpResponse(
HttpStatus.SC_NO_CONTENT, "no acknowledged position for partition " + partitionId);
return new BasicResponseProducer(response);
}

final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
response.setHeader("Content-Type", "application/json");
final byte[] responseBody =
MAPPER.writeValueAsBytes(Collections.singletonMap("position", position));
records.isEmpty()
? EMPTY_BODY
: MAPPER.writeValueAsBytes(
Collections.singletonMap(
"position", positions.get(records.get(0).getPartitionId())));

response.setHeader("Content-Type", "application/json; charset=UTF-8");
return new BasicResponseProducer(response, new BasicAsyncEntityProducer(responseBody));
}

private AsyncResponseProducer createErrorResponse(final int status, final String message)
throws JsonProcessingException {
final ProblemDetail problem = new ProblemDetail(status, message);
final HttpResponse response = new BasicHttpResponse(status);
response.setHeader("Content-Type", "application/json; charset=UTF-8");

final byte[] responseBody = MAPPER.writeValueAsBytes(problem);
return new BasicResponseProducer(response, new BasicAsyncEntityProducer(responseBody));
}

@SuppressWarnings({"FieldCanBeLocal", "unused"})
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
private static final class ProblemDetail {
private final String type = "about:blank";
private final String instance = "/records";

private final int status;
private final String detail;

private ProblemDetail(int status, String detail) {
this.status = status;
this.detail = detail;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.test.broker.protocol.ProtocolFactory;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -107,7 +108,13 @@ void shouldRetryOnNonSuccessfulHttpCode() {
void shouldHandleNoResponseBody() {
// given
final Record<?> record = recordFactory.generateRecord();
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
WireMock.stubFor(
WireMock.post("/records")
.willReturn(
WireMock.aResponse()
.withStatus(200)
.withResponseBody(
Body.fromJsonBytes("{}".getBytes(StandardCharsets.UTF_8)))));
controller.updateLastExportedRecordPosition(10L);

// when
Expand Down Expand Up @@ -141,7 +148,7 @@ void shouldExportRecordAsList() {
// given
final Record<?> record = recordFactory.generateRecord();
final String expectedRequestBody = Json.write(Collections.singletonList(record));
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(200)));

// when
exporter.export(record);
Expand All @@ -158,7 +165,7 @@ void shouldExportRecordsOneAtATime() {
// given
final Record<?> firstRecord = recordFactory.generateRecord();
final Record<?> secondRecord = recordFactory.generateRecord();
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(204)));
WireMock.stubFor(WireMock.post("/records").willReturn(WireMock.aResponse().withStatus(200)));

// when
exporter.export(firstRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void shouldBindToPortOnStart() throws IOException {
.body(Collections.emptyList())
.post(receiver.recordsEndpoint())
.then()
.statusCode(204);
.statusCode(200);
}
}

Expand All @@ -145,7 +145,7 @@ void shouldBindToRandomPort() {
.body(Collections.emptyList())
.post(receiver.recordsEndpoint())
.then()
.statusCode(204);
.statusCode(200);
}
}

Expand Down Expand Up @@ -209,6 +209,7 @@ void shouldAcknowledgePosition() {
RestAssured.given()
.body(Collections.singletonList(record))
.contentType("application/json")
.accept("application/json")
.post(receiver.recordsEndpoint())
.as(Map.class);
assertThat(response).containsEntry("position", 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -99,6 +100,13 @@ public void export(final Record<?> record) {
Thread.currentThread().interrupt();
LangUtil.rethrowUnchecked(e);
} catch (final Exception e) {
context
.getLogger()
.warn(
"Failed to export record {} of partition {}",
record.getPosition(),
record.getPartitionId(),
e);
LangUtil.rethrowUnchecked(e);
}

Expand All @@ -112,14 +120,15 @@ private void pushRecord(final Record<?> record) throws IOException, InterruptedE
final HttpResponse<byte[]> response = client.send(request, BodyHandlers.ofByteArray());
final int statusCode = response.statusCode();
if (statusCode >= 400) {
final String error =
hasResponseBody(response) ? new String(response.body(), StandardCharsets.UTF_8) : "";
throw new BadRequestException(
String.format(
"Failed to push out record with position %d on partition %d: response code %d",
record.getPosition(), record.getPartitionId(), statusCode));
"Failed to push out record with position %d on partition %d: response code=%d, error='%s'",
record.getPosition(), record.getPartitionId(), statusCode, error));
}

// is there a body to read?
if (statusCode != 204) {
if (hasResponseBody(response)) {
handleAcknowledgment(response.body(), record.getPartitionId());
}

Expand All @@ -129,6 +138,10 @@ private void pushRecord(final Record<?> record) throws IOException, InterruptedE
"Exported record {} to {} (status code: {})", record, config.endpointURI(), statusCode);
}

private boolean hasResponseBody(final HttpResponse<byte[]> response) {
return response.body() != null && response.body().length > 0;
}

private HttpRequest buildRequestForRecord(final Record<?> record) throws JsonProcessingException {
return HttpRequest.newBuilder()
.uri(config.endpointURI())
Expand All @@ -144,9 +157,9 @@ private HttpRequest buildRequestForRecord(final Record<?> record) throws JsonPro
private void handleAcknowledgment(final byte[] responseBody, final int partitionId)
throws IOException {
final RecordsResponse body = MAPPER.readValue(responseBody, RecordsResponse.class);
final long position = body.position;
final Long position = body.position;

if (position > -1) {
if (position != null && position > -1) {
controller.updateLastExportedRecordPosition(position);
context
.getLogger()
Expand Down
7 changes: 6 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ default:
./mvnw clean -T2C {{ mvnArgs }}

test +mvnArgs='':
./mvnw verify -DskipChecks -T1C {{ mvnArgs }}
./mvnw verify -DskipChecks -T1C {{ mvnArgs }}

# use only if you have a pretty beefy machine :) if not, you can always just set a lower forkCount by calling
# `just fast-test -DforkCount=2` (or something like that)
fast-test +mvnArgs='':
./mvnw verify -DskipChecks -T1C -Pparallel-tests -DforkCount=3

@ut +mvnArgs='': (test "-DskipITs" mvnArgs)
@it +mvnArgs='': (test "-DskipUTs" mvnArgs)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
<version.docker>3.4.1</version.docker>
<version.duct-tape>1.0.8</version.duct-tape>
<version.feign>13.5</version.feign>
<version.httpcore5>5.2.5</version.httpcore5>
<version.httpcore5>5.3.1</version.httpcore5>
<version.jackson>2.18.2</version.jackson>
<version.jcip>1.0</version.jcip>
<version.junit-jupiter>5.11.4</version.junit-jupiter>
Expand Down

0 comments on commit 4da65ab

Please sign in to comment.