Address duplicate code comments
xunyin8 committed Jan 16, 2025
1 parent 886a378 commit a5260e7
Showing 5 changed files with 54 additions and 68 deletions.
@@ -639,6 +639,15 @@ protected void processMessageAndMaybeProduceToKafka(
// call in this context much less obtrusive, however, it implies that all views can only work for AA stores

// Write to views
+ Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka(
+ mergeConflictResultWrapper,
+ partitionConsumptionState,
+ keyBytes,
+ consumerRecord,
+ partition,
+ kafkaUrl,
+ kafkaClusterId,
+ beforeProcessingRecordTimestampNs);
if (hasViewWriters()) {
/**
* The ordering guarantees we want is the following:
@@ -659,28 +668,11 @@ protected void processMessageAndMaybeProduceToKafka(
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord()),
- (pcs) -> producePutOrDeleteToKafka(
- mergeConflictResultWrapper,
- pcs,
- keyBytes,
- consumerRecord,
- partition,
- kafkaUrl,
- kafkaClusterId,
- beforeProcessingRecordTimestampNs));
+ produceToVersionTopic);
} else {
// This function may modify the original record in KME and it is unsafe to use the payload from KME directly
- // after
- // this call.
- producePutOrDeleteToKafka(
- mergeConflictResultWrapper,
- partitionConsumptionState,
- keyBytes,
- consumerRecord,
- partition,
- kafkaUrl,
- kafkaClusterId,
- beforeProcessingRecordTimestampNs);
+ // after this call.
+ produceToVersionTopic.run();
}
}
}
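
The pattern above, where the duplicated producePutOrDeleteToKafka(...) call is captured once as a Runnable and then either queued behind the view writers or run inline, can be illustrated with a minimal, self-contained sketch. The names below (DeferredProduceSketch, produceRecord, queueUpWithViewWriters) are hypothetical stand-ins, not the actual Venice classes:

    import java.util.concurrent.CompletableFuture;

    // Minimal sketch of the refactor above: the duplicated produce call is
    // captured once as a Runnable and reused on both branches. All names here
    // are hypothetical stand-ins, not the real Venice API.
    public class DeferredProduceSketch {
      // Stand-in for producePutOrDeleteToKafka(...) and its long argument list.
      static void produceRecord(String key, byte[] value) {
        System.out.println("produced " + key + " (" + value.length + " bytes)");
      }

      // Stand-in for queueUpVersionTopicWritesWithViewWriters: defer the
      // version-topic write until the (simulated) view-writer ack completes.
      static void queueUpWithViewWriters(CompletableFuture<Void> viewWriterAck, Runnable versionTopicWrite) {
        viewWriterAck.thenRun(versionTopicWrite);
      }

      public static void main(String[] args) {
        String key = "k1";
        byte[] value = new byte[] { 1, 2, 3 };
        boolean hasViewWriters = true;

        // Spell out the argument list once instead of on both branches.
        Runnable produceToVersionTopic = () -> produceRecord(key, value);

        if (hasViewWriters) {
          queueUpWithViewWriters(CompletableFuture.completedFuture(null), produceToVersionTopic);
        } else {
          produceToVersionTopic.run();
        }
      }
    }
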
@@ -110,7 +110,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
- import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
@@ -3369,29 +3368,23 @@ protected void processMessageAndMaybeProduceToKafka(
if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) {
return;
}
+ Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper(
+ consumerRecord,
+ partitionConsumptionState,
+ writeComputeResultWrapper,
+ partition,
+ kafkaUrl,
+ kafkaClusterId,
+ beforeProcessingRecordTimestampNs);
// Write to views
if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
- (pcs) -> produceToLocalKafkaHelper(
- consumerRecord,
- pcs,
- writeComputeResultWrapper,
- partition,
- kafkaUrl,
- kafkaClusterId,
- beforeProcessingRecordTimestampNs));
+ produceToVersionTopic);
} else {
- produceToLocalKafkaHelper(
- consumerRecord,
- partitionConsumptionState,
- writeComputeResultWrapper,
- partition,
- kafkaUrl,
- kafkaClusterId,
- beforeProcessingRecordTimestampNs);
+ produceToVersionTopic.run();
}
}

@@ -3952,7 +3945,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
- Consumer<PartitionConsumptionState> versionTopicWrite) {
+ Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
@@ -3966,7 +3959,7 @@ protected void queueUpVersionTopicWritesWithViewWriters(
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
- versionTopicWrite.accept(partitionConsumptionState);
+ versionTopicWrite.run();
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
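
Since the deferred write now closes over the PartitionConsumptionState it needs, the callback no longer takes a parameter, which is why Consumer<PartitionConsumptionState> becomes Runnable in this method. Below is a rough, self-contained sketch of the ordering this method enforces; the names and simplified types are assumptions for illustration, not the actual implementation:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Sketch of the ordering pattern: fan out to the view writers first, and
    // run the deferred version-topic write only once all of them have acked.
    public class ViewWriterOrderingSketch {
      static CompletableFuture<Void> queueUpVersionTopicWrite(
          List<CompletableFuture<Void>> viewWriterFutures,
          Runnable versionTopicWrite) {
        CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
        CompletableFuture.allOf(viewWriterFutures.toArray(new CompletableFuture[0]))
            .whenCompleteAsync((ignored, exception) -> {
              if (exception == null) {
                versionTopicWrite.run(); // the Runnable handed in by the caller
                currentVersionTopicWrite.complete(null);
              } else {
                currentVersionTopicWrite.completeExceptionally(exception);
              }
            });
        return currentVersionTopicWrite;
      }

      public static void main(String[] args) {
        CompletableFuture<Void> viewAck = new CompletableFuture<>();
        CompletableFuture<Void> vtWrite =
            queueUpVersionTopicWrite(List.of(viewAck), () -> System.out.println("version topic write"));
        viewAck.complete(null); // only now does the version-topic write run
        vtWrite.join();
      }
    }
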
@@ -291,7 +291,7 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc
leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters(
mockPartitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1),
- (pcs) -> writeToVersionTopic.set(true));
+ () -> writeToVersionTopic.set(true));
verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture();
ArgumentCaptor<CompletableFuture> vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class);
verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture());
@@ -10,7 +10,7 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
- import java.util.function.Function;
+ import java.util.function.BiFunction;


/**
@@ -51,8 +51,9 @@ public CompletableFuture<PubSubProduceResult> put(
int valueSchemaId,
PubSubProducerCallback callback) {
return compositeOperation(
- (writer) -> writer.put(key, value, valueSchemaId, childCallback),
- (writer) -> writer.put(key, value, valueSchemaId, callback));
+ (writer, writeCallback) -> writer.put(key, value, valueSchemaId, writeCallback),
+ childCallback,
+ callback);
}

@Override
@@ -63,22 +64,16 @@ public Future<PubSubProduceResult> put(
PubSubProducerCallback callback,
PutMetadata putMetadata) {
return compositeOperation(
- (writer) -> writer.put(
+ (writer, writeCallback) -> writer.put(
key,
value,
valueSchemaId,
- childCallback,
+ writeCallback,
DEFAULT_LEADER_METADATA_WRAPPER,
APP_DEFAULT_LOGICAL_TS,
putMetadata),
- (writer) -> writer.put(
- key,
- value,
- valueSchemaId,
- callback,
- DEFAULT_LEADER_METADATA_WRAPPER,
- APP_DEFAULT_LOGICAL_TS,
- putMetadata));
+ childCallback,
+ callback);
}

@Override
@@ -87,8 +82,9 @@ public CompletableFuture<PubSubProduceResult> delete(
PubSubProducerCallback callback,
DeleteMetadata deleteMetadata) {
return compositeOperation(
- (writer) -> writer.delete(key, callback, deleteMetadata),
- (writer) -> writer.delete(key, callback, deleteMetadata));
+ (writer, writeCallback) -> writer.delete(key, writeCallback, deleteMetadata),
+ childCallback,
+ callback);
}

/**
@@ -131,18 +127,19 @@ public void close() throws IOException {
* completes the mainWriterOp.
*/
private CompletableFuture<PubSubProduceResult> compositeOperation(
- Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> childWriterOp,
- Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> mainWriterOp) {
+ BiFunction<VeniceWriter<K, V, U>, PubSubProducerCallback, CompletableFuture<PubSubProduceResult>> writerOperation,
+ PubSubProducerCallback childWriterCallback,
+ PubSubProducerCallback mainWriterCallback) {
CompletableFuture<PubSubProduceResult> finalFuture = new CompletableFuture<>();
CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1];
int index = 0;
childFutures[index++] = lastWriteFuture;
for (VeniceWriter<K, V, U> writer: childWriters) {
- childFutures[index++] = childWriterOp.apply(writer);
+ childFutures[index++] = writerOperation.apply(writer, childWriterCallback);
}
CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
if (childException == null) {
- mainWriterOp.apply(mainWriter).whenCompleteAsync((result, mainWriterException) -> {
+ writerOperation.apply(mainWriter, mainWriterCallback).whenCompleteAsync((result, mainWriterException) -> {
if (mainWriterException == null) {
finalFuture.complete(result);
} else {
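
In compositeOperation, the duplicated child/main lambdas are collapsed into a single BiFunction that receives the writer together with the callback to use, while the child and main callbacks are passed in separately. A simplified, hypothetical sketch of that shape follows, using String writers and a toy Callback instead of VeniceWriter and PubSubProducerCallback:

    import java.util.concurrent.CompletableFuture;
    import java.util.function.BiFunction;

    // Hypothetical, simplified sketch of the compositeOperation() change: one
    // BiFunction describes the write, and the callback argument decides
    // whether it targets a child writer or the main writer.
    public class CompositeOperationSketch {
      interface Callback { void onCompletion(String result); }

      // Stand-in for VeniceWriter#put / #delete.
      static CompletableFuture<String> writeTo(String writerName, Callback callback) {
        return CompletableFuture.supplyAsync(() -> {
          callback.onCompletion("acked by " + writerName);
          return writerName;
        });
      }

      static CompletableFuture<String> compositeOperation(
          String[] childWriters,
          String mainWriter,
          BiFunction<String, Callback, CompletableFuture<String>> writerOperation,
          Callback childCallback,
          Callback mainCallback) {
        CompletableFuture<String> finalFuture = new CompletableFuture<>();
        CompletableFuture<?>[] childFutures = new CompletableFuture[childWriters.length];
        for (int i = 0; i < childWriters.length; i++) {
          // Same operation, but every child writer gets the child callback.
          childFutures[i] = writerOperation.apply(childWriters[i], childCallback);
        }
        CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
          if (childException != null) {
            finalFuture.completeExceptionally(childException);
            return;
          }
          // Only after all child writes succeed does the main writer run, with the caller's callback.
          writerOperation.apply(mainWriter, mainCallback).whenComplete((result, mainException) -> {
            if (mainException == null) {
              finalFuture.complete(result);
            } else {
              finalFuture.completeExceptionally(mainException);
            }
          });
        });
        return finalFuture;
      }

      public static void main(String[] args) {
        compositeOperation(
            new String[] { "child-colo-writer" },
            "main-version-topic-writer",
            (writer, cb) -> writeTo(writer, cb),
            result -> System.out.println("child callback: " + result),
            result -> System.out.println("main callback: " + result)).join();
      }
    }
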
@@ -36,27 +36,31 @@ public void testWritesAreInOrder() throws InterruptedException {
VeniceWriter<byte[], byte[], byte[]> mockMainWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> mainWriterFuture = CompletableFuture.completedFuture(null);
doReturn(mainWriterFuture).when(mockMainWriter).put(any(), any(), anyInt(), eq(null));
- PubSubProducerCallback deletePubSubProducerCallback = mock(PubSubProducerCallback.class);
+ PubSubProducerCallback childPubSubProducerCallback = mock(PubSubProducerCallback.class);
DeleteMetadata deleteMetadata = mock(DeleteMetadata.class);
- doReturn(mainWriterFuture).when(mockMainWriter).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
+ doReturn(mainWriterFuture).when(mockMainWriter).delete(any(), eq(null), eq(deleteMetadata));
VeniceWriter<byte[], byte[], byte[]> mockChildWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> childWriterFuture = new CompletableFuture<>();
- doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(null));
+ doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(childPubSubProducerCallback));
doReturn(childWriterFuture).when(mockChildWriter)
- .delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
+ .delete(any(), eq(childPubSubProducerCallback), eq(deleteMetadata));
VeniceWriter[] childWriters = new VeniceWriter[1];
childWriters[0] = mockChildWriter;
AbstractVeniceWriter<byte[], byte[], byte[]> compositeVeniceWriter =
- new CompositeVeniceWriter<byte[], byte[], byte[]>("test_v1", mockMainWriter, childWriters, null);
+ new CompositeVeniceWriter<byte[], byte[], byte[]>(
+ "test_v1",
+ mockMainWriter,
+ childWriters,
+ childPubSubProducerCallback);
compositeVeniceWriter.put(new byte[1], new byte[1], 1, null);
- compositeVeniceWriter.delete(new byte[1], deletePubSubProducerCallback, deleteMetadata);
+ compositeVeniceWriter.delete(new byte[1], null, deleteMetadata);
verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
- verify(mockMainWriter, never()).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
+ verify(mockMainWriter, never()).delete(any(), eq(childPubSubProducerCallback), eq(deleteMetadata));
Thread.sleep(1000);
verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
- verify(mockMainWriter, never()).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
+ verify(mockMainWriter, never()).delete(any(), eq(childPubSubProducerCallback), eq(deleteMetadata));
childWriterFuture.complete(null);
verify(mockMainWriter, timeout(1000)).put(any(), any(), anyInt(), eq(null));
- verify(mockMainWriter, timeout(1000)).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
+ verify(mockMainWriter, timeout(1000)).delete(any(), eq(null), eq(deleteMetadata));
}
}
