From d282196046471eadc64d63a766dfe87d0246154a Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:15:12 -0800 Subject: [PATCH 01/29] Initial commit --- .../src/main/java/com/linkedin/davinci/DaVinciBackend.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 141af20a368..064b9121e09 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -207,6 +207,7 @@ public DaVinciBackend( String pid = Utils.getPid(); String instanceSuffix = configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid)); + // Current instance name. String instanceName = Utils.getHostName() + "_" + instanceSuffix; // Fetch latest update schema's protocol ID for Push Status Store from Router. From ec82b2c4502c7653be369b5b9c6034229ec075e5 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:41:11 -0800 Subject: [PATCH 02/29] Test Commit --- .../src/main/java/com/linkedin/davinci/StoreBackend.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index d2b9ecbf92e..01c61d7fc82 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -144,6 +144,10 @@ synchronized CompletableFuture subscribe( }), stats)); } else if (bootstrapVersion.isPresent()) { + if (!subscription.isEmpty()) { + subscription.removeAll(subscription); + subscription.addAll(partitions); + } throw new VeniceException( "Bootstrap version is already selected, storeName=" + storeName + ", currentVersion=" + daVinciCurrentVersion + ", desiredVersion=" + bootstrapVersion.get().kafkaTopicName()); From 5fa6390ff6283316d8e4119803711126d6b69fe8 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 25 Nov 2024 11:05:04 -0800 Subject: [PATCH 03/29] Test Commit --- .../src/main/java/com/linkedin/davinci/StoreBackend.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 01c61d7fc82..127a2d396e0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -145,7 +145,7 @@ synchronized CompletableFuture subscribe( } else if (bootstrapVersion.isPresent()) { if (!subscription.isEmpty()) { - subscription.removeAll(subscription); + unsubscribe(subscription); subscription.addAll(partitions); } throw new VeniceException( From 6601afd4421eccbff795c385a3f1350942c786f5 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:10:20 -0800 Subject: [PATCH 04/29] Test unsubscription of partitions --- .../src/main/java/com/linkedin/davinci/StoreBackend.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 127a2d396e0..975d212a1ab 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -144,10 +144,6 @@ synchronized CompletableFuture subscribe( }), stats)); } else if (bootstrapVersion.isPresent()) { - if (!subscription.isEmpty()) { - unsubscribe(subscription); - subscription.addAll(partitions); - } throw new VeniceException( "Bootstrap version is already selected, storeName=" + storeName + ", currentVersion=" + daVinciCurrentVersion + ", desiredVersion=" + bootstrapVersion.get().kafkaTopicName()); @@ -158,6 +154,11 @@ synchronized CompletableFuture subscribe( // Recreate store config that was potentially deleted by unsubscribe. config.store(); } + + if (!subscription.isEmpty()) { + unsubscribe(subscription); + } + subscription.addAll(partitions); if (daVinciFutureVersion == null) { From 5b18fe97189c0b60634e6b3181427f52b1d1d0cc Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:42:23 -0800 Subject: [PATCH 05/29] Rewrite unsubscription of partitions --- .../src/main/java/com/linkedin/davinci/StoreBackend.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 975d212a1ab..4f09f218265 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -156,7 +156,7 @@ synchronized CompletableFuture subscribe( } if (!subscription.isEmpty()) { - unsubscribe(subscription); + subscription.removeAll(subscription); } subscription.addAll(partitions); From ef49d50b627ace0d4a277cbf3b5719896ca54d87 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 25 Nov 2024 22:39:40 -0800 Subject: [PATCH 06/29] Modify Partition Removal --- .../src/main/java/com/linkedin/davinci/StoreBackend.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 4f09f218265..03f635233de 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -155,11 +155,12 @@ synchronized CompletableFuture subscribe( config.store(); } - if (!subscription.isEmpty()) { - subscription.removeAll(subscription); - } + ComplementSet unassignedPartitionSet = ComplementSet.newSet(subscription); + // Set of partitions that are not included in the submitted set of partitions. + unassignedPartitionSet.removeAll(partitions); subscription.addAll(partitions); + unsubscribe(unassignedPartitionSet); if (daVinciFutureVersion == null) { trySubscribeDaVinciFutureVersion(); From 0059955124c17205d8b633f138d9b0f8ec50d581 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:14:42 -0800 Subject: [PATCH 07/29] Partition adjustment in bootstrapping [DaVinciBackend] --- .../main/java/com/linkedin/davinci/DaVinciBackend.java | 5 +++++ .../src/main/java/com/linkedin/davinci/StoreBackend.java | 9 ++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 63ec5a0b103..bb25e86d3f4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -475,6 +475,11 @@ private synchronized void bootstrap() { AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); + ComplementSet subscription = storeBackend.getSubscription(); + ComplementSet unassignedPartitionSet = ComplementSet.newSet(subscription); + // Set of partitions that are not included in the submitted set of partitions. + unassignedPartitionSet.removeAll(ComplementSet.newSet(partitions)); + subscription.removeAll(unassignedPartitionSet); storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 03f635233de..f62c65fedc4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -108,6 +108,10 @@ public StoreBackendStats getStats() { return stats; } + public ComplementSet getSubscription() { + return subscription; + } + public ReferenceCounted getDaVinciCurrentVersion() { return daVinciCurrentVersionRef.get(); } @@ -155,12 +159,7 @@ synchronized CompletableFuture subscribe( config.store(); } - ComplementSet unassignedPartitionSet = ComplementSet.newSet(subscription); - // Set of partitions that are not included in the submitted set of partitions. - unassignedPartitionSet.removeAll(partitions); - subscription.addAll(partitions); - unsubscribe(unassignedPartitionSet); if (daVinciFutureVersion == null) { trySubscribeDaVinciFutureVersion(); From 1fa3ebaab9d9f4493df33faa398e25d86e6368d1 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 2 Dec 2024 10:08:45 -0800 Subject: [PATCH 08/29] [dvc] Partition Difference Calculation and Update to Partition Subscription --- .../main/java/com/linkedin/davinci/DaVinciBackend.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index bb25e86d3f4..1f0dcea03fb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -475,11 +475,11 @@ private synchronized void bootstrap() { AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); - ComplementSet subscription = storeBackend.getSubscription(); - ComplementSet unassignedPartitionSet = ComplementSet.newSet(subscription); + ComplementSet persistedPartitionIds = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); + ComplementSet unassignedPartitionSet = ComplementSet.newSet(storeBackend.getSubscription()); // Set of partitions that are not included in the submitted set of partitions. - unassignedPartitionSet.removeAll(ComplementSet.newSet(partitions)); - subscription.removeAll(unassignedPartitionSet); + unassignedPartitionSet.removeAll(persistedPartitionIds); + storeBackend.unsubscribe(unassignedPartitionSet); storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); } From 321fb40ff92417f3bf93fa352cbe8d3b99508e52 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Tue, 3 Dec 2024 17:25:48 -0800 Subject: [PATCH 09/29] Integration Test [In Writing] --- .../venice/endToEnd/DaVinciClientTest.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 5714633ec95..6f8171d1696 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -83,15 +83,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.DataProviderUtils; -import com.linkedin.venice.utils.ForkedJavaProcess; -import com.linkedin.venice.utils.IntegrationTestPushUtils; -import com.linkedin.venice.utils.Pair; -import com.linkedin.venice.utils.PropertyBuilder; -import com.linkedin.venice.utils.TestUtils; -import com.linkedin.venice.utils.TestWriteUtils; -import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.*; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1283,6 +1275,25 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { } } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) + public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception { + String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + + VeniceProperties backendConfig = new PropertyBuilder().build(); + + Set keySet = new HashSet<>(); + for (int i = 0; i < KEY_COUNT; ++i) { + keySet.add(i); + } + + try (DaVinciClient client = ServiceFactory + .getGenericAvroDaVinciClient(storeName, cluster.getZk().getAddress(), daVinciConfig, backendConfig)) { + // We only subscribe to 1/3 of the partitions so some data will not be present locally. + client.subscribe(Collections.singleton(0)).get(); + assertThrows(() -> client.batchGet(keySet).get()); + } + } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception { String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); From 52a557c706f7fdc9167c0f156fcf2e8d186fced0 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:06:35 -0800 Subject: [PATCH 10/29] Partition Removal --- .../src/main/java/com/linkedin/davinci/DaVinciBackend.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 1f0dcea03fb..6266b9efc69 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -475,10 +475,9 @@ private synchronized void bootstrap() { AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); - ComplementSet persistedPartitionIds = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); - ComplementSet unassignedPartitionSet = ComplementSet.newSet(storeBackend.getSubscription()); - // Set of partitions that are not included in the submitted set of partitions. - unassignedPartitionSet.removeAll(persistedPartitionIds); + ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); + ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); + unassignedPartitionSet.removeAll(subscription); storeBackend.unsubscribe(unassignedPartitionSet); storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); From c5c9f1621fa761280cbd0131d869811d06101ec1 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:21:55 -0800 Subject: [PATCH 11/29] Integration Test [In Writing] --- .../linkedin/venice/endToEnd/DaVinciClientTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 6f8171d1696..8b4c1d2fe1f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -83,7 +83,15 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.*; +import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.ForkedJavaProcess; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.Pair; +import com.linkedin.venice.utils.PropertyBuilder; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1288,7 +1296,6 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except try (DaVinciClient client = ServiceFactory .getGenericAvroDaVinciClient(storeName, cluster.getZk().getAddress(), daVinciConfig, backendConfig)) { - // We only subscribe to 1/3 of the partitions so some data will not be present locally. client.subscribe(Collections.singleton(0)).get(); assertThrows(() -> client.batchGet(keySet).get()); } From b520ab8d2cbb1f2ec302a88da74ff96a5c3d76da Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Fri, 6 Dec 2024 12:35:46 -0800 Subject: [PATCH 12/29] Integration Test [In Writing] --- .../venice/endToEnd/DaVinciClientTest.java | 111 ++++++++++++++++-- 1 file changed, 101 insertions(+), 10 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 8b4c1d2fe1f..1f797dc8e49 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1284,20 +1284,111 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { } @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) - public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception { - String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception { + String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); + VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .put(DATA_BASE_PATH, baseDataPath) + .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true) + .put(PUSH_STATUS_STORE_ENABLED, true) + .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000) + .build(); - VeniceProperties backendConfig = new PropertyBuilder().build(); + MetricsRepository metricsRepository = new MetricsRepository(); - Set keySet = new HashSet<>(); - for (int i = 0; i < KEY_COUNT; ++i) { - keySet.add(i); + // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig)) { + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + + // Test non-existent key access + client1.subscribeAll().get(); + assertNull(client1.get(KEY_COUNT + 1).get()); + + // Test single-get access + Map keyValueMap = new HashMap<>(); + for (int k = 0; k < KEY_COUNT; ++k) { + assertEquals(client1.get(k).get(), 1); + keyValueMap.put(k, 1); + } + + // Test batch-get access + assertEquals(client1.batchGet(keyValueMap.keySet()).get(), keyValueMap); + + // Test automatic new version ingestion + for (int i = 0; i < 2; ++i) { + // Test per-version partitioning parameters + int partitionCount = i + 1; + String iString = String.valueOf(i); + cluster.useControllerClient(controllerClient -> { + ControllerResponse response = controllerClient.updateStore( + storeName1, + new UpdateStoreQueryParams().setPartitionerClass(ConstantVenicePartitioner.class.getName()) + .setPartitionCount(partitionCount) + .setPartitionerParams( + Collections.singletonMap(ConstantVenicePartitioner.CONSTANT_PARTITION, iString))); + assertFalse(response.isError(), response.getError()); + }); + + Integer expectedValue = cluster.createVersion(storeName1, KEY_COUNT); + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { + for (int k = 0; k < KEY_COUNT; ++k) { + Object readValue = client1.get(k).get(); + assertEquals(readValue, expectedValue); + } + }); + } + + // Test multiple client ingesting different stores concurrently + DaVinciClient client2 = factory.getAndStartGenericAvroClient(storeName2, clientConfig); + DaVinciClient client3 = factory.getAndStartGenericAvroClient(storeName3, clientConfig); + CompletableFuture.allOf(client2.subscribeAll(), client3.subscribeAll()).get(); + assertEquals(client2.batchGet(keyValueMap.keySet()).get(), keyValueMap); + assertEquals(client3.batchGet(keyValueMap.keySet()).get(), keyValueMap); + + // TODO(jlliu): Re-enable this test-case after fixing store deletion that is flaky due to + // CLIENT_USE_SYSTEM_STORE_REPOSITORY. + // // Test read from a store that is being deleted concurrently + // try (ControllerClient controllerClient = cluster.getControllerClient()) { + // ControllerResponse response = controllerClient.disableAndDeleteStore(storeName2); + // assertFalse(response.isError(), response.getError()); + // TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { + // assertThrows(VeniceClientException.class, () -> client2.get(KEY_COUNT / 3).get()); + // }); + // } + client2.unsubscribeAll(); } - try (DaVinciClient client = ServiceFactory - .getGenericAvroDaVinciClient(storeName, cluster.getZk().getAddress(), daVinciConfig, backendConfig)) { - client.subscribe(Collections.singleton(0)).get(); - assertThrows(() -> client.batchGet(keySet).get()); + // Test bootstrap-time junk removal + cluster.useControllerClient(controllerClient -> { + ControllerResponse response = controllerClient.disableAndDeleteStore(storeName3); + assertFalse(response.isError(), response.getError()); + }); + + // Test managed clients & data cleanup + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + new MetricsRepository(), + backendConfig, + Optional.of(Collections.singleton(storeName1)))) { + assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); + + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + client1.subscribeAll().get(); + client1.unsubscribeAll(); + // client2 was removed explicitly above via disableAndDeleteStore() + // client3 is expected to be removed by the factory during bootstrap + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { + assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); + }); } } From 262bbc605083a6a9e4fbcad7fae5c2c4d827af4f Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:32:07 -0800 Subject: [PATCH 13/29] Integration Test [In Writing] --- .../venice/endToEnd/DaVinciClientTest.java | 52 +++++-------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 1f797dc8e49..79e1e05ff10 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -46,7 +46,9 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.DaVinciUserApp; +import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -83,15 +85,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.DataProviderUtils; -import com.linkedin.venice.utils.ForkedJavaProcess; -import com.linkedin.venice.utils.IntegrationTestPushUtils; -import com.linkedin.venice.utils.Pair; -import com.linkedin.venice.utils.PropertyBuilder; -import com.linkedin.venice.utils.TestUtils; -import com.linkedin.venice.utils.TestWriteUtils; -import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.*; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1286,8 +1280,6 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception { String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); - String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); - String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) @@ -1301,6 +1293,7 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti MetricsRepository metricsRepository = new MetricsRepository(); // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path + StoreBackend storeBackend; try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, @@ -1345,33 +1338,8 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti } }); } - - // Test multiple client ingesting different stores concurrently - DaVinciClient client2 = factory.getAndStartGenericAvroClient(storeName2, clientConfig); - DaVinciClient client3 = factory.getAndStartGenericAvroClient(storeName3, clientConfig); - CompletableFuture.allOf(client2.subscribeAll(), client3.subscribeAll()).get(); - assertEquals(client2.batchGet(keyValueMap.keySet()).get(), keyValueMap); - assertEquals(client3.batchGet(keyValueMap.keySet()).get(), keyValueMap); - - // TODO(jlliu): Re-enable this test-case after fixing store deletion that is flaky due to - // CLIENT_USE_SYSTEM_STORE_REPOSITORY. - // // Test read from a store that is being deleted concurrently - // try (ControllerClient controllerClient = cluster.getControllerClient()) { - // ControllerResponse response = controllerClient.disableAndDeleteStore(storeName2); - // assertFalse(response.isError(), response.getError()); - // TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { - // assertThrows(VeniceClientException.class, () -> client2.get(KEY_COUNT / 3).get()); - // }); - // } - client2.unsubscribeAll(); } - // Test bootstrap-time junk removal - cluster.useControllerClient(controllerClient -> { - ControllerResponse response = controllerClient.disableAndDeleteStore(storeName3); - assertFalse(response.isError(), response.getError()); - }); - // Test managed clients & data cleanup try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, @@ -1381,11 +1349,19 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti Optional.of(Collections.singleton(storeName1)))) { assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); + DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); + storeBackend = daVinciBackend.getStoreOrThrow(storeName1); + List partitions = new ArrayList(); + partitions.add(1); + partitions.add(2); + partitions.add(3); + partitions.add(4); + ComplementSet subscription = ComplementSet.newSet(partitions); + storeBackend.subscribe(subscription); + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); client1.subscribeAll().get(); client1.unsubscribeAll(); - // client2 was removed explicitly above via disableAndDeleteStore() - // client3 is expected to be removed by the factory during bootstrap TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); }); From 699c200fc48ed5fd5e03e37d12d273dad7f7dc7b Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 9 Dec 2024 13:11:28 -0800 Subject: [PATCH 14/29] [dvc] Partition Difference Calculation and Update to Partition Subscription. Enabled config flag over bootstrap subscription --- .../com/linkedin/davinci/DaVinciBackend.java | 33 ++++++++++--------- .../client/AvroGenericDaVinciClient.java | 8 ++--- .../java/com/linkedin/venice/ConfigKeys.java | 3 ++ 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 6266b9efc69..cb7b9a62295 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,7 +1,6 @@ package com.linkedin.davinci; -import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; -import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER; @@ -467,20 +466,22 @@ private synchronized void bootstrap() { configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); - // Subscribe all bootstrap version partitions. - storeNameToBootstrapVersionMap.forEach((storeName, version) -> { - List partitions = storeNameToPartitionListMap.get(storeName); - String versionTopic = version.kafkaTopicName(); - LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); - AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); - aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); - StoreBackend storeBackend = getStoreOrThrow(storeName); - ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); - ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); - unassignedPartitionSet.removeAll(subscription); - storeBackend.unsubscribe(unassignedPartitionSet); - storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); - }); + if (!configLoader.getCombinedProperties().getBoolean(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false)) { + // Subscribe all bootstrap version partitions. + storeNameToBootstrapVersionMap.forEach((storeName, version) -> { + List partitions = storeNameToPartitionListMap.get(storeName); + String versionTopic = version.kafkaTopicName(); + LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); + AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); + aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); + StoreBackend storeBackend = getStoreOrThrow(storeName); + ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); + ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); + unassignedPartitionSet.removeAll(subscription); + storeBackend.unsubscribe(unassignedPartitionSet); + storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); + }); + } } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 348b48128f2..efdfa8a0ddf 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -9,12 +9,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; -import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; -import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; -import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; -import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; -import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; -import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.client.store.ClientFactory.getTransportClient; import static org.apache.avro.Schema.Type.RECORD; @@ -710,6 +705,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put(CLUSTER_NAME, clusterName) .put(ZOOKEEPER_ADDRESS, zkAddress) .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers) + .put(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false) .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK) .put(INGESTION_USE_DA_VINCI_CLIENT, true) .put( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index b6aecf18476..f83835b9a69 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -97,6 +97,9 @@ private ConfigKeys() { public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND = "da.vinci.current.version.bootstrapping.quota.bytes.per.second"; + // Prevent automatic subscription of partitions while bootstrapping. + public static final String DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED = "da.vinci.bootstrap.subscription.disabled"; + // Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used. public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND = "kafka.fetch.quota.unordered.bytes.per.second"; From 002d55cb1c560bbf9701c3934cb13ba9f0b8e251 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:04:13 -0800 Subject: [PATCH 15/29] [dvc] Partition Difference Calculation and Update to Partition Subscription. Enabled config flag over bootstrap subscription --- .../com/linkedin/davinci/DaVinciBackend.java | 4 +++- .../client/AvroGenericDaVinciClient.java | 8 +++++++- .../venice/endToEnd/DaVinciClientTest.java | 20 +++++++++++++------ 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index cb7b9a62295..dd42cb79bf9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,6 +1,8 @@ package com.linkedin.davinci; -import static com.linkedin.venice.ConfigKeys.*; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; +import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; +import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index efdfa8a0ddf..5b5389aba00 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -9,7 +9,13 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; -import static com.linkedin.venice.ConfigKeys.*; +import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; +import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; +import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; +import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; +import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; +import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; import static com.linkedin.venice.client.store.ClientFactory.getTransportClient; import static org.apache.avro.Schema.Type.RECORD; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 79e1e05ff10..22436309a66 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -85,7 +85,16 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.*; +import com.linkedin.venice.utils.ComplementSet; +import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.ForkedJavaProcess; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.Pair; +import com.linkedin.venice.utils.PropertyBuilder; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1351,11 +1360,10 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); storeBackend = daVinciBackend.getStoreOrThrow(storeName1); - List partitions = new ArrayList(); - partitions.add(1); - partitions.add(2); - partitions.add(3); - partitions.add(4); + List partitions = new ArrayList<>(); + for (int i = 0; i < 5; ++i) { + partitions.add(i); + } ComplementSet subscription = ComplementSet.newSet(partitions); storeBackend.subscribe(subscription); From fd9fa020748521c00ea39a5a17a9796d6d9fd21a Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Tue, 10 Dec 2024 16:59:25 -0800 Subject: [PATCH 16/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Integration Test [In Writing] --- .../java/com/linkedin/davinci/DaVinciBackend.java | 4 ++-- .../davinci/client/AvroGenericDaVinciClient.java | 4 ++-- .../main/java/com/linkedin/venice/ConfigKeys.java | 5 +++-- .../venice/endToEnd/DaVinciClientTest.java | 15 +++++---------- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index dd42cb79bf9..24f41b3bec4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,6 +1,6 @@ package com.linkedin.davinci; -import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; @@ -468,7 +468,7 @@ private synchronized void bootstrap() { configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); - if (!configLoader.getCombinedProperties().getBoolean(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false)) { + if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) { // Subscribe all bootstrap version partitions. storeNameToBootstrapVersionMap.forEach((storeName, version) -> { List partitions = storeNameToPartitionListMap.get(storeName); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 5b5389aba00..2a900a2cc44 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -10,7 +10,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; -import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; @@ -711,7 +711,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put(CLUSTER_NAME, clusterName) .put(ZOOKEEPER_ADDRESS, zkAddress) .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers) - .put(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true) .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK) .put(INGESTION_USE_DA_VINCI_CLIENT, true) .put( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index f83835b9a69..ffa67980c24 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -97,8 +97,9 @@ private ConfigKeys() { public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND = "da.vinci.current.version.bootstrapping.quota.bytes.per.second"; - // Prevent automatic subscription of partitions while bootstrapping. - public static final String DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED = "da.vinci.bootstrap.subscription.disabled"; + // On Da Vinci Client, control over automatic partition subscription. + public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY = + "da.vinci.subscribe.on.disk.partitions.automatically"; // Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used. public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND = diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 22436309a66..1541f1a24bc 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -46,9 +46,7 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; -import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.DaVinciUserApp; -import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -85,7 +83,6 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.ForkedJavaProcess; import com.linkedin.venice.utils.IntegrationTestPushUtils; @@ -1302,7 +1299,6 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti MetricsRepository metricsRepository = new MetricsRepository(); // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path - StoreBackend storeBackend; try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, @@ -1358,16 +1354,15 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti Optional.of(Collections.singleton(storeName1)))) { assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); - DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); - storeBackend = daVinciBackend.getStoreOrThrow(storeName1); - List partitions = new ArrayList<>(); + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + + Set partitions = new HashSet<>(); for (int i = 0; i < 5; ++i) { partitions.add(i); } - ComplementSet subscription = ComplementSet.newSet(partitions); - storeBackend.subscribe(subscription); - DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + client1.subscribe(partitions); + client1.subscribeAll().get(); client1.unsubscribeAll(); TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { From 69d619a22d06531a659551417136581ae22059d5 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Wed, 11 Dec 2024 23:38:17 -0800 Subject: [PATCH 17/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Integration Test [In Writing] --- .../src/main/java/com/linkedin/davinci/DaVinciBackend.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 24f41b3bec4..27144d6f02f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -477,10 +477,6 @@ private synchronized void bootstrap() { AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); - ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); - ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); - unassignedPartitionSet.removeAll(subscription); - storeBackend.unsubscribe(unassignedPartitionSet); storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); } From ac3224274b834924a3f569c033b2a44e200452c4 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Sun, 15 Dec 2024 21:27:14 -0800 Subject: [PATCH 18/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Integration Test [In Writing] --- .../com/linkedin/davinci/DaVinciBackend.java | 2 +- .../venice/endToEnd/DaVinciClientTest.java | 45 +++++++------------ 2 files changed, 16 insertions(+), 31 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 27144d6f02f..8b8eb796a5a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -575,7 +575,7 @@ public ReadOnlySchemaRepository getSchemaRepository() { return schemaRepository; } - StorageService getStorageService() { + public StorageService getStorageService() { return storageService; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 1541f1a24bc..236aa2e56ad 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -11,6 +11,7 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; @@ -46,7 +47,9 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.DaVinciUserApp; +import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -60,11 +63,7 @@ import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.compression.CompressionStrategy; -import com.linkedin.venice.controllerapi.ControllerClient; -import com.linkedin.venice.controllerapi.ControllerResponse; -import com.linkedin.venice.controllerapi.NewStoreResponse; -import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; -import com.linkedin.venice.controllerapi.VersionCreationResponse; +import com.linkedin.venice.controllerapi.*; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; @@ -83,15 +82,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.DataProviderUtils; -import com.linkedin.venice.utils.ForkedJavaProcess; -import com.linkedin.venice.utils.IntegrationTestPushUtils; -import com.linkedin.venice.utils.Pair; -import com.linkedin.venice.utils.PropertyBuilder; -import com.linkedin.venice.utils.TestUtils; -import com.linkedin.venice.utils.TestWriteUtils; -import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.*; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1284,7 +1275,7 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { } @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) - public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception { + public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception { String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) @@ -1294,6 +1285,7 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true) .put(PUSH_STATUS_STORE_ENABLED, true) .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false) .build(); MetricsRepository metricsRepository = new MetricsRepository(); @@ -1304,7 +1296,7 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, metricsRepository, backendConfig)) { - DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); // Test non-existent key access client1.subscribeAll().get(); @@ -1349,25 +1341,18 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, - new MetricsRepository(), + metricsRepository, backendConfig, Optional.of(Collections.singleton(storeName1)))) { - assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); - DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); + assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); - Set partitions = new HashSet<>(); - for (int i = 0; i < 5; ++i) { - partitions.add(i); + DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); + if (daVinciBackend != null) { + StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); + ComplementSet subscription = storeBackend.getSubscription(); + assertTrue(subscription.isEmpty()); } - - client1.subscribe(partitions); - - client1.subscribeAll().get(); - client1.unsubscribeAll(); - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { - assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); - }); } } From 9554248a2a03f57ddc4625d210015ad7cd6677c1 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Sun, 15 Dec 2024 23:00:14 -0800 Subject: [PATCH 19/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Integration Test [In Writing] --- .../main/java/com/linkedin/davinci/DaVinciBackend.java | 2 +- .../com/linkedin/venice/endToEnd/DaVinciClientTest.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 8b8eb796a5a..27144d6f02f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -575,7 +575,7 @@ public ReadOnlySchemaRepository getSchemaRepository() { return schemaRepository; } - public StorageService getStorageService() { + StorageService getStorageService() { return storageService; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 236aa2e56ad..e6b420f4f99 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -63,7 +63,11 @@ import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.compression.CompressionStrategy; -import com.linkedin.venice.controllerapi.*; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.NewStoreResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; @@ -1345,7 +1349,7 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except backendConfig, Optional.of(Collections.singleton(storeName1)))) { - assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); if (daVinciBackend != null) { From 7b20b287edac2d26587bc745198c39ff2d1f1726 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:34:52 -0800 Subject: [PATCH 20/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Integration Test [In Writing] --- .../com/linkedin/davinci/StoreBackend.java | 1 - .../venice/endToEnd/DaVinciClientTest.java | 22 +++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index f62c65fedc4..7e32557223c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -158,7 +158,6 @@ synchronized CompletableFuture subscribe( // Recreate store config that was potentially deleted by unsubscribe. config.store(); } - subscription.addAll(partitions); if (daVinciFutureVersion == null) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index e6b420f4f99..3556e3f7d9f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -86,7 +86,16 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.*; +import com.linkedin.venice.utils.ComplementSet; +import com.linkedin.venice.utils.DataProviderUtils; +import com.linkedin.venice.utils.ForkedJavaProcess; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.Pair; +import com.linkedin.venice.utils.PropertyBuilder; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.TestWriteUtils; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1341,7 +1350,7 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except } } - // Test managed clients & data cleanup + // Test managed clients try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, @@ -1351,6 +1360,15 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); + Set partitions = new HashSet<>(); + + for (int i = 0; i < 5; i++) { + partitions.add(i); + } + + client1.subscribe(partitions); + client1.unsubscribeAll(); + DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); if (daVinciBackend != null) { StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); From 9bfe418d48060cc32797dd21266227e9859a35c1 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:25:54 -0800 Subject: [PATCH 21/29] [dvc] Config flag enabled for subscribing on disk partition automatically. Integration Test [In Writing] --- .../venice/endToEnd/DaVinciClientTest.java | 27 ++----------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 3556e3f7d9f..7a54b45dd05 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1324,30 +1324,6 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except // Test batch-get access assertEquals(client1.batchGet(keyValueMap.keySet()).get(), keyValueMap); - - // Test automatic new version ingestion - for (int i = 0; i < 2; ++i) { - // Test per-version partitioning parameters - int partitionCount = i + 1; - String iString = String.valueOf(i); - cluster.useControllerClient(controllerClient -> { - ControllerResponse response = controllerClient.updateStore( - storeName1, - new UpdateStoreQueryParams().setPartitionerClass(ConstantVenicePartitioner.class.getName()) - .setPartitionCount(partitionCount) - .setPartitionerParams( - Collections.singletonMap(ConstantVenicePartitioner.CONSTANT_PARTITION, iString))); - assertFalse(response.isError(), response.getError()); - }); - - Integer expectedValue = cluster.createVersion(storeName1, KEY_COUNT); - TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { - for (int k = 0; k < KEY_COUNT; ++k) { - Object readValue = client1.get(k).get(); - assertEquals(readValue, expectedValue); - } - }); - } } // Test managed clients @@ -1362,12 +1338,13 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except Set partitions = new HashSet<>(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 2; i++) { partitions.add(i); } client1.subscribe(partitions); client1.unsubscribeAll(); + assertEquals(client1.getPartitionCount(), 2); DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); if (daVinciBackend != null) { From 7ba15f024b18f4748b2470a934cee38280d9f3df Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Sun, 12 Jan 2025 12:29:09 -0800 Subject: [PATCH 22/29] [dvc] Config flag enabled for subscribing on disk partition automatically. Integration Test [In Writing] --- .../java/com/linkedin/venice/endToEnd/DaVinciClientTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 7a54b45dd05..11489663068 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1343,13 +1343,13 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except } client1.subscribe(partitions); - client1.unsubscribeAll(); - assertEquals(client1.getPartitionCount(), 2); + assertEquals(client1.getPartitionCount(), 3); DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); if (daVinciBackend != null) { StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); ComplementSet subscription = storeBackend.getSubscription(); + subscription.removeAll(ComplementSet.wrap(partitions)); assertTrue(subscription.isEmpty()); } } From a0ba24b5f5daacc8ed0c875e86413e4e29c5e4f7 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 13 Jan 2025 09:56:04 -0800 Subject: [PATCH 23/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Include integration test for testing config flag and subscription set. --- .../java/com/linkedin/venice/endToEnd/DaVinciClientTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 36bf041a7f8..77389a90c29 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1055,8 +1055,9 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except if (daVinciBackend != null) { StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); ComplementSet subscription = storeBackend.getSubscription(); - subscription.removeAll(ComplementSet.wrap(partitions)); - assertTrue(subscription.isEmpty()); + assertTrue(subscription.contains(0)); + assertTrue(subscription.contains(1)); + assertFalse(subscription.contains(2)); } } } From 6f34d1bf3b5954dbc03aac40c4a0ff4ef1d072e0 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Sun, 26 Jan 2025 22:10:25 -0800 Subject: [PATCH 24/29] [dvc] Config flag enabled for subscribing on disk partitions automatically. Included integration test and writing of unit test. --- .../com/linkedin/davinci/DaVinciBackend.java | 2 +- .../linkedin/davinci/DaVinciBackendTest.java | 75 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 3cdaf661a6b..4e1c8c72f11 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -399,7 +399,7 @@ private Function functionToCheckWhetherStorageEngineShouldBeKep }; } - private synchronized void bootstrap() { + protected synchronized void bootstrap() { List storageEngines = storageService.getStorageEngineRepository().getAllLocalStorageEngines(); LOGGER.info("Starting bootstrap, storageEngines: {}", storageEngines); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 2bab2878d87..1ce1d1877e4 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -4,6 +4,8 @@ import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR; import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -11,10 +13,22 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; +import com.linkedin.davinci.config.VeniceConfigLoader; +import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; +import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.StorageService; +import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.venice.exceptions.DiskLimitExhaustedException; import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.VeniceProperties; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -139,4 +153,65 @@ public void testBootstrappingAwareCompletableFuture() verify(backend, times(2)).hasCurrentVersionBootstrapping(); } + @Test + public void testBootstrappingSubscription() throws NoSuchFieldException, IllegalAccessException { + DaVinciBackend backend = mock(DaVinciBackend.class); + StorageService mockStorageService = mock(StorageService.class); + + Field storageServiceField = DaVinciBackend.class.getDeclaredField("storageService"); + storageServiceField.setAccessible(true); + storageServiceField.set(backend, mockStorageService); + + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class); + mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine); + String resourceName = "test_store_v1"; + when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName); + + abstractStorageEngine.addStoragePartition(0); + abstractStorageEngine.addStoragePartition(1); + + List localStorageEngines = new ArrayList<>(); + localStorageEngines.add(abstractStorageEngine); + + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); + when(mockStorageService.getStorageEngine(resourceName)).thenReturn(abstractStorageEngine); + when(mockStorageEngineRepository.getAllLocalStorageEngines()).thenReturn(localStorageEngines); + when(backend.isIsolatedIngestion()).thenReturn(false); + + List userPartitionList = new ArrayList<>(); + userPartitionList.add(0); + userPartitionList.add(1); + userPartitionList.add(2); + when(mockStorageService.getUserPartitions(anyString())).thenReturn(userPartitionList); + + StoreBackend mockStoreBackend = mock(StoreBackend.class); + when(backend.getStoreOrThrow(anyString())).thenReturn(mockStoreBackend); + + Version mockVersion = mock(Version.class); + Store mockStore = mock(Store.class); + SubscriptionBasedReadOnlyStoreRepository mockStoreRepository = mock(SubscriptionBasedReadOnlyStoreRepository.class); + Field storeRepositoryField = DaVinciBackend.class.getDeclaredField("storeRepository"); + storeRepositoryField.setAccessible(true); + storeRepositoryField.set(backend, mockStoreRepository); + when(mockStoreRepository.getStoreOrThrow(anyString())).thenReturn(mockStore); + when(mockStore.getVersion(anyInt())).thenReturn(mockVersion); + + VeniceConfigLoader mockConfigLoader = mock(VeniceConfigLoader.class); + Field configLoaderField = DaVinciBackend.class.getDeclaredField("configLoader"); + configLoaderField.setAccessible(true); + configLoaderField.set(backend, mockConfigLoader); + VeniceProperties mockCombinedProperties = mock(VeniceProperties.class); + when(mockConfigLoader.getCombinedProperties()).thenReturn(mockCombinedProperties); + + AggVersionedStorageEngineStats mockAggVersionedStorageEngineStats = mock(AggVersionedStorageEngineStats.class); + Field aggVersionedStorageEngineStatsField = DaVinciBackend.class.getDeclaredField("aggVersionedStorageEngineStats"); + aggVersionedStorageEngineStatsField.setAccessible(true); + aggVersionedStorageEngineStatsField.set(backend, mockAggVersionedStorageEngineStats); + + // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false + when(mockCombinedProperties.getBoolean(anyString(), anyBoolean())).thenReturn(false); + doCallRealMethod().when(backend).bootstrap(); + backend.bootstrap(); + } } From 15f2fa1714fda44b07abd8d1e6a43c48c55d85ec Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Mon, 27 Jan 2025 09:40:31 -0800 Subject: [PATCH 25/29] Unit test --- .../linkedin/davinci/DaVinciBackendTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 1ce1d1877e4..334fbd1a7a1 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -6,6 +6,7 @@ import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -14,6 +15,9 @@ import static org.testng.Assert.fail; import com.linkedin.davinci.config.VeniceConfigLoader; +import com.linkedin.davinci.ingestion.DefaultIngestionBackend; +import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; +import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageService; @@ -209,6 +213,20 @@ public void testBootstrappingSubscription() throws NoSuchFieldException, Illegal aggVersionedStorageEngineStatsField.setAccessible(true); aggVersionedStorageEngineStatsField.set(backend, mockAggVersionedStorageEngineStats); + DefaultIngestionBackend ingestionBackend = mock(DefaultIngestionBackend.class); + Field ingestionBackendField = DaVinciBackend.class.getDeclaredField("ingestionBackend"); + ingestionBackendField.setAccessible(true); + ingestionBackendField.set(backend, ingestionBackend); + VeniceNotifier ingestionListener = mock(VeniceNotifier.class); + Field ingestionListenerField = DaVinciBackend.class.getDeclaredField("ingestionListener"); + ingestionListenerField.setAccessible(true); + ingestionListenerField.set(backend, ingestionListener); + KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class); + Field storeIngestionServiceField = DefaultIngestionBackend.class.getDeclaredField("storeIngestionService"); + storeIngestionServiceField.setAccessible(true); + storeIngestionServiceField.set(ingestionBackend, storeIngestionService); + doNothing().when(ingestionBackend).addIngestionNotifier(any()); + // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false when(mockCombinedProperties.getBoolean(anyString(), anyBoolean())).thenReturn(false); doCallRealMethod().when(backend).bootstrap(); From 4cb909f2aa41ccb38c6a5ce15a6796857375240f Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:32:52 -0800 Subject: [PATCH 26/29] Unit test --- .../java/com/linkedin/davinci/DaVinciBackendTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 334fbd1a7a1..98ac8271d81 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -4,7 +4,10 @@ import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR; import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -225,6 +228,10 @@ public void testBootstrappingSubscription() throws NoSuchFieldException, Illegal Field storeIngestionServiceField = DefaultIngestionBackend.class.getDeclaredField("storeIngestionService"); storeIngestionServiceField.setAccessible(true); storeIngestionServiceField.set(ingestionBackend, storeIngestionService); + Field ingestionServiceField = DaVinciBackend.class.getDeclaredField("ingestionService"); + ingestionServiceField.setAccessible(true); + ingestionServiceField.set(backend, storeIngestionService); + doNothing().when(ingestionBackend).addIngestionNotifier(any()); // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false From c278353ddb593c548d1ed76823a0156376c9b995 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Wed, 29 Jan 2025 20:27:56 -0800 Subject: [PATCH 27/29] Unit test --- .../linkedin/davinci/DaVinciBackendTest.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 98ac8271d81..3aab2e62ff4 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -8,13 +8,10 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.linkedin.davinci.config.VeniceConfigLoader; @@ -32,13 +29,17 @@ import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.VeniceProperties; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -168,7 +169,6 @@ public void testBootstrappingSubscription() throws NoSuchFieldException, Illegal Field storageServiceField = DaVinciBackend.class.getDeclaredField("storageService"); storageServiceField.setAccessible(true); storageServiceField.set(backend, mockStorageService); - StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class); mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine); @@ -192,8 +192,23 @@ public void testBootstrappingSubscription() throws NoSuchFieldException, Illegal userPartitionList.add(2); when(mockStorageService.getUserPartitions(anyString())).thenReturn(userPartitionList); + HashSet backendSubscription = new HashSet<>(); + backendSubscription.add(0); + backendSubscription.add(1); + StoreBackend mockStoreBackend = mock(StoreBackend.class); when(backend.getStoreOrThrow(anyString())).thenReturn(mockStoreBackend); + ComplementSet backendSubscriptionSet = ComplementSet.wrap(backendSubscription); + when(mockStoreBackend.getSubscription()).thenReturn(backendSubscriptionSet); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ComplementSet partitions = invocation.getArgument(0); + mockStoreBackend.getSubscription().addAll(partitions); + return null; + } + }).when(mockStoreBackend).subscribe(any(), any()); Version mockVersion = mock(Version.class); Store mockStore = mock(Store.class); @@ -238,5 +253,10 @@ public void testBootstrappingSubscription() throws NoSuchFieldException, Illegal when(mockCombinedProperties.getBoolean(anyString(), anyBoolean())).thenReturn(false); doCallRealMethod().when(backend).bootstrap(); backend.bootstrap(); + + ComplementSet subscription = mockStoreBackend.getSubscription(); + assertTrue(subscription.contains(0)); + assertTrue(subscription.contains(1)); + assertFalse(subscription.contains(2)); } } From aee0931638796c69ad7f24274599b4a1d9165f5c Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Fri, 31 Jan 2025 13:57:26 -0800 Subject: [PATCH 28/29] Unit test --- .../src/main/java/com/linkedin/davinci/DaVinciBackend.java | 6 +++++- .../test/java/com/linkedin/davinci/DaVinciBackendTest.java | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 4e1c8c72f11..cea256dde12 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -399,7 +399,7 @@ private Function functionToCheckWhetherStorageEngineShouldBeKep }; } - protected synchronized void bootstrap() { + private synchronized void bootstrap() { List storageEngines = storageService.getStorageEngineRepository().getAllLocalStorageEngines(); LOGGER.info("Starting bootstrap, storageEngines: {}", storageEngines); @@ -669,6 +669,10 @@ protected void handleStoreChanged(StoreBackend storeBackend) { storeBackend.trySubscribeDaVinciFutureVersion(); } + protected void executeBootstrapping() { + bootstrap(); + } + Version getVeniceLatestNonFaultyVersion(String storeName, Set faultyVersions) { try { return getVeniceLatestNonFaultyVersion(getStoreRepository().getStoreOrThrow(storeName), faultyVersions); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 3aab2e62ff4..38ec9ef7cb7 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -239,6 +239,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Field ingestionListenerField = DaVinciBackend.class.getDeclaredField("ingestionListener"); ingestionListenerField.setAccessible(true); ingestionListenerField.set(backend, ingestionListener); + KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class); Field storeIngestionServiceField = DefaultIngestionBackend.class.getDeclaredField("storeIngestionService"); storeIngestionServiceField.setAccessible(true); @@ -251,8 +252,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false when(mockCombinedProperties.getBoolean(anyString(), anyBoolean())).thenReturn(false); - doCallRealMethod().when(backend).bootstrap(); - backend.bootstrap(); + doCallRealMethod().when(backend).executeBootstrapping(); + backend.executeBootstrapping(); ComplementSet subscription = mockStoreBackend.getSubscription(); assertTrue(subscription.contains(0)); From 0e82518609811c3ad88f3da9dc2804109ae469a7 Mon Sep 17 00:00:00 2001 From: Kristy Lee <18691669+kristyelee@users.noreply.github.com> Date: Fri, 31 Jan 2025 16:58:55 -0800 Subject: [PATCH 29/29] [dvc] Config flag for subscribing on disk partitions automatically. Updated with integration test and unit test. --- .../java/com/linkedin/davinci/DaVinciBackendTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 38ec9ef7cb7..113ee7a215b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -8,7 +8,13 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -239,7 +245,6 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Field ingestionListenerField = DaVinciBackend.class.getDeclaredField("ingestionListener"); ingestionListenerField.setAccessible(true); ingestionListenerField.set(backend, ingestionListener); - KafkaStoreIngestionService storeIngestionService = mock(KafkaStoreIngestionService.class); Field storeIngestionServiceField = DefaultIngestionBackend.class.getDeclaredField("storeIngestionService"); storeIngestionServiceField.setAccessible(true); @@ -247,7 +252,6 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Field ingestionServiceField = DaVinciBackend.class.getDeclaredField("ingestionService"); ingestionServiceField.setAccessible(true); ingestionServiceField.set(backend, storeIngestionService); - doNothing().when(ingestionBackend).addIngestionNotifier(any()); // DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY == false