From fd9fa020748521c00ea39a5a17a9796d6d9fd21a Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:59:25 -0800 Subject: [PATCH] [dvc] Config flag enabled for subscribing on disk partitions automatically. Integration Test [In Writing] --- .../java/com/linkedin/davinci/DaVinciBackend.java | 4 ++-- .../davinci/client/AvroGenericDaVinciClient.java | 4 ++-- .../main/java/com/linkedin/venice/ConfigKeys.java | 5 +++-- .../venice/endToEnd/DaVinciClientTest.java | 15 +++++---------- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index dd42cb79bf..24f41b3bec 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,6 +1,6 @@ package com.linkedin.davinci; -import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; @@ -468,7 +468,7 @@ private synchronized void bootstrap() { configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); - if (!configLoader.getCombinedProperties().getBoolean(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false)) { + if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) { // Subscribe all bootstrap version partitions. storeNameToBootstrapVersionMap.forEach((storeName, version) -> { List partitions = storeNameToPartitionListMap.get(storeName); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 5b5389aba0..2a900a2cc4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -10,7 +10,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; -import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; @@ -711,7 +711,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put(CLUSTER_NAME, clusterName) .put(ZOOKEEPER_ADDRESS, zkAddress) .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers) - .put(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true) .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK) .put(INGESTION_USE_DA_VINCI_CLIENT, true) .put( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index f83835b9a6..ffa67980c2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -97,8 +97,9 @@ private ConfigKeys() { public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND = "da.vinci.current.version.bootstrapping.quota.bytes.per.second"; - // Prevent automatic subscription of partitions while bootstrapping. - public static final String DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED = "da.vinci.bootstrap.subscription.disabled"; + // On Da Vinci Client, control over automatic partition subscription. + public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY = + "da.vinci.subscribe.on.disk.partitions.automatically"; // Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used. public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND = diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 22436309a6..1541f1a24b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -46,9 +46,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; -import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.DaVinciUserApp; -import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -85,7 +83,6 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.ForkedJavaProcess; import com.linkedin.venice.utils.IntegrationTestPushUtils; @@ -1302,7 +1299,6 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti MetricsRepository metricsRepository = new MetricsRepository(); // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path - StoreBackend storeBackend; try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, @@ -1358,16 +1354,15 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti Optional.of(Collections.singleton(storeName1)))) { assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); - DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); - storeBackend = daVinciBackend.getStoreOrThrow(storeName1); - List partitions = new ArrayList<>(); + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + + Set partitions = new HashSet<>(); for (int i = 0; i < 5; ++i) { partitions.add(i); } - ComplementSet subscription = ComplementSet.newSet(partitions); - storeBackend.subscribe(subscription); - DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + client1.subscribe(partitions); + client1.subscribeAll().get(); client1.unsubscribeAll(); TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {