From 51028b4c72960e167e4d8f15dd197280f6a2c39c Mon Sep 17 00:00:00 2001
From: Xun Yin
Date: Thu, 2 Jan 2025 10:52:41 -0800
Subject: [PATCH] Address more review comments

---
 .../ActiveActiveStoreIngestionTask.java       |  6 +-
 .../LeaderFollowerStoreIngestionTask.java     |  5 +-
 .../stats/HostLevelIngestionStats.java        | 11 ++++
 .../store/view/MaterializedViewWriter.java    | 62 ++++++++++---------
 .../view/MaterializedViewWriterTest.java      | 14 ++---
 .../linkedin/venice/hadoop/VenicePushJob.java |  9 +--
 .../hadoop/TestVenicePushJobCheckpoints.java  |  4 ++
 .../datawriter/reduce/TestVeniceReducer.java  |  4 +-
 .../venice/writer/AbstractVeniceWriter.java   |  2 +-
 .../venice/writer/CompositeVeniceWriter.java  | 38 +++++++++++-
 .../linkedin/venice/writer/VeniceWriter.java  | 35 ++++++-----
 11 files changed, 124 insertions(+), 66 deletions(-)

diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
index e861f3247e6..747e0af1ff2 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
@@ -653,8 +653,10 @@ protected void processMessageAndMaybeProduceToKafka(
       CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
       CompletableFuture<PubSubProduceResult>[] viewWriterFutures =
           processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper);
+      hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
       CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
-        hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
+        hostLevelIngestionStats
+            .recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
         if (exception == null) {
           producePutOrDeleteToKafka(
               mergeConflictResultWrapper,
@@ -1440,7 +1442,7 @@ CompletableFuture<PubSubProduceResult>[] processViewWriters(
         oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
     for (VeniceViewWriter writer: viewWriters.values()) {
       viewWriterFutures[index++] = writer.processRecord(
-          mergeConflictResult.getNewValue(),
+          mergeConflictResultWrapper.getUpdatedValueBytes(),
           oldValueBB,
           keyBytes,
           mergeConflictResult.getValueSchemaId(),
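Editor's note: the hunk above (and its twin in LeaderFollowerStoreIngestionTask below) splits what used to be one latency metric into two: producer *dispatch* latency is now recorded as soon as the view-writer futures are created, and a new *ack* latency is recorded only once all futures complete. A minimal standalone sketch of that pattern, with invented names (this is not Venice code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

// Sketch: record dispatch latency right after fan-out, ack latency after allOf completes.
public class ViewWriterLatencySketch {
  public static void main(String[] args) {
    long startNs = System.nanoTime();

    // Stand-ins for the per-view-writer produce futures.
    CompletableFuture<?>[] viewWriterFutures = new CompletableFuture[3];
    for (int i = 0; i < viewWriterFutures.length; i++) {
      viewWriterFutures[i] = CompletableFuture.runAsync(() -> {
        try {
          Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50)); // simulated broker ack delay
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    // Dispatch latency: time to hand every record to the producers.
    System.out.printf("dispatch latency: %d us%n", (System.nanoTime() - startNs) / 1_000);

    CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((v, t) -> {
      // Ack latency: additionally includes waiting for every produce to be acknowledged.
      System.out.printf("ack latency: %d us%n", (System.nanoTime() - startNs) / 1_000);
    }).join();
  }
}
```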
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index ae66dee4c05..d67e3679326 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -3363,8 +3363,9 @@ protected void processMessageAndMaybeProduceToKafka(
       CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
       CompletableFuture<PubSubProduceResult>[] viewWriterFutures =
           processViewWriters(partitionConsumptionState, keyBytes, writeComputeResultWrapper);
+      hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
       CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
-        hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
+        hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
         if (exception == null) {
           produceToLocalKafkaHelper(
               consumerRecord,
@@ -3982,7 +3983,7 @@ void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolea
   Set<String> getKafkaUrlSetFromTopicSwitch(TopicSwitchWrapper topicSwitchWrapper) {
     if (isSeparatedRealtimeTopicEnabled()) {
       Set<String> result = new HashSet<>();
-      for (String server : topicSwitchWrapper.getSourceServers()) {
+      for (String server: topicSwitchWrapper.getSourceServers()) {
         result.add(server);
         result.add(server + Utils.SEPARATE_TOPIC_SUFFIX);
       }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
index e59ba6ee792..5a229779d6a 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
@@ -67,6 +67,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
   private final Sensor resubscriptionFailureSensor;
   private final Sensor viewProducerLatencySensor;
+  private final Sensor viewProducerAckLatencySensor;
   /**
    * Sensors for emitting if/when we detect DCR violations (such as a backwards timestamp or receding offset vector)
    */
@@ -311,6 +312,12 @@ public HostLevelIngestionStats(
         () -> totalStats.viewProducerLatencySensor,
         avgAndMax());

+    this.viewProducerAckLatencySensor = registerPerStoreAndTotalSensor(
+        "total_view_writer_ack_latency",
+        totalStats,
+        () -> totalStats.viewProducerAckLatencySensor,
+        avgAndMax());
+
     registerSensor(
         "storage_quota_used",
         new AsyncGauge((ignored, ignored2) -> hybridQuotaUsageGauge, "storage_quota_used"));
@@ -513,6 +520,10 @@ public void recordViewProducerLatency(double latency) {
     viewProducerLatencySensor.record(latency);
   }

+  public void recordViewProducerAckLatency(double latency) {
+    viewProducerAckLatencySensor.record(latency);
+  }
+
   public void recordUnexpectedMessage() {
     unexpectedMessageSensor.record();
   }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java
index e13e429db75..b3646850ae5 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java
@@ -9,15 +9,18 @@
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
+import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.RedundantExceptionFilter;
+import com.linkedin.venice.utils.SystemTime;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
+import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.MaterializedView;
 import com.linkedin.venice.writer.LeaderCompleteState;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterFactory;
 import com.linkedin.venice.writer.VeniceWriterOptions;
 import java.nio.ByteBuffer;
-import java.time.Clock;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,14 +37,20 @@
 import org.apache.logging.log4j.Logger;

+/**
+ * Materialized view writer is responsible for processing input records from the version topic and writing them to the
+ * materialized view topic based on parameters defined in {@link com.linkedin.venice.meta.MaterializedViewParameters}.
+ * This writer has its own {@link VeniceWriter} and will also propagate heartbeat messages differently. See details in
+ * the doc for the {@link #maybePropagateHeartbeatLowWatermarkToViewTopic} method.
+ */
 public class MaterializedViewWriter extends VeniceViewWriter {
   private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
   private final MaterializedView internalView;
   private final ReentrantLock broadcastHBLock = new ReentrantLock();
   private final Map<Integer, Long> partitionToHeartbeatTimestampMap = new HashMap<>();
-  private final Clock clock;
+  private final Time time;
   private final String materializedViewTopicName;
-  private VeniceWriter veniceWriter;
+  private Lazy<VeniceWriter> veniceWriter;
   private long lastHBBroadcastTimestamp;

   /**
@@ -59,14 +68,17 @@ public MaterializedViewWriter(
       Version version,
       Schema keySchema,
       Map<String, String> extraViewParameters,
-      Clock clock) {
+      Time time) {
     super(props, version, keySchema, extraViewParameters);
     pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
     internalView =
         new MaterializedView(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters);
     materializedViewTopicName =
         internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
-    this.clock = clock;
+    this.time = time;
+    this.veniceWriter = Lazy.of(
+        () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null)
+            .createVeniceWriter(buildWriterOptions()));
   }

   public MaterializedViewWriter(
@@ -74,14 +86,14 @@ public MaterializedViewWriter(
       Version version,
       Schema keySchema,
       Map<String, String> extraViewParameters) {
-    this(props, version, keySchema, extraViewParameters, Clock.systemUTC());
+    this(props, version, keySchema, extraViewParameters, SystemTime.INSTANCE);
   }

   /**
    * package private for testing purpose
    */
   void setVeniceWriter(VeniceWriter veniceWriter) {
-    this.veniceWriter = veniceWriter;
+    this.veniceWriter = Lazy.of(() -> veniceWriter);
   }

   @Override
@@ -97,10 +109,11 @@ public CompletableFuture<PubSubProduceResult> processRecord(

   @Override
   public CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
-    if (veniceWriter == null) {
-      initializeVeniceWriter();
+    if (newValue == null) {
+      // this is a delete operation
+      return veniceWriter.get().delete(key, null);
     }
-    return veniceWriter.put(key, newValue.array(), newValueSchemaId);
+    return veniceWriter.get().put(key, ByteUtils.extractByteArray(newValue), newValueSchemaId);
   }

   @Override
@@ -130,13 +143,6 @@ VeniceWriterOptions buildWriterOptions() {
     return setProducerOptimizations(internalView.getWriterOptionsBuilder(materializedViewTopicName, version)).build();
   }

-  synchronized private void initializeVeniceWriter() {
-    if (veniceWriter == null) {
-      veniceWriter =
-          new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null).createVeniceWriter(buildWriterOptions());
-    }
-  }
-
   /**
    * View topic's partitioner and partition count could be different from the VT. In order to ensure we are capturing
    * all potential lag in the VT ingestion from the view topic, we will broadcast the low watermark observed from every
    *
@@ -161,7 +167,7 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
     broadcastHBLock.lock();
     try {
       partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp);
-      long now = clock.millis();
+      long now = time.getMilliseconds();
       if (now > lastHBBroadcastTimestamp + DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS
           && !partitionToHeartbeatTimestampMap.isEmpty()) {
         oldestHeartbeatTimestamp = Collections.min(partitionToHeartbeatTimestampMap.values());
@@ -179,9 +185,6 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
       broadcastHBLock.unlock();
     }
     if (propagate && oldestHeartbeatTimestamp > 0) {
-      if (veniceWriter == null) {
-        initializeVeniceWriter();
-      }
       LeaderCompleteState leaderCompleteState =
           LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported());
       Set<Integer> failedPartitions = VeniceConcurrentHashMap.newKeySet();
@@ -193,14 +196,15 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
         // least
         // one partition leader has completed.
         final int viewPartitionNumber = p;
-        CompletableFuture<PubSubProduceResult> heartBeatFuture = veniceWriter.sendHeartbeat(
-            materializedViewTopicName,
-            viewPartitionNumber,
-            null,
-            VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
-            true,
-            leaderCompleteState,
-            oldestHeartbeatTimestamp);
+        CompletableFuture<PubSubProduceResult> heartBeatFuture = veniceWriter.get()
+            .sendHeartbeat(
+                materializedViewTopicName,
+                viewPartitionNumber,
+                null,
+                VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
+                true,
+                leaderCompleteState,
+                oldestHeartbeatTimestamp);
         heartBeatFuture.whenComplete((ignore, throwable) -> {
           if (throwable != null) {
             completionException.set(new CompletionException(throwable));
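Editor's note: the file above replaces the null-check-plus-`initializeVeniceWriter()` dance with a lazily initialized writer, so the producer is only created on first use. A minimal sketch of that pattern with invented names (not the actual Venice `Lazy`/`VeniceWriter` classes):

```java
import java.util.function.Supplier;

// Thread-safe, memoizing lazy reference: the supplier runs at most once, on first get().
final class LazyRef<T> {
  private final Supplier<T> supplier;
  private volatile T value;

  LazyRef(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  T get() {
    T result = value;
    if (result == null) {
      synchronized (this) {
        result = value;
        if (result == null) {
          value = result = supplier.get(); // create on first access only
        }
      }
    }
    return result;
  }
}

class MaterializedViewWriterSketch {
  // Writer construction (connections, buffers) is deferred until the first record arrives.
  private final LazyRef<StringBuilder> writer = new LazyRef<>(StringBuilder::new);

  void processRecord(String key, String value) {
    if (value == null) {
      writer.get().append(key).append("=<deleted>\n"); // the delete path also triggers init
    } else {
      writer.get().append(key).append('=').append(value).append('\n');
    }
  }
}
```

A view writer that never sees a record never opens a producer, which is the point of the change.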
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java
index 83994b8fb87..a8fa513abe6 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java
@@ -29,12 +29,12 @@
 import com.linkedin.venice.pubsub.PubSubClientsFactory;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.utils.ObjectMapperFactory;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.views.MaterializedView;
 import com.linkedin.venice.views.VeniceView;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterOptions;
-import java.time.Clock;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -112,11 +112,11 @@ public void testProcessIngestionHeartbeat() {
     viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
     Map<String, String> viewParamsMap = viewParamsBuilder.build();
     VeniceConfigLoader props = getMockProps();
-    Clock clock = mock(Clock.class);
+    Time time = mock(Time.class);
     long startTime = System.currentTimeMillis();
-    doReturn(startTime).when(clock).millis();
+    doReturn(startTime).when(time).getMilliseconds();
     MaterializedViewWriter materializedViewWriter =
-        new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap, clock);
+        new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap, time);
     ControlMessage controlMessage = new ControlMessage();
     controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
     KafkaKey kafkaKey = mock(KafkaKey.class);
@@ -138,7 +138,7 @@ public void testProcessIngestionHeartbeat() {
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
     verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
     long newTime = startTime + TimeUnit.MINUTES.toMillis(4);
-    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(time).getMilliseconds();
     doReturn(startTime + TimeUnit.MINUTES.toMillis(1)).when(producerMetadata).getMessageTimestamp();
     materializedViewWriter
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
@@ -157,7 +157,7 @@ public void testProcessIngestionHeartbeat() {
       Assert.assertEquals(timestamp, Long.valueOf(newTime));
     }
     newTime = newTime + TimeUnit.SECONDS.toMillis(30);
-    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(time).getMilliseconds();
     doReturn(startTime).when(producerMetadata).getMessageTimestamp();
     materializedViewWriter
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 2, partitionConsumptionState);
@@ -170,7 +170,7 @@ public void testProcessIngestionHeartbeat() {
     verify(veniceWriter, times(6))
         .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture());
     newTime = newTime + TimeUnit.MINUTES.toMillis(3);
-    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(time).getMilliseconds();
     doReturn(newTime).when(producerMetadata).getMessageTimestamp();
     materializedViewWriter
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
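Editor's note: the test above swaps `java.time.Clock` for Venice's mockable `Time` interface so the heartbeat-broadcast interval can be driven deterministically. A sketch of why an injectable time source makes interval logic testable without sleeping (all names invented):

```java
import java.util.function.LongSupplier;

// Interval logic takes "now" from a supplier, so tests can advance time by hand.
class HeartbeatBroadcasterSketch {
  static final long INTERVAL_MS = 60_000;
  private final LongSupplier nowMs;
  private long lastBroadcastMs;

  HeartbeatBroadcasterSketch(LongSupplier nowMs) {
    this.nowMs = nowMs;
    this.lastBroadcastMs = nowMs.getAsLong();
  }

  /** Returns true only when the (fake or real) interval has elapsed. */
  boolean maybeBroadcast() {
    long now = nowMs.getAsLong();
    if (now > lastBroadcastMs + INTERVAL_MS) {
      lastBroadcastMs = now;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    long[] fakeNow = { 0 };
    HeartbeatBroadcasterSketch b = new HeartbeatBroadcasterSketch(() -> fakeNow[0]);
    fakeNow[0] += 30_000;
    System.out.println(b.maybeBroadcast()); // false: only 30s elapsed
    fakeNow[0] += 40_000;
    System.out.println(b.maybeBroadcast()); // true: 70s elapsed, past the 60s interval
  }
}
```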
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
index ef3d033fb9d..f16c78fee25 100755
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
@@ -1055,15 +1055,16 @@ private void validateInputDataSchema(String inputDataSchemaString) {

   private void configureJobPropertiesWithMaterializedViewConfigs() {
     try {
-      // For now, we only perform view topic writes for basic batch push. No incremental and re-push
-      if (pushJobSetting.isIncrementalPush || pushJobSetting.isSourceKafka) {
+      // For now, we only perform view topic writes for basic batch push and re-push. No incremental pushes.
+      if (pushJobSetting.isIncrementalPush) {
         return;
       }
       StoreResponse storeResponse = ControllerClient.retryableRequest(
           controllerClient,
           pushJobSetting.controllerRetries,
           c -> c.getStore(pushJobSetting.storeName));
-      Map<String, ViewConfig> viewConfigMap = storeResponse.getStore().getViewConfigs();
+      Map<String, ViewConfig> viewConfigMap =
+          storeResponse.getStore().getVersion(pushJobSetting.version).get().getViewConfigs();
       viewConfigMap = viewConfigMap.entrySet()
           .stream()
           .filter(vc -> Objects.equals(vc.getValue().getViewClassName(), MaterializedView.class.getCanonicalName()))
@@ -1072,7 +1073,7 @@ private void configureJobPropertiesWithMaterializedViewConfigs() {
         pushJobSetting.materializedViewConfigFlatMap = ViewUtils.flatViewConfigMapString(viewConfigMap);
       }
     } catch (Exception e) {
-      throw new VeniceException("Failed to configure job properties with view configs");
+      throw new VeniceException("Failed to configure job properties with view configs", e);
     }
   }
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/TestVenicePushJobCheckpoints.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/TestVenicePushJobCheckpoints.java
index d15a4962741..883d20503f6 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/TestVenicePushJobCheckpoints.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/TestVenicePushJobCheckpoints.java
@@ -48,6 +48,7 @@
 import com.linkedin.venice.jobs.DataWriterComputeJob;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.StoreInfo;
+import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
 import com.linkedin.venice.pushmonitor.ExecutionStatus;
 import com.linkedin.venice.schema.AvroSchemaParseUtils;
@@ -882,6 +883,9 @@ private void configureControllerClientMock(
     when(storeResponse.getStore()).thenReturn(storeInfo);
     when(storeInfo.getCompressionStrategy()).thenReturn(
         CompressionStrategy.valueOf(props.getProperty(COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString())));
+    Version version = mock(Version.class);
+    when(version.getViewConfigs()).thenReturn(Collections.emptyMap());
+    when(storeInfo.getVersion(anyInt())).thenReturn(Optional.of(version));
     SchemaResponse keySchemaResponse = mock(SchemaResponse.class);
     when(keySchemaResponse.isError()).thenReturn(false);
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java
index 46f4c98abe0..d7abaee1506 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java
@@ -510,7 +510,7 @@ public Future<PubSubProduceResult> put(
       }

       @Override
-      public Future<PubSubProduceResult> delete(
+      public CompletableFuture<PubSubProduceResult> delete(
           Object key,
           PubSubProducerCallback callback,
           DeleteMetadata deleteMetadata) {
@@ -573,7 +573,7 @@ public Future<PubSubProduceResult> put(
       }

       @Override
-      public Future<PubSubProduceResult> delete(
+      public CompletableFuture<PubSubProduceResult> delete(
           Object key,
           PubSubProducerCallback callback,
           DeleteMetadata deleteMetadata) {
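Editor's note: the remaining files all narrow `delete()` from `Future` to `CompletableFuture`. Java permits covariant return types in overrides, so existing call sites that hold a `Future` keep compiling while new call sites gain composition methods like `whenComplete`. A minimal sketch (invented names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

abstract class WriterBase {
  abstract Future<String> delete(String key);
}

class WriterImpl extends WriterBase {
  @Override
  CompletableFuture<String> delete(String key) { // narrowed return type is a legal override
    return CompletableFuture.completedFuture(key);
  }
}

class NarrowingDemo {
  public static void main(String[] args) throws Exception {
    WriterImpl writer = new WriterImpl();
    Future<String> legacyView = writer.delete("k1"); // old call sites are unaffected
    writer.delete("k2").whenComplete((v, t) -> System.out.println("acked " + v)); // new composition
    System.out.println(legacyView.get());
  }
}
```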
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java
index 4324dfe1744..3418404064e 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java
@@ -45,7 +45,7 @@ public abstract Future<PubSubProduceResult> put(
       PubSubProducerCallback callback,
       PutMetadata putMetadata);

-  public abstract Future<PubSubProduceResult> delete(
+  public abstract CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       DeleteMetadata deleteMetadata);
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java
index 410fb00d4f7..792dbd5ab6c 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java
@@ -115,10 +115,42 @@ CompletableFuture<PubSubProduceResult> put(
   }

   @Override
-  public Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata) {
-    return mainWriter.delete(key, callback, deleteMetadata);
+  public CompletableFuture<PubSubProduceResult> delete(
+      K key,
+      PubSubProducerCallback callback,
+      DeleteMetadata deleteMetadata) {
+    CompletableFuture<PubSubProduceResult> finalFuture = new CompletableFuture<>();
+    CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1];
+    int index = 0;
+    childFutures[index++] = lastWriteFuture;
+    for (VeniceWriter writer: childWriters) {
+      childFutures[index++] = writer.delete(key, callback, deleteMetadata);
+    }
+    CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
+      if (childException == null) {
+        CompletableFuture<PubSubProduceResult> mainFuture = mainWriter.delete(key, callback, deleteMetadata);
+        mainFuture.whenCompleteAsync((result, mainWriteException) -> {
+          if (mainWriteException == null) {
+            finalFuture.complete(result);
+          } else {
+            finalFuture.completeExceptionally(new VeniceException(mainWriteException));
+          }
+        });
+      } else {
+        VeniceException veniceException = new VeniceException(childException);
+        finalFuture.completeExceptionally(veniceException);
+      }
+    });
+
+    lastWriteFuture = finalFuture;
+    return finalFuture;
   }

+  /**
+   * The main use of the {@link CompositeVeniceWriter} for now is to write the batch portion of a store version to the
+   * VT and the materialized view topic in the NR fabric. Updates should never go through the
+   * {@link CompositeVeniceWriter}, because they are written to the RT (hybrid writes or incremental push) and handled
+   * by the view writers in the L/F or A/A SIT.
+   */
   @Override
   public Future<PubSubProduceResult> update(
       K key,
@@ -126,7 +158,7 @@ public Future<PubSubProduceResult> update(
       int valueSchemaId,
       int derivedSchemaId,
       PubSubProducerCallback callback) {
-    return mainWriter.update(key, update, valueSchemaId, derivedSchemaId, callback);
+    throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support update function");
   }

   @Override
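Editor's note: the new composite `delete` enforces an ordering contract: the child (view) writes, plus the previous composite write, must all succeed before the main version-topic write even starts. A sketch of that contract with invented names (the real code uses nested `whenCompleteAsync` callbacks; `thenCompose` is used here for brevity):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class CompositeDeleteSketch {
  private final List<Function<String, CompletableFuture<Long>>> childWriters;
  private final Function<String, CompletableFuture<Long>> mainWriter;
  private CompletableFuture<?> lastWrite = CompletableFuture.completedFuture(null);

  CompositeDeleteSketch(
      List<Function<String, CompletableFuture<Long>>> childWriters,
      Function<String, CompletableFuture<Long>> mainWriter) {
    this.childWriters = childWriters;
    this.mainWriter = mainWriter;
  }

  synchronized CompletableFuture<Long> delete(String key) {
    CompletableFuture<?>[] gates = new CompletableFuture[childWriters.size() + 1];
    int i = 0;
    gates[i++] = lastWrite; // chain behind the previous composite write to keep ordering
    for (Function<String, CompletableFuture<Long>> child : childWriters) {
      gates[i++] = child.apply(key); // fan out to every child writer
    }
    // Start the main write only after every child write (and the prior write) has acked;
    // any child failure propagates and the main write never happens.
    CompletableFuture<Long> result =
        CompletableFuture.allOf(gates).thenCompose(ignored -> mainWriter.apply(key));
    lastWrite = result;
    return result;
  }
}
```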
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
index 6c938d828f2..3e7ddb1de47 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
@@ -554,11 +554,11 @@ public String getTopicName() {
    *
    * @param key - The key to delete in storage.
    * @param callback - callback will be executed after Kafka producer completes on sending the message.
-   * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this
+   * @return a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this
    * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request
    * completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    */
-  public Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback) {
+  public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback) {
     return delete(key, callback, DEFAULT_LEADER_METADATA_WRAPPER, APP_DEFAULT_LOGICAL_TS, null);
   }

@@ -568,11 +568,11 @@ public Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback
    *
    * @param key - The key to delete in storage.
    * @param logicalTs - An timestamp field to indicate when this record was produced from apps point of view.
    * @param callback - callback will be executed after Kafka producer completes on sending the message.
-   * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this
+   * @return a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this
    * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request
    * completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    */
-  public Future<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback) {
+  public CompletableFuture<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback) {
     return delete(key, callback, DEFAULT_LEADER_METADATA_WRAPPER, logicalTs, null);
   }

@@ -586,11 +586,11 @@ public Future<PubSubProduceResult> delete(
    * sending the message in VPJ plugin to the version topic;
    * >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader
    * is sending this message to version topic with extra info: offset in the real-time topic.
-   * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this
+   * @return a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this
    * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request
    * completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    */
-  public Future<PubSubProduceResult> delete(
+  public CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       LeaderMetadataWrapper leaderMetadataWrapper) {
@@ -608,11 +608,11 @@ public Future<PubSubProduceResult> delete(
    * >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader
    * is sending this message to version topic with extra info: offset in the real-time topic.
    * @param logicalTs - An timestamp field to indicate when this record was produced from apps point of view.
-   * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this
+   * @return a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this
    * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request
    * completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    */
-  public Future<PubSubProduceResult> delete(
+  public CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       LeaderMetadataWrapper leaderMetadataWrapper,
@@ -631,11 +631,11 @@ public Future<PubSubProduceResult> delete(
    * >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader
    * is sending this message to version topic with extra info: offset in the real-time topic.
    * @param deleteMetadata - a DeleteMetadata containing replication metadata related fields.
-   * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this
+   * @return a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this
    * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request
    * completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    */
-  public Future<PubSubProduceResult> delete(
+  public CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       LeaderMetadataWrapper leaderMetadataWrapper,
@@ -644,7 +644,10 @@ public Future<PubSubProduceResult> delete(
   }

   @Override
-  public Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata) {
+  public CompletableFuture<PubSubProduceResult> delete(
+      K key,
+      PubSubProducerCallback callback,
+      DeleteMetadata deleteMetadata) {
     return delete(key, callback, DEFAULT_LEADER_METADATA_WRAPPER, APP_DEFAULT_LOGICAL_TS, deleteMetadata);
   }

@@ -678,7 +681,7 @@ public void deleteDeprecatedChunk(
         APP_DEFAULT_LOGICAL_TS);
   }

-  private Future<PubSubProduceResult> delete(
+  private CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       LeaderMetadataWrapper leaderMetadataWrapper,
@@ -699,11 +702,11 @@ private Future<PubSubProduceResult> delete(
    * is sending this message to version topic with extra info: offset in the real-time topic.
    * @param logicalTs - An timestamp field to indicate when this record was produced from apps point of view.
    * @param deleteMetadata - a DeleteMetadata containing replication metadata related fields (can be null).
-   * @return a java.util.concurrent.Future. Future for the RecordMetadata that will be assigned to this
-   * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request
+   * @return a java.util.concurrent.CompletableFuture. Future for the RecordMetadata that will be assigned to this
+   * record. Invoking java.util.concurrent.CompletableFuture's get() on this future will block until the associated request
    * completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    */
-  public Future<PubSubProduceResult> delete(
+  public CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       LeaderMetadataWrapper leaderMetadataWrapper,
@@ -744,7 +747,7 @@ public Future<PubSubProduceResult> delete(
       delete.replicationMetadataPayload = deleteMetadata.getRmdPayload();
     }

-    Future<PubSubProduceResult> produceResultFuture = sendMessage(
+    CompletableFuture<PubSubProduceResult> produceResultFuture = sendMessage(
         producerMetadata -> kafkaKey,
         MessageType.DELETE,
         delete,