From a7112638fb6be78119d4df836c0be539d02d6861 Mon Sep 17 00:00:00 2001
From: Xun Yin
Date: Tue, 12 Nov 2024 18:31:42 -0800
Subject: [PATCH] Add unit tests and make VeniceViewWriter version specific (it already is implicitly)

---
 .../ActiveActiveStoreIngestionTask.java       |   1 -
 .../LeaderFollowerStoreIngestionTask.java     |  10 +-
 .../store/view/ChangeCaptureViewWriter.java   |  19 +-
 .../store/view/MaterializedViewWriter.java    |  46 ++---
 .../davinci/store/view/VeniceViewWriter.java  |  10 +-
 .../store/view/VeniceViewWriterFactory.java   |   2 +-
 .../davinci/store/view/ViewWriterUtils.java   |   5 +-
 .../view/ChangeCaptureViewWriterTest.java     |  30 ++-
 .../view/MaterializedViewWriterTest.java      | 177 ++++++++++++++++++
 .../store/view/ViewWriterUtilsTest.java       |   1 +
 .../linkedin/venice/view/TestViewWriter.java  |  13 +-
 11 files changed, 236 insertions(+), 78 deletions(-)
 create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java

diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
index 6d0560d41e7..22b37d211de 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
@@ -1513,7 +1513,6 @@ protected CompletableFuture[] processViewWriters(
             mergeConflictResult.getNewValue(),
             oldValueBB,
             keyBytes,
-            versionNumber,
             mergeConflictResult.getValueSchemaId(),
             oldValueSchemaId,
             mergeConflictResult.getRmdRecord());
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index 8cf61b9c16f..aa1e561b9f4 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -3690,13 +3690,8 @@ protected void processControlMessageForViews(
     // Iterate through list of views for the store and process the control message.
     for (VeniceViewWriter viewWriter: viewWriters.values()) {
-      viewWriter.processControlMessage(
-          kafkaKey,
-          kafkaMessageEnvelope,
-          controlMessage,
-          partition,
-          partitionConsumptionState,
-          this.versionNumber);
+      viewWriter
+          .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState);
     }
   }
@@ -3968,7 +3963,6 @@ protected CompletableFuture[] processViewWriters(
       viewWriterFutures[index++] = writer.processRecord(
           writeComputeResultWrapper.getNewPut().putValue,
           keyBytes,
-          versionNumber,
           writeComputeResultWrapper.getNewPut().schemaId);
     }
     return viewWriterFutures;
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java
index 8cfe6a211fe..c41725cb4df 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java
@@ -49,9 +49,10 @@ public class ChangeCaptureViewWriter extends VeniceViewWriter {
   public ChangeCaptureViewWriter(
       VeniceConfigLoader props,
       Store store,
+      int version,
       Schema keySchema,
       Map extraViewParameters) {
-    super(props, store, keySchema, extraViewParameters);
+    super(props, store, version, keySchema, extraViewParameters);
     internalView = new ChangeCaptureView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
     kafkaClusterUrlToIdMap = props.getVeniceServerConfig().getKafkaClusterUrlToIdMap();
     pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
@@ -63,7 +64,6 @@ public CompletableFuture processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
-      int version,
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord) {
@@ -77,18 +77,14 @@ public CompletableFuture processRecord(
     recordChangeEvent.replicationCheckpointVector = RmdUtils.extractOffsetVectorFromRmd(replicationMetadataRecord);
 
     if (veniceWriter == null) {
-      initializeVeniceWriter(version);
+      initializeVeniceWriter();
     }
     // TODO: RecordChangeEvent isn't versioned today.
     return veniceWriter.put(key, recordChangeEvent, 1);
   }
 
   @Override
-  public CompletableFuture processRecord(
-      ByteBuffer newValue,
-      byte[] key,
-      int version,
-      int newValueSchemaId) {
+  public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
     // No op
     return CompletableFuture.completedFuture(null);
   }
@@ -99,8 +95,7 @@ public void processControlMessage(
       KafkaMessageEnvelope kafkaMessageEnvelope,
       ControlMessage controlMessage,
       int partition,
-      PartitionConsumptionState partitionConsumptionState,
-      int version) {
+      PartitionConsumptionState partitionConsumptionState) {
 
     // We only care (for now) about version swap control Messages
     if (!(controlMessage.getControlMessageUnion() instanceof VersionSwap)) {
@@ -136,7 +131,7 @@ public void processControlMessage(
 
     // Write the message on veniceWriter to the change capture topic
     if (veniceWriter == null) {
-      initializeVeniceWriter(version);
+      initializeVeniceWriter();
     }
 
     veniceWriter.sendControlMessage(
@@ -193,7 +188,7 @@ VeniceWriterOptions buildWriterOptions(int version) {
     return setProducerOptimizations(configBuilder).build();
   }
 
-  synchronized private void initializeVeniceWriter(int version) {
+  synchronized private void initializeVeniceWriter() {
     if (veniceWriter != null) {
       return;
     }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java
index 96e5a7ba249..bb46e5c79a1 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java
@@ -58,10 +58,11 @@ public class MaterializedViewWriter extends VeniceViewWriter {
   public MaterializedViewWriter(
       VeniceConfigLoader props,
       Store store,
+      int version,
       Schema keySchema,
       Map extraViewParameters,
       Clock clock) {
-    super(props, store, keySchema, extraViewParameters);
+    super(props, store, version, keySchema, extraViewParameters);
     pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
     internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
     this.clock = clock;
@@ -70,9 +71,17 @@ public MaterializedViewWriter(
   public MaterializedViewWriter(
       VeniceConfigLoader props,
       Store store,
+      int version,
       Schema keySchema,
       Map extraViewParameters) {
-    this(props, store, keySchema, extraViewParameters, Clock.systemUTC());
+    this(props, store, version, keySchema, extraViewParameters, Clock.systemUTC());
+  }
+
+  /**
+   * package private for testing purpose
+   */
+  void setVeniceWriter(VeniceWriter veniceWriter) {
+    this.veniceWriter = veniceWriter;
   }
 
   @Override
@@ -80,21 +89,16 @@ public CompletableFuture processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
-      int version,
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord) {
-    return processRecord(newValue, key, version, newValueSchemaId);
+    return processRecord(newValue, key, newValueSchemaId);
   }
 
   @Override
-  public CompletableFuture processRecord(
-      ByteBuffer newValue,
-      byte[] key,
-      int version,
-      int newValueSchemaId) {
+  public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
     if (veniceWriter == null) {
-      initializeVeniceWriter(version);
+      initializeVeniceWriter();
     }
     return veniceWriter.put(key, newValue.array(), newValueSchemaId);
   }
@@ -105,16 +109,14 @@ public void processControlMessage(
       KafkaMessageEnvelope kafkaMessageEnvelope,
       ControlMessage controlMessage,
       int partition,
-      PartitionConsumptionState partitionConsumptionState,
-      int version) {
+      PartitionConsumptionState partitionConsumptionState) {
     final ControlMessageType type = ControlMessageType.valueOf(controlMessage);
     // Ignore other control messages for materialized view.
     if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) {
       maybePropagateHeartbeatLowWatermarkToViewTopic(
           partition,
           partitionConsumptionState,
-          kafkaMessageEnvelope.producerMetadata.messageTimestamp,
-          version);
+          kafkaMessageEnvelope.getProducerMetadata().getMessageTimestamp());
     }
   }
 
@@ -137,7 +139,7 @@ VeniceWriterOptions buildWriterOptions(int version) {
     return setProducerOptimizations(configBuilder).build();
   }
 
-  synchronized private void initializeVeniceWriter(int version) {
+  synchronized private void initializeVeniceWriter() {
     if (veniceWriter == null) {
       veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null)
           .createVeniceWriter(buildWriterOptions(version));
@@ -162,10 +164,9 @@ synchronized private void initializeVeniceWriter(int version) {
   private void maybePropagateHeartbeatLowWatermarkToViewTopic(
       int partition,
       PartitionConsumptionState partitionConsumptionState,
-      long heartbeatTimestamp,
-      int version) {
+      long heartbeatTimestamp) {
     boolean propagate = false;
-    long oldestHeartbeatTimestamp;
+    long oldestHeartbeatTimestamp = 0;
     broadcastHBLock.lock();
     try {
       partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp);
@@ -178,14 +179,17 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
           propagate = true;
           lastHBBroadcastTimestamp = now;
         }
+        // We have determined that the oldestHeartbeatTimestamp offers no value in monitoring the lag for this view
+        // topic since it's within the DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD. We are also clearing the map, so we
+        // don't need to worry about removing timestamps belonging to partitions that we are no longer leader of.
         partitionToHeartbeatTimestampMap.clear();
       }
     } finally {
       broadcastHBLock.unlock();
     }
-    if (propagate) {
+    if (propagate && oldestHeartbeatTimestamp > 0) {
       if (veniceWriter == null) {
-        initializeVeniceWriter(version);
+        initializeVeniceWriter();
       }
       LeaderCompleteState leaderCompleteState =
           LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported());
@@ -205,7 +209,7 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
             VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
             true,
             leaderCompleteState,
-            heartbeatTimestamp);
+            oldestHeartbeatTimestamp);
         heartBeatFuture.whenComplete((ignore, throwable) -> {
           if (throwable != null) {
             completionException.set(new CompletionException(throwable));
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java
index 3a02a435a7d..d28755c7f3d 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java
@@ -28,12 +28,16 @@
  * view implementations.
  */
 public abstract class VeniceViewWriter extends VeniceView {
+  protected final int version;
+
   public VeniceViewWriter(
       VeniceConfigLoader props,
       Store store,
+      int version,
       Schema keySchema,
       Map extraViewParameters) {
     super(props.getCombinedProperties().toProperties(), store, extraViewParameters);
+    this.version = version;
   }
 
   /**
@@ -52,7 +56,6 @@ public abstract CompletableFuture processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
-      int version,
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord);
 
   /**
@@ -69,7 +72,6 @@ public abstract CompletableFuture processRecord(
   public abstract CompletableFuture processRecord(
       ByteBuffer newValue,
       byte[] key,
-      int version,
       int newValueSchemaId);
 
   /**
@@ -85,15 +87,13 @@ public abstract CompletableFuture processRecord(
    * @param controlMessage the control message we're processing
    * @param partition the partition this control message was delivered to
    * @param partitionConsumptionState the pcs of the consuming node
-   * @param version the store version that received this message
    */
   public void processControlMessage(
       KafkaKey kafkaKey,
       KafkaMessageEnvelope kafkaMessageEnvelope,
       ControlMessage controlMessage,
       int partition,
-      PartitionConsumptionState partitionConsumptionState,
-      int version) {
+      PartitionConsumptionState partitionConsumptionState) {
     // Optionally act on Control Message
   }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java
index 0e0bc514d74..20d0b18af1d 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriterFactory.java
@@ -24,7 +24,7 @@ public Map buildStoreViewWriters(Store store, int vers
       String className = viewConfig.getValue().getViewClassName();
       Map extraParams = viewConfig.getValue().getViewParameters();
       VeniceViewWriter viewWriter =
-          ViewWriterUtils.getVeniceViewWriter(className, properties, store, keySchema, extraParams);
+          ViewWriterUtils.getVeniceViewWriter(className, properties, store, version, keySchema, extraParams);
       storeViewWriters.put(viewConfig.getKey(), viewWriter);
     }
     return storeViewWriters;
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java
index b0151e89108..4a63abb84df 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ViewWriterUtils.java
@@ -15,6 +15,7 @@ public static VeniceViewWriter getVeniceViewWriter(
       String viewClass,
       VeniceConfigLoader configLoader,
       Store store,
+      int version,
       Schema keySchema,
       Map extraViewParameters) {
     Properties params = configLoader.getCombinedProperties().toProperties();
@@ -25,8 +26,8 @@ public static VeniceViewWriter getVeniceViewWriter(
     VeniceViewWriter viewWriter = ReflectUtils.callConstructor(
         ReflectUtils.loadClass(view.getWriterClassName()),
-        new Class[] { VeniceConfigLoader.class, Store.class, Schema.class, Map.class },
-        new Object[] { configLoader, store, keySchema, extraViewParameters });
+        new Class[] { VeniceConfigLoader.class, Store.class, Integer.TYPE, Schema.class, Map.class },
+        new Object[] { configLoader, store, version, keySchema, extraViewParameters });
 
     return viewWriter;
   }
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java
index 644159a7c61..194e80dfe52 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java
@@ -111,7 +111,7 @@ public void testConstructVersionSwapMessage() {
 
     // Build the change capture writer and set the mock writer
     ChangeCaptureViewWriter changeCaptureViewWriter =
-        new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap());
+        new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
     changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter);
 
     // Verify that we never produce the version swap from a follower replica
@@ -120,8 +120,7 @@
         kafkaMessageEnvelope,
         controlMessage,
         1,
-        mockFollowerPartitionConsumptionState,
-        1);
+        mockFollowerPartitionConsumptionState);
     Mockito.verify(mockVeniceWriter, Mockito.never())
         .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any());
@@ -133,8 +132,7 @@
         kafkaMessageEnvelope,
         ignoredControlMessage,
         1,
-        mockLeaderPartitionConsumptionState,
-        1);
+        mockLeaderPartitionConsumptionState);
     Mockito.verify(mockVeniceWriter, Mockito.never())
         .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any());
@@ -148,18 +146,12 @@
         kafkaMessageEnvelope,
         ignoredControlMessage,
         1,
-        mockLeaderPartitionConsumptionState,
-        1);
+        mockLeaderPartitionConsumptionState);
     Mockito.verify(mockVeniceWriter, Mockito.never())
         .sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any());
 
-    changeCaptureViewWriter.processControlMessage(
-        kafkaKey,
-        kafkaMessageEnvelope,
-        controlMessage,
-        1,
-        mockLeaderPartitionConsumptionState,
-        1);
+    changeCaptureViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, mockLeaderPartitionConsumptionState);
 
     ArgumentCaptor messageArgumentCaptor = ArgumentCaptor.forClass(ControlMessage.class);
     // Verify and capture input
@@ -210,7 +202,7 @@ public void testBuildWriterOptions() {
     Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig);
 
     ChangeCaptureViewWriter changeCaptureViewWriter =
-        new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap());
+        new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
 
     VeniceWriterOptions writerOptions = changeCaptureViewWriter.buildWriterOptions(1);
 
@@ -245,7 +237,7 @@ public void testProcessRecord() throws ExecutionException, InterruptedException
     Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig);
 
     ChangeCaptureViewWriter changeCaptureViewWriter =
-        new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap());
+        new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
 
     Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(SCHEMA, 1);
     List vectors = Arrays.asList(1L, 2L, 3L);
@@ -255,13 +247,13 @@ public void testProcessRecord() throws ExecutionException, InterruptedException
     changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter);
 
     // Update Case
-    changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
+    changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
 
     // Insert Case
-    changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
+    changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
 
     // Deletion Case
-    changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
+    changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
 
     // Set up argument captors
     ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class);
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java
new file mode 100644
index 00000000000..be993ac33c2
--- /dev/null
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java
@@ -0,0 +1,177 @@
+package com.linkedin.davinci.store.view;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
+import com.linkedin.davinci.config.VeniceConfigLoader;
+import com.linkedin.davinci.config.VeniceServerConfig;
+import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
+import com.linkedin.venice.kafka.protocol.ControlMessage;
+import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
+import com.linkedin.venice.kafka.protocol.ProducerMetadata;
+import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
+import com.linkedin.venice.message.KafkaKey;
+import com.linkedin.venice.meta.Store;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.meta.ViewParameters;
+import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
+import com.linkedin.venice.pubsub.PubSubClientsFactory;
+import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
+import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.views.MaterializedView;
+import com.linkedin.venice.views.VeniceView;
+import com.linkedin.venice.writer.VeniceWriter;
+import com.linkedin.venice.writer.VeniceWriterOptions;
+import java.time.Clock;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.Schema;
+import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MaterializedViewWriterTest {
+  private static final Schema SCHEMA = AvroCompatibilityHelper.parse("\"string\"");
+
+  @Test
+  public void testBuildWriterOptions() {
+    String storeName = "testStore";
+    String viewName = "testMaterializedView";
+    Version version = mock(Version.class);
+    doReturn(true).when(version).isChunkingEnabled();
+    doReturn(true).when(version).isRmdChunkingEnabled();
+    Store store = getMockStore(storeName, 1, version);
+    doReturn(true).when(store).isNearlineProducerCompressionEnabled();
+    doReturn(3).when(store).getNearlineProducerCountPerWriter();
+    ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
+    viewParamsBuilder.setPartitionCount(6);
+    viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
+    Map viewParamsMap = viewParamsBuilder.build();
+    VeniceConfigLoader props = getMockProps();
+    MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, store, 1, SCHEMA, viewParamsMap);
+    VeniceWriterOptions writerOptions = materializedViewWriter.buildWriterOptions(1);
+    Assert.assertEquals(
+        writerOptions.getTopicName(),
+        Version.composeKafkaTopic(storeName, 1) + VeniceView.VIEW_TOPIC_SEPARATOR + viewName
+            + MaterializedView.MATERIALIZED_VIEW_TOPIC_SUFFIX);
+    Assert.assertEquals(writerOptions.getPartitionCount(), Integer.valueOf(6));
+    Assert.assertEquals(writerOptions.getPartitioner().getClass(), DefaultVenicePartitioner.class);
+    Assert.assertEquals(writerOptions.getProducerCount(), 3);
+    Assert.assertTrue(writerOptions.isProducerCompressionEnabled());
+  }
+
+  @Test
+  public void testProcessIngestionHeartbeat() {
+    String storeName = "testStore";
+    String viewName = "testMaterializedView";
+    Version version = mock(Version.class);
+    doReturn(true).when(version).isChunkingEnabled();
+    doReturn(true).when(version).isRmdChunkingEnabled();
+    Store store = getMockStore(storeName, 1, version);
+    ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
+    viewParamsBuilder.setPartitionCount(6);
+    viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
+    Map viewParamsMap = viewParamsBuilder.build();
+    VeniceConfigLoader props = getMockProps();
+    Clock clock = mock(Clock.class);
+    long startTime = System.currentTimeMillis();
+    doReturn(startTime).when(clock).millis();
+    MaterializedViewWriter materializedViewWriter =
+        new MaterializedViewWriter(props, store, 1, SCHEMA, viewParamsMap, clock);
+    materializedViewWriter.buildWriterOptions(1);
+    ControlMessage controlMessage = new ControlMessage();
+    controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
+    KafkaKey kafkaKey = mock(KafkaKey.class);
+    doReturn(KafkaKey.HEART_BEAT.getKey()).when(kafkaKey).getKey();
+    VeniceWriter veniceWriter = mock(VeniceWriter.class);
+    when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong()))
+        .thenReturn(CompletableFuture.completedFuture(null));
+    doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter)
+        .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
+    materializedViewWriter.setVeniceWriter(veniceWriter);
+    KafkaMessageEnvelope kafkaMessageEnvelope = mock(KafkaMessageEnvelope.class);
+    ProducerMetadata producerMetadata = mock(ProducerMetadata.class);
+    doReturn(producerMetadata).when(kafkaMessageEnvelope).getProducerMetadata();
+    doReturn(startTime).when(producerMetadata).getMessageTimestamp();
+    PartitionConsumptionState partitionConsumptionState = mock(PartitionConsumptionState.class);
+    doReturn(true).when(partitionConsumptionState).isCompletionReported();
+
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
+    verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
+    long newTime = startTime + TimeUnit.MINUTES.toMillis(4);
+    doReturn(newTime).when(clock).millis();
+    doReturn(startTime + TimeUnit.MINUTES.toMillis(1)).when(producerMetadata).getMessageTimestamp();
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
+    // We still don't expect any broadcast from partition 1 leader because staleness is within 5 minutes
+    verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
+    doReturn(newTime).when(producerMetadata).getMessageTimestamp();
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 0, partitionConsumptionState);
+    // Partition 0's leader should broadcast based on last broadcast timestamp (0) regardless of staleness threshold
+    ArgumentCaptor heartbeatTimestampCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(veniceWriter, times(6))
+        .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture());
+    // The low watermark for this materialized view writer should be the latest heartbeat stamp received by partition 0
+    // since the low watermark from partition 1 was ignored due to DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD
+    for (Long timestamp: heartbeatTimestampCaptor.getAllValues()) {
+      Assert.assertEquals(timestamp, Long.valueOf(newTime));
+    }
+    newTime = newTime + TimeUnit.SECONDS.toMillis(30);
+    doReturn(newTime).when(clock).millis();
+    doReturn(startTime).when(producerMetadata).getMessageTimestamp();
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 2, partitionConsumptionState);
+    doReturn(newTime).when(producerMetadata).getMessageTimestamp();
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 0, partitionConsumptionState);
+    // No new broadcast since it's still within DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS (1 minute) since last broadcast.
+    verify(veniceWriter, times(6))
+        .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture());
+    newTime = newTime + TimeUnit.MINUTES.toMillis(3);
+    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(producerMetadata).getMessageTimestamp();
+    materializedViewWriter
+        .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
+    // We should broadcast the stale heartbeat timestamp from partition 2 since it's > than the reporting threshold
+    verify(veniceWriter, times(12))
+        .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture());
+    Assert.assertEquals(heartbeatTimestampCaptor.getValue(), Long.valueOf(startTime));
+  }
+
+  private VeniceConfigLoader getMockProps() {
+    VeniceConfigLoader props = mock(VeniceConfigLoader.class);
+    VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
+    PubSubClientsFactory pubSubClientsFactory = mock(PubSubClientsFactory.class);
+    PubSubProducerAdapterFactory pubSubProducerAdapterFactory = mock(PubSubProducerAdapterFactory.class);
+    doReturn(pubSubProducerAdapterFactory).when(pubSubClientsFactory).getProducerAdapterFactory();
+    doReturn(pubSubClientsFactory).when(serverConfig).getPubSubClientsFactory();
+    doReturn(serverConfig).when(props).getVeniceServerConfig();
+    VeniceProperties veniceProperties = new VeniceProperties(new Properties());
+    doReturn(veniceProperties).when(props).getCombinedProperties();
+    return props;
+  }
+
+  private Store getMockStore(String storeName, int versionNumber, Version version) {
+    Store store = mock(Store.class);
+    doReturn(storeName).when(store).getName();
+    doReturn(version).when(store).getVersionOrThrow(versionNumber);
+    return store;
+  }
+}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java
index 9afe5a17270..5a180f7ed41 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ViewWriterUtilsTest.java
@@ -55,6 +55,7 @@ public void testGetVeniceViewWriter() {
         ChangeCaptureView.class.getCanonicalName(),
         mockVeniceConfigLoader,
         mockStore,
+        1,
         SCHEMA,
         Collections.EMPTY_MAP);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java
index f429fe9e59e..34776b2184f 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java
@@ -25,9 +25,10 @@ public class TestViewWriter extends VeniceViewWriter {
   public TestViewWriter(
       VeniceConfigLoader props,
       Store store,
+      int version,
       Schema keySchema,
       Map extraViewParameters) {
-    super(props, store, keySchema, extraViewParameters);
+    super(props, store, version, keySchema, extraViewParameters);
     internalView = new TestView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
   }
 
@@ -36,7 +37,6 @@ public CompletableFuture processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
-      int version,
       int newValueSchemaId,
       int oldValueSchemaId,
       GenericRecord replicationMetadataRecord) {
@@ -46,11 +46,7 @@ public CompletableFuture processRecord(
   }
 
   @Override
-  public CompletableFuture processRecord(
-      ByteBuffer newValue,
-      byte[] key,
-      int version,
-      int newValueSchemaId) {
+  public CompletableFuture processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
     internalView.incrementRecordCount(store.getName());
     return CompletableFuture.completedFuture(null);
   }
@@ -61,8 +57,7 @@ public void processControlMessage(
       KafkaMessageEnvelope kafkaMessageEnvelope,
       ControlMessage controlMessage,
       int partition,
-      PartitionConsumptionState partitionConsumptionState,
-      int version) {
+      PartitionConsumptionState partitionConsumptionState) {
 
     // TODO: The below logic only operates on VersionSwap. We might want to augment this
     // logic to handle other control messages.