
Commit

Address duplicate code comments
xunyin8 committed Jan 17, 2025
1 parent e201bdc commit ddc56a0
Showing 8 changed files with 151 additions and 100 deletions.
@@ -639,6 +639,15 @@ protected void processMessageAndMaybeProduceToKafka(
// call in this context much less obtrusive, however, it implies that all views can only work for AA stores

// Write to views
Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
if (hasViewWriters()) {
/**
* The ordering guarantees we want is the following:
@@ -659,28 +668,11 @@ protected void processMessageAndMaybeProduceToKafka(
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord()),
(pcs) -> producePutOrDeleteToKafka(
mergeConflictResultWrapper,
pcs,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs));
produceToVersionTopic);
} else {
// This function may modify the original record in KME and it is unsafe to use the payload from KME directly
// after
// this call.
producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// after this call.
produceToVersionTopic.run();
}
}
}
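
A minimal, self-contained sketch of the pattern applied in the hunks above: the produce-to-version-topic call is captured once in a Runnable and reused on both branches instead of repeating the same argument list twice. ViewWriterQueue, producePutOrDelete, and the String key/value types are hypothetical stand-ins for illustration, not Venice APIs.

import java.util.concurrent.CompletableFuture;

public class DeduplicatedProduceSketch {
  // Hypothetical stand-in for queueUpVersionTopicWritesWithViewWriters: runs the
  // version-topic write only after the (pretend) view writes are acknowledged.
  interface ViewWriterQueue {
    void queueUp(Runnable versionTopicWrite);
  }

  static void producePutOrDelete(String key, String value) {
    System.out.println("produced to version topic: " + key + "=" + value);
  }

  static void process(boolean hasViewWriters, ViewWriterQueue queue, String key, String value) {
    // The produce call is defined once and shared by both branches.
    Runnable produceToVersionTopic = () -> producePutOrDelete(key, value);
    if (hasViewWriters) {
      queue.queueUp(produceToVersionTopic);
    } else {
      produceToVersionTopic.run();
    }
  }

  public static void main(String[] args) {
    ViewWriterQueue queue = versionTopicWrite -> CompletableFuture
        .runAsync(() -> { /* pretend all view writers have acked */ })
        .thenRun(versionTopicWrite)
        .join();
    process(true, queue, "k1", "v1");
    process(false, queue, "k2", "v2");
  }
}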
@@ -110,7 +110,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
@@ -3369,29 +3368,23 @@ protected void processMessageAndMaybeProduceToKafka(
if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) {
return;
}
Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// Write to views
if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
(pcs) -> produceToLocalKafkaHelper(
consumerRecord,
pcs,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs));
produceToVersionTopic);
} else {
produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
produceToVersionTopic.run();
}
}

@@ -3952,7 +3945,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
Consumer<PartitionConsumptionState> versionTopicWrite) {
Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
@@ -3966,7 +3959,7 @@ protected void queueUpVersionTopicWritesWithViewWriters(
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
versionTopicWrite.accept(partitionConsumptionState);
versionTopicWrite.run();
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
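
A self-contained sketch of the ordering this method enforces, using hypothetical stand-in types rather than the Venice signatures: the version-topic write Runnable runs only once every view-writer future has completed, and failures propagate to the returned future.

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ViewWriteOrderingSketch {
  static CompletableFuture<Void> queueUpVersionTopicWrites(
      List<CompletableFuture<?>> viewWriterFutures,
      Runnable versionTopicWrite) {
    CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
    CompletableFuture.allOf(viewWriterFutures.toArray(new CompletableFuture[0]))
        .whenCompleteAsync((ignored, exception) -> {
          if (exception == null) {
            versionTopicWrite.run(); // only after every view write is acked
            currentVersionTopicWrite.complete(null);
          } else {
            currentVersionTopicWrite.completeExceptionally(exception);
          }
        });
    return currentVersionTopicWrite;
  }

  public static void main(String[] args) {
    CompletableFuture<?> viewWriterAck = CompletableFuture.completedFuture("view-ack");
    queueUpVersionTopicWrites(List.of(viewWriterAck), () -> System.out.println("version topic write"))
        .join();
  }
}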
@@ -291,7 +291,7 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc
leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters(
mockPartitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1),
(pcs) -> writeToVersionTopic.set(true));
() -> writeToVersionTopic.set(true));
verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture();
ArgumentCaptor<CompletableFuture> vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class);
verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture());
@@ -5,6 +5,8 @@
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.ReflectUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -53,4 +55,18 @@ public static Map<String, ViewConfig> parseViewConfigMapString(String flatViewCo
}
return viewConfigMap;
}

public static Map<String, VeniceProperties> getViewTopicsAndConfigs(
Collection<ViewConfig> viewConfigs,
Properties veniceViewProperties,
String storeName,
int version) {
Map<String, VeniceProperties> viewTopicNamesAndConfigs = new HashMap<>();
for (ViewConfig rawView: viewConfigs) {
VeniceView veniceView =
getVeniceView(rawView.getViewClassName(), veniceViewProperties, storeName, rawView.getViewParameters());
viewTopicNamesAndConfigs.putAll(veniceView.getTopicNamesAndConfigsForVersion(version));
}
return viewTopicNamesAndConfigs;
}
}
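
A self-contained sketch of what the new helper does, with hypothetical stand-in types (a View interface and String-valued configs) rather than the Venice ViewConfig/VeniceProperties classes: each view contributes its per-version topics, and the results are merged into one map for the caller.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ViewTopicAggregationSketch {
  // Hypothetical stand-in for VeniceView; configs are plain strings here.
  interface View {
    Map<String, String> getTopicNamesAndConfigsForVersion(int version);
  }

  static Map<String, String> getViewTopicsAndConfigs(List<View> views, int version) {
    Map<String, String> viewTopicNamesAndConfigs = new HashMap<>();
    for (View view : views) {
      viewTopicNamesAndConfigs.putAll(view.getTopicNamesAndConfigsForVersion(version));
    }
    return viewTopicNamesAndConfigs;
  }

  public static void main(String[] args) {
    View materializedView = version -> Map.of("someStore_v" + version + "_mv", "cleanup.policy=compact");
    System.out.println(getViewTopicsAndConfigs(List.of(materializedView), 1));
  }
}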
@@ -10,7 +10,7 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.BiFunction;


/**
@@ -51,8 +51,9 @@ public CompletableFuture<PubSubProduceResult> put(
int valueSchemaId,
PubSubProducerCallback callback) {
return compositeOperation(
(writer) -> writer.put(key, value, valueSchemaId, childCallback),
(writer) -> writer.put(key, value, valueSchemaId, callback));
(writer, writeCallback) -> writer.put(key, value, valueSchemaId, writeCallback),
childCallback,
callback);
}

@Override
@@ -63,22 +64,16 @@ public Future<PubSubProduceResult> put(
PubSubProducerCallback callback,
PutMetadata putMetadata) {
return compositeOperation(
(writer) -> writer.put(
(writer, writeCallback) -> writer.put(
key,
value,
valueSchemaId,
childCallback,
writeCallback,
DEFAULT_LEADER_METADATA_WRAPPER,
APP_DEFAULT_LOGICAL_TS,
putMetadata),
(writer) -> writer.put(
key,
value,
valueSchemaId,
callback,
DEFAULT_LEADER_METADATA_WRAPPER,
APP_DEFAULT_LOGICAL_TS,
putMetadata));
childCallback,
callback);
}

@Override
@@ -87,8 +82,9 @@ public CompletableFuture<PubSubProduceResult> delete(
PubSubProducerCallback callback,
DeleteMetadata deleteMetadata) {
return compositeOperation(
(writer) -> writer.delete(key, callback, deleteMetadata),
(writer) -> writer.delete(key, callback, deleteMetadata));
(writer, writeCallback) -> writer.delete(key, writeCallback, deleteMetadata),
childCallback,
callback);
}

/**
@@ -131,18 +127,19 @@ public void close() throws IOException {
* completes the mainWriterOp.
*/
private CompletableFuture<PubSubProduceResult> compositeOperation(
Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> childWriterOp,
Function<VeniceWriter<K, V, U>, CompletableFuture<PubSubProduceResult>> mainWriterOp) {
BiFunction<VeniceWriter<K, V, U>, PubSubProducerCallback, CompletableFuture<PubSubProduceResult>> writerOperation,
PubSubProducerCallback childWriterCallback,
PubSubProducerCallback mainWriterCallback) {
CompletableFuture<PubSubProduceResult> finalFuture = new CompletableFuture<>();
CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1];
int index = 0;
childFutures[index++] = lastWriteFuture;
for (VeniceWriter<K, V, U> writer: childWriters) {
childFutures[index++] = childWriterOp.apply(writer);
childFutures[index++] = writerOperation.apply(writer, childWriterCallback);
}
CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
if (childException == null) {
mainWriterOp.apply(mainWriter).whenCompleteAsync((result, mainWriterException) -> {
writerOperation.apply(mainWriter, mainWriterCallback).whenCompleteAsync((result, mainWriterException) -> {
if (mainWriterException == null) {
finalFuture.complete(result);
} else {
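
A self-contained sketch of the compositeOperation shape after this change, using hypothetical stand-in types rather than VeniceWriter/PubSubProducerCallback: a single BiFunction describes the write, and the callback passed to it decides whether it targets a child writer or the main writer; the main write starts only after every child write has completed.

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class CompositeWriteSketch {
  // Hypothetical stand-in for VeniceWriter; the callback is a plain Runnable here.
  interface Writer {
    CompletableFuture<String> put(String key, String value, Runnable callback);
  }

  static CompletableFuture<String> compositeOperation(
      Writer[] childWriters,
      Writer mainWriter,
      BiFunction<Writer, Runnable, CompletableFuture<String>> writerOperation,
      Runnable childCallback,
      Runnable mainCallback) {
    CompletableFuture<String> finalFuture = new CompletableFuture<>();
    CompletableFuture<?>[] childFutures = new CompletableFuture<?>[childWriters.length];
    for (int i = 0; i < childWriters.length; i++) {
      // Same operation applied to every child writer, with the child callback.
      childFutures[i] = writerOperation.apply(childWriters[i], childCallback);
    }
    CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
      if (childException != null) {
        finalFuture.completeExceptionally(childException);
        return;
      }
      // Same operation again for the main writer, with the main callback,
      // and only after every child write succeeded.
      writerOperation.apply(mainWriter, mainCallback).whenCompleteAsync((result, mainException) -> {
        if (mainException == null) {
          finalFuture.complete(result);
        } else {
          finalFuture.completeExceptionally(mainException);
        }
      });
    });
    return finalFuture;
  }

  public static void main(String[] args) {
    Writer writer = (key, value, callback) -> {
      callback.run();
      return CompletableFuture.completedFuture(key + "=" + value);
    };
    compositeOperation(
        new Writer[] { writer },
        writer,
        (w, cb) -> w.put("k", "v", cb),
        () -> System.out.println("child write acked"),
        () -> System.out.println("main write acked")).join();
  }
}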
@@ -36,27 +36,31 @@ public void testWritesAreInOrder() throws InterruptedException {
VeniceWriter<byte[], byte[], byte[]> mockMainWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> mainWriterFuture = CompletableFuture.completedFuture(null);
doReturn(mainWriterFuture).when(mockMainWriter).put(any(), any(), anyInt(), eq(null));
PubSubProducerCallback deletePubSubProducerCallback = mock(PubSubProducerCallback.class);
PubSubProducerCallback childPubSubProducerCallback = mock(PubSubProducerCallback.class);
DeleteMetadata deleteMetadata = mock(DeleteMetadata.class);
doReturn(mainWriterFuture).when(mockMainWriter).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
doReturn(mainWriterFuture).when(mockMainWriter).delete(any(), eq(null), eq(deleteMetadata));
VeniceWriter<byte[], byte[], byte[]> mockChildWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> childWriterFuture = new CompletableFuture<>();
doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(null));
doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(childPubSubProducerCallback));
doReturn(childWriterFuture).when(mockChildWriter)
.delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
.delete(any(), eq(childPubSubProducerCallback), eq(deleteMetadata));
VeniceWriter[] childWriters = new VeniceWriter[1];
childWriters[0] = mockChildWriter;
AbstractVeniceWriter<byte[], byte[], byte[]> compositeVeniceWriter =
new CompositeVeniceWriter<byte[], byte[], byte[]>("test_v1", mockMainWriter, childWriters, null);
new CompositeVeniceWriter<byte[], byte[], byte[]>(
"test_v1",
mockMainWriter,
childWriters,
childPubSubProducerCallback);
compositeVeniceWriter.put(new byte[1], new byte[1], 1, null);
compositeVeniceWriter.delete(new byte[1], deletePubSubProducerCallback, deleteMetadata);
compositeVeniceWriter.delete(new byte[1], null, deleteMetadata);
verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
verify(mockMainWriter, never()).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
verify(mockMainWriter, never()).delete(any(), eq(childPubSubProducerCallback), eq(deleteMetadata));
Thread.sleep(1000);
verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
verify(mockMainWriter, never()).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
verify(mockMainWriter, never()).delete(any(), eq(childPubSubProducerCallback), eq(deleteMetadata));
childWriterFuture.complete(null);
verify(mockMainWriter, timeout(1000)).put(any(), any(), anyInt(), eq(null));
verify(mockMainWriter, timeout(1000)).delete(any(), eq(deletePubSubProducerCallback), eq(deleteMetadata));
verify(mockMainWriter, timeout(1000)).delete(any(), eq(null), eq(deleteMetadata));
}
}
@@ -2456,14 +2456,10 @@ private void constructViewResources(Properties params, Store store, Version vers
// Construct Kafka topics
// TODO: Today we only have support for creating Kafka topics as a resource for a given view, but later we would
// like to add support for potentially other resource types (maybe helix RG's as an example?)
Map<String, VeniceProperties> topicNamesAndConfigs = new HashMap<>();
for (ViewConfig rawView: viewConfigs.values()) {
VeniceView adminView =
ViewUtils.getVeniceView(rawView.getViewClassName(), params, store.getName(), rawView.getViewParameters());
topicNamesAndConfigs.putAll(adminView.getTopicNamesAndConfigsForVersion(version.getNumber()));
}
Map<String, VeniceProperties> viewTopicNamesAndConfigs =
ViewUtils.getViewTopicsAndConfigs(viewConfigs.values(), params, store.getName(), version.getNumber());
TopicManager topicManager = getTopicManager();
for (Map.Entry<String, VeniceProperties> topicNameAndConfigs: topicNamesAndConfigs.entrySet()) {
for (Map.Entry<String, VeniceProperties> topicNameAndConfigs: viewTopicNamesAndConfigs.entrySet()) {
String materializedViewTopicName = topicNameAndConfigs.getKey();
PubSubTopic kafkaTopic = pubSubTopicRepository.getTopic(materializedViewTopicName);
VeniceProperties kafkaTopicConfigs = topicNameAndConfigs.getValue();
@@ -3951,43 +3947,29 @@ public void topicCleanupWhenPushComplete(String clusterName, String storeName, i
ReadWriteStoreRepository storeRepository = resources.getStoreMetadataRepository();
Store store = storeRepository.getStore(storeName);
if (store.isHybrid() && clusterConfig.isKafkaLogCompactionForHybridStoresEnabled()) {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, versionNumber));
PubSubTopic versionTopic =
getPubSubTopicRepository().getTopic(Version.composeKafkaTopic(storeName, versionNumber));
long minCompactionLagSeconds = store.getMinCompactionLagSeconds();
long expectedMinCompactionLagMs =
minCompactionLagSeconds > 0 ? minCompactionLagSeconds * Time.MS_PER_SECOND : minCompactionLagSeconds;
long maxCompactionLagSeconds = store.getMaxCompactionLagSeconds();
long expectedMaxCompactionLagMs =
maxCompactionLagSeconds > 0 ? maxCompactionLagSeconds * Time.MS_PER_SECOND : maxCompactionLagSeconds;
getTopicManager().updateTopicCompactionPolicy(
versionTopic,
Consumer<PubSubTopic> updateTopicCompaction = (topic) -> getTopicManager().updateTopicCompactionPolicy(
topic,
true,
expectedMinCompactionLagMs,
expectedMaxCompactionLagMs > 0 ? Optional.of(expectedMaxCompactionLagMs) : Optional.empty());
updateTopicCompaction.accept(versionTopic);

// Compaction settings should also be applied to corresponding materialized view topics
// Compaction settings should also be applied to corresponding view topics
Map<String, ViewConfig> viewConfigs = store.getVersionOrThrow(versionNumber).getViewConfigs();
if (viewConfigs != null && !viewConfigs.isEmpty()) {
Set<String> viewTopicsToUpdate = new HashSet<>();
for (ViewConfig rawViewConfig: viewConfigs.values()) {
if (MaterializedView.class.getCanonicalName().equals(rawViewConfig.getViewClassName())) {
viewTopicsToUpdate.addAll(
ViewUtils
.getVeniceView(
rawViewConfig.getViewClassName(),
new Properties(),
storeName,
rawViewConfig.getViewParameters())
.getTopicNamesAndConfigsForVersion(versionNumber)
.keySet());
}
}
for (String topic: viewTopicsToUpdate) {
PubSubTopic viewTopic = pubSubTopicRepository.getTopic(topic);
getTopicManager().updateTopicCompactionPolicy(
viewTopic,
true,
expectedMinCompactionLagMs,
expectedMaxCompactionLagMs > 0 ? Optional.of(expectedMaxCompactionLagMs) : Optional.empty());
Map<String, VeniceProperties> viewTopicNamesAndConfigs =
ViewUtils.getViewTopicsAndConfigs(viewConfigs.values(), new Properties(), storeName, versionNumber);
for (String topic: viewTopicNamesAndConfigs.keySet()) {
PubSubTopic viewTopic = getPubSubTopicRepository().getTopic(topic);
updateTopicCompaction.accept(viewTopic);
}
}
}
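
A minimal sketch of the Consumer-based dedup in the hunk above, using hypothetical stand-in types rather than TopicManager/PubSubTopic: the compaction-policy update is defined once as a Consumer and then applied to the version topic and to every view topic, instead of repeating the full argument list at each call site. The topic names in main are illustrative only.

import java.util.List;
import java.util.function.Consumer;

public class CompactionPolicySketch {
  // Hypothetical stand-in for PubSubTopic.
  record Topic(String name) {}

  static void updateTopicCompactionPolicy(Topic topic, boolean compacted, long minLagMs) {
    System.out.println("set compaction on " + topic.name() + " minLagMs=" + minLagMs);
  }

  public static void main(String[] args) {
    long expectedMinCompactionLagMs = 60_000L;
    // Defined once, applied everywhere.
    Consumer<Topic> updateTopicCompaction =
        topic -> updateTopicCompactionPolicy(topic, true, expectedMinCompactionLagMs);

    updateTopicCompaction.accept(new Topic("someStore_v1"));
    for (Topic viewTopic : List.of(new Topic("someStore_v1_mv"), new Topic("someStore_v1_cc"))) {
      updateTopicCompaction.accept(viewTopic);
    }
  }
}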
