Skip to content

Commit

Permalink
Address duplicate code comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xunyin8 committed Jan 16, 2025
1 parent 886a378 commit 979089c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,15 @@ protected void processMessageAndMaybeProduceToKafka(
// call in this context much less obtrusive, however, it implies that all views can only work for AA stores

// Write to views
Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
if (hasViewWriters()) {
/**
* The ordering guarantees we want is the following:
Expand All @@ -659,28 +668,11 @@ protected void processMessageAndMaybeProduceToKafka(
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord()),
(pcs) -> producePutOrDeleteToKafka(
mergeConflictResultWrapper,
pcs,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs));
produceToVersionTopic);
} else {
// This function may modify the original record in KME and it is unsafe to use the payload from KME directly
// after
// this call.
producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// after this call.
produceToVersionTopic.run();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
Expand Down Expand Up @@ -3369,29 +3368,23 @@ protected void processMessageAndMaybeProduceToKafka(
if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) {
return;
}
Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// Write to views
if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
(pcs) -> produceToLocalKafkaHelper(
consumerRecord,
pcs,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs));
produceToVersionTopic);
} else {
produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
produceToVersionTopic.run();
}
}

Expand Down Expand Up @@ -3952,7 +3945,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
Consumer<PartitionConsumptionState> versionTopicWrite) {
Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
Expand All @@ -3966,7 +3959,7 @@ protected void queueUpVersionTopicWritesWithViewWriters(
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
versionTopicWrite.accept(partitionConsumptionState);
versionTopicWrite.run();
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc
leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters(
mockPartitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1),
(pcs) -> writeToVersionTopic.set(true));
() -> writeToVersionTopic.set(true));
verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture();
ArgumentCaptor<CompletableFuture> vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class);
verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.BiFunction;


/**
Expand Down Expand Up @@ -51,8 +51,9 @@ public CompletableFuture<PubSubProduceResult> put(
int valueSchemaId,
PubSubProducerCallback callback) {
return compositeOperation(
(writer) -> writer.put(key, value, valueSchemaId, childCallback),
(writer) -> writer.put(key, value, valueSchemaId, callback));
(writer, writeCallback) -> writer.put(key, value, valueSchemaId, writeCallback),
childCallback,
callback);
}

@Override
Expand All @@ -63,22 +64,16 @@ public Future<PubSubProduceResult> put(
PubSubProducerCallback callback,
PutMetadata putMetadata) {
return compositeOperation(
(writer) -> writer.put(
(writer, writeCallback) -> writer.put(
key,
value,
valueSchemaId,
childCallback,
writeCallback,
DEFAULT_LEADER_METADATA_WRAPPER,
APP_DEFAULT_LOGICAL_TS,
putMetadata),
(writer) -> writer.put(
key,
value,
valueSchemaId,
callback,
DEFAULT_LEADER_METADATA_WRAPPER,
APP_DEFAULT_LOGICAL_TS,
putMetadata));
childCallback,
callback);
}

@Override
Expand All @@ -87,8 +82,9 @@ public CompletableFuture<PubSubProduceResult> delete(
PubSubProducerCallback callback,
DeleteMetadata deleteMetadata) {
return compositeOperation(
(writer) -> writer.delete(key, callback, deleteMetadata),
(writer) -> writer.delete(key, callback, deleteMetadata));
(writer, writeCallback) -> writer.delete(key, writeCallback, deleteMetadata),
childCallback,
callback);
}

/**
Expand Down Expand Up @@ -131,18 +127,19 @@ public void close() throws IOException {
* completes the mainWriterOp.
*/
private CompletableFuture<PubSubProduceResult> compositeOperation(
Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> childWriterOp,
Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> mainWriterOp) {
BiFunction<VeniceWriter<K, V, U>, PubSubProducerCallback, CompletableFuture<PubSubProduceResult>> writerOperation,
PubSubProducerCallback childWriterCallback,
PubSubProducerCallback mainWriterCallback) {
CompletableFuture<PubSubProduceResult> finalFuture = new CompletableFuture<>();
CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1];
int index = 0;
childFutures[index++] = lastWriteFuture;
for (VeniceWriter<K, V, U> writer: childWriters) {
childFutures[index++] = childWriterOp.apply(writer);
childFutures[index++] = writerOperation.apply(writer, childWriterCallback);
}
CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
if (childException == null) {
mainWriterOp.apply(mainWriter).whenCompleteAsync((result, mainWriterException) -> {
writerOperation.apply(mainWriter, mainWriterCallback).whenCompleteAsync((result, mainWriterException) -> {
if (mainWriterException == null) {
finalFuture.complete(result);
} else {
Expand Down

0 comments on commit 979089c

Please sign in to comment.