Skip to content

Commit

Permalink
Update dependency io.quarkiverse.reactivemessaging.nats-jetstream:qua…
Browse files Browse the repository at this point in the history
…rkus-messaging-nats-jetstream to v3.17.10 (#10114)

* Update dependency io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream to v3.17.10

* fix nats example (again)

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Alexandre Dutra <[email protected]>
  • Loading branch information
renovate[bot] and adutra authored Dec 27, 2024
1 parent 80c3316 commit 65b2ec8
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion events/ri/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies {

// Quarkus - NATS
implementation(
"io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream:3.17.8"
"io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream:3.17.10"
)

// Avro serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.projectnessie.events.ri.messaging.nats;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -48,10 +48,11 @@ public AbstractNatsEventSubscriber(Emitter<T> emitter, EventSubscriberConfig con
protected Message<T> createMessage(Event upstreamEvent, T messagePayload) {
Map<String, List<String>> headers = new HashMap<>();
createHeaders(upstreamEvent, (name, value) -> headers.put(name, List.of(value)));
SubscribeMessageMetadata metadata =
SubscribeMessageMetadata.builder()
PublishMessageMetadata metadata =
PublishMessageMetadata.builder()
.messageId(upstreamEvent.getIdAsText())
.headers(headers)
.stream("nessie-events")
.subject(subject(upstreamEvent))
.build();
return Message.of(messagePayload, Metadata.of(metadata));
Expand All @@ -61,21 +62,21 @@ protected Message<T> createMessage(Event upstreamEvent, T messagePayload) {
protected CompletionStage<Void> onWriteAck(Metadata metadata) {
// Do NOT enable this log statement in production!
if (LOGGER.isDebugEnabled()) {
SubscribeMessageMetadata jetStreamMetadata =
metadata.get(SubscribeMessageMetadata.class).orElseThrow();
PublishMessageMetadata jetStreamMetadata =
metadata.get(PublishMessageMetadata.class).orElseThrow();
String id = jetStreamMetadata.messageId();
String subject = jetStreamMetadata.subjectOptional().orElse("<?>");
String subject = jetStreamMetadata.subject();
LOGGER.debug("Event written: messageId={}, subject={}", id, subject);
}
return CompletableFuture.completedFuture(null); // immediate ack
}

@Override
protected CompletionStage<Void> onWriteNack(Throwable error, Metadata metadata) {
SubscribeMessageMetadata jetStreamMetadata =
metadata.get(SubscribeMessageMetadata.class).orElseThrow();
PublishMessageMetadata jetStreamMetadata =
metadata.get(PublishMessageMetadata.class).orElseThrow();
String id = jetStreamMetadata.messageId();
String subject = jetStreamMetadata.subjectOptional().orElse("<?>");
String subject = jetStreamMetadata.subject();
LOGGER.error("Failed to write event: messageId={}, subject={}", id, subject, error);
return CompletableFuture.completedFuture(null); // immediate ack
}
Expand Down
1 change: 0 additions & 1 deletion events/ri/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ mp.messaging.outgoing.nessie-kafka-json.value.serializer=org.projectnessie.event
# NATS Json producer configuration
mp.messaging.outgoing.nessie-nats-json.connector=quarkus-jetstream
mp.messaging.outgoing.nessie-nats-json.stream=nessie-events
mp.messaging.outgoing.nessie-nats-json.subject=nessie.events
quarkus.messaging.nats.jet-stream.streams[0].name=nessie-events
quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=nessie.events.reference.created
quarkus.messaging.nats.jet-stream.streams[0].subjects[1]=nessie.events.reference.deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.projectnessie.events.ri.messaging.MessageHeaders.REPOSITORY_ID;
import static org.projectnessie.events.ri.messaging.MessageHeaders.SPEC_VERSION;

import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import java.util.List;
Expand Down Expand Up @@ -55,8 +55,8 @@ protected Message<Event> receive() {

@Override
protected void checkMessage(Message<Event> message, Event expectedPayload) {
PublishMessageMetadata metadata =
message.getMetadata().get(PublishMessageMetadata.class).orElseThrow();
SubscribeMessageMetadata metadata =
message.getMetadata().get(SubscribeMessageMetadata.class).orElseThrow();
assertThat(metadata.messageId()).isEqualTo(expectedPayload.getIdAsText());
String subject = AbstractNatsEventSubscriber.subject(expectedPayload);
assertThat(metadata.subject()).isEqualTo(subject);
Expand Down

0 comments on commit 65b2ec8

Please sign in to comment.