Skip to content

Commit

Permalink
Unit test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
xunyin8 committed Dec 3, 2024
1 parent 7b9fe93 commit 37c15cc
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,47 @@
package com.linkedin.davinci.kafka.consumer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.view.MaterializedViewWriter;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.writer.VeniceWriter;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import org.testng.annotations.Test;

Expand All @@ -42,6 +57,8 @@ public class LeaderFollowerStoreIngestionTaskTest {
private BooleanSupplier mockBooleanSupplier;
private VeniceStoreVersionConfig mockVeniceStoreVersionConfig;

private VeniceViewWriterFactory mockVeniceViewWriterFactory;

@Test
public void testCheckWhetherToCloseUnusedVeniceWriter() {
VeniceWriter<byte[], byte[], byte[]> writer1 = mock(VeniceWriter.class);
Expand Down Expand Up @@ -150,9 +167,18 @@ public void setUp() throws InterruptedException {
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
StoreIngestionTaskFactory.Builder builder = TestUtils.getStoreIngestionTaskBuilder(storeName)
.setServerConfig(mockVeniceServerConfig)
.setPubSubTopicRepository(pubSubTopicRepository);
.setPubSubTopicRepository(pubSubTopicRepository)
.setVeniceViewWriterFactory(mockVeniceViewWriterFactory);
when(builder.getSchemaRepo().getKeySchema(storeName)).thenReturn(new SchemaEntry(1, "\"string\""));
mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName);
Version version = mockStore.getVersion(versionNumber);
Map<String, ViewConfig> viewConfigMap = new HashMap<>();
String viewName = "testView";
MaterializedViewParameters.Builder viewParamBuilder = new MaterializedViewParameters.Builder(viewName);
viewParamBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()).setPartitionCount(3);
ViewConfig viewConfig = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParamBuilder.build());
viewConfigMap.put(viewName, viewConfig);
when(mockStore.getViewConfigs()).thenReturn(viewConfigMap);

mockPartitionConsumptionState = mock(PartitionConsumptionState.class);
mockConsumerAction = mock(ConsumerAction.class);
Expand Down Expand Up @@ -219,4 +245,25 @@ public void testVeniceWriterInProcessConsumerAction() throws InterruptedExceptio
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockWriter, times(1)).closePartition(0);
}

@Test
public void testProcessViewWriters() throws InterruptedException {
mockVeniceViewWriterFactory = mock(VeniceViewWriterFactory.class);
Map<String, VeniceViewWriter> viewWriterMap = new HashMap<>();
MaterializedViewWriter materializedViewWriter = mock(MaterializedViewWriter.class);
viewWriterMap.put("testView", materializedViewWriter);
when(mockVeniceViewWriterFactory.buildStoreViewWriters(any(), anyInt(), any())).thenReturn(viewWriterMap);
CompletableFuture<PubSubProduceResult> viewWriterFuture = new CompletableFuture<>();
when(materializedViewWriter.processRecord(any(), any(), anyInt())).thenReturn(viewWriterFuture);
setUp();
WriteComputeResultWrapper mockResult = mock(WriteComputeResultWrapper.class);
Put put = new Put();
put.schemaId = 1;
when(mockResult.getNewPut()).thenReturn(put);
CompletableFuture[] futures = leaderFollowerStoreIngestionTask
.processViewWriters(mockPartitionConsumptionState, new byte[1], null, mockResult);
assertEquals(futures.length, 2);
verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture();
verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
Expand Down Expand Up @@ -54,20 +54,20 @@ public class MaterializedViewWriterTest {
public void testViewParametersBuilder() throws JsonProcessingException {
String viewName = "testMaterializedView";
int partitionCount = 3;
ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
Map<String, String> viewParams = viewParamsBuilder.build();
Assert.assertEquals(viewParams.size(), 1);
Assert.assertEquals(viewParams.get(ViewParameters.MATERIALIZED_VIEW_NAME.name()), viewName);
Assert.assertEquals(viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()), viewName);
viewParamsBuilder.setPartitionCount(partitionCount);
List<String> projectionFields = Arrays.asList("field1", "field2");
viewParamsBuilder.setProjectionFields(projectionFields);
viewParams = viewParamsBuilder.build();
Assert.assertEquals(viewParams.size(), 3);
Assert.assertEquals(
viewParams.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()),
viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()),
String.valueOf(partitionCount));
Assert.assertEquals(
viewParams.get(ViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()),
viewParams.get(MaterializedViewParameters.MATERIALIZED_VIEW_PROJECTION_FIELDS.name()),
ObjectMapperFactory.getInstance().writeValueAsString(projectionFields));
}

Expand All @@ -81,7 +81,7 @@ public void testBuildWriterOptions() {
Store store = getMockStore(storeName, 1, version);
doReturn(true).when(store).isNearlineProducerCompressionEnabled();
doReturn(3).when(store).getNearlineProducerCountPerWriter();
ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
viewParamsBuilder.setPartitionCount(6);
viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
Map<String, String> viewParamsMap = viewParamsBuilder.build();
Expand All @@ -107,7 +107,7 @@ public void testProcessIngestionHeartbeat() {
doReturn(true).when(version).isChunkingEnabled();
doReturn(true).when(version).isRmdChunkingEnabled();
getMockStore(storeName, 1, version);
ViewParameters.Builder viewParamsBuilder = new ViewParameters.Builder(viewName);
MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName);
viewParamsBuilder.setPartitionCount(6);
viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
Map<String, String> viewParamsMap = viewParamsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
Expand Down Expand Up @@ -944,8 +944,9 @@ public void testConfigureWithMaterializedViewConfigs() throws Exception {
Assert.assertNull(pushJobSetting.materializedViewConfigFlatMap);
}
Map<String, ViewConfig> viewConfigs = new HashMap<>();
ViewParameters.Builder builder = new ViewParameters.Builder("testView").setPartitionCount(12)
.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
MaterializedViewParameters.Builder builder =
new MaterializedViewParameters.Builder("testView").setPartitionCount(12)
.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
viewConfigs.put("testView", new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()));
viewConfigs
.put("dummyView", new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import com.linkedin.venice.hadoop.mapreduce.engine.MapReduceEngineTaskConfigProvider;
import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
Expand Down Expand Up @@ -637,13 +637,13 @@ public void testCreateCompositeVeniceWriter() throws JsonProcessingException {
VeniceWriter<byte[], byte[], byte[]> mainWriter = mock(VeniceWriter.class);
Map<String, ViewConfig> viewConfigMap = new HashMap<>();
String view1Name = "view1";
ViewParameters.Builder builder = new ViewParameters.Builder(view1Name);
MaterializedViewParameters.Builder builder = new MaterializedViewParameters.Builder(view1Name);
builder.setPartitionCount(6);
builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
ViewConfigImpl viewConfig1 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
viewConfigMap.put(view1Name, viewConfig1);
String view2Name = "view2";
builder = new ViewParameters.Builder(view2Name);
builder = new MaterializedViewParameters.Builder(view2Name);
builder.setPartitionCount(12);
builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Objects;


public enum ViewParameters {
public enum MaterializedViewParameters {
/**
* Parameter key used to specify the re-partition view name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import static com.linkedin.venice.views.ViewUtils.PARTITION_COUNT;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.MaterializedViewParameters;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.VeniceProperties;
Expand All @@ -29,12 +29,14 @@ public class MaterializedView extends VeniceView {
public MaterializedView(Properties props, String storeName, Map<String, String> viewParameters) {
super(props, storeName, viewParameters);
// Override topic partition count config
viewPartitionCount = Integer.parseInt(viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
viewPartitionCount =
Integer.parseInt(viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
this.props.put(PARTITION_COUNT, viewPartitionCount);
viewPartitioner = Lazy.of(() -> {
String viewPartitionerClass = this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
String viewPartitionerClass =
this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
String viewPartitionerParamsString =
this.viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name());
this.viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER_PARAMS.name());
return PartitionUtils.getVenicePartitioner(viewPartitionerClass, viewPartitionerParamsString);
});
}
Expand All @@ -52,42 +54,45 @@ public VeniceWriterOptions.Builder getWriterOptionsBuilder(String viewTopicName,
@Override
public Map<String, VeniceProperties> getTopicNamesAndConfigsForVersion(int version) {
VeniceProperties properties = new VeniceProperties(props);
String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name());
String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name());
return Collections.singletonMap(
Version.composeKafkaTopic(storeName, version) + VIEW_TOPIC_SEPARATOR + viewName
+ MATERIALIZED_VIEW_TOPIC_SUFFIX,
properties);
}

/**
* {@link ViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view.
* {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level
* {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view.
* {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level
* partitioner config if it's not specified in the view parameters.
* {@link ViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional.
* {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER_PARAMS} is optional.
*/
@Override
public void validateConfigs(Store store) {
String viewName = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_NAME.name());
String viewName = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name());
if (viewName == null) {
throw new VeniceException(String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_NAME.name()));
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_NAME.name()));
}
if (store.getViewConfigs().containsKey(viewName)) {
throw new VeniceException("A view config with the same view name already exist, view name: " + viewName);
}
String viewPartitioner = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
String viewPartitioner = viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
if (viewPartitioner == null) {
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()));
String.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name()));
}
try {
Class.forName(viewPartitioner);
} catch (ClassNotFoundException e) {
throw new VeniceException("Cannot find partitioner class: " + viewPartitioner);
}
String partitionCountString = viewParameters.get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name());
String partitionCountString =
viewParameters.get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name());
if (partitionCountString == null) {
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
String
.format(MISSING_PARAMETER_MESSAGE, MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
}
int viewPartitionCount = Integer.parseInt(partitionCountString);
// A materialized view with the exact same partitioner and partition count as the store is not allwoed
Expand All @@ -101,9 +106,9 @@ public void validateConfigs(Store store) {
ViewConfig viewConfig = viewConfigEntries.getValue();
if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) {
String configPartitioner =
viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
int configPartitionCount = Integer
.parseInt(viewConfig.getViewParameters().get(ViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
viewConfig.getViewParameters().get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITIONER.name());
int configPartitionCount = Integer.parseInt(
viewConfig.getViewParameters().get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
if (configPartitionCount == viewPartitionCount && configPartitioner.equals(viewPartitioner)) {
throw new VeniceException(
"A view with identical view configs already exist, view name: " + viewConfigEntries.getKey());
Expand Down
Loading

0 comments on commit 37c15cc

Please sign in to comment.