Skip to content

Commit

Permalink
[dvc] Config flag enabled for subscribing on disk partitions automati…
Browse files Browse the repository at this point in the history
…cally. Integration Test [In Writing]
  • Loading branch information
kristyelee committed Dec 11, 2024
1 parent 002d55c commit fd9fa02
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
Expand Down Expand Up @@ -468,7 +468,7 @@ private synchronized void bootstrap() {
configLoader.getVeniceServerConfig());
ingestionBackend.addIngestionNotifier(ingestionListener);

if (!configLoader.getCombinedProperties().getBoolean(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false)) {
if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) {
// Subscribe to all bootstrap version partitions.
storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
Expand Down Expand Up @@ -711,7 +711,7 @@ private VeniceConfigLoader buildVeniceConfig() {
.put(CLUSTER_NAME, clusterName)
.put(ZOOKEEPER_ADDRESS, zkAddress)
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
.put(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false)
.put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ private ConfigKeys() {
public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND =
"da.vinci.current.version.bootstrapping.quota.bytes.per.second";

// Prevent automatic subscription of partitions while bootstrapping.
public static final String DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED = "da.vinci.bootstrap.subscription.disabled";
// Controls whether the Da Vinci client automatically subscribes to the partitions found on disk during bootstrap.
public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY =
"da.vinci.subscribe.on.disk.partitions.automatically";

// Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used.
public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.DaVinciUserApp;
import com.linkedin.davinci.StoreBackend;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
Expand Down Expand Up @@ -85,7 +83,6 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
Expand Down Expand Up @@ -1302,7 +1299,6 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti
MetricsRepository metricsRepository = new MetricsRepository();

// Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path
StoreBackend storeBackend;
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
Expand Down Expand Up @@ -1358,16 +1354,15 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti
Optional.of(Collections.singleton(storeName1)))) {
assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0);

DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
storeBackend = daVinciBackend.getStoreOrThrow(storeName1);
List<Integer> partitions = new ArrayList<>();
DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig);

Set<Integer> partitions = new HashSet<>();
for (int i = 0; i < 5; ++i) {
partitions.add(i);
}
ComplementSet<Integer> subscription = ComplementSet.newSet(partitions);
storeBackend.subscribe(subscription);

DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig);
client1.subscribe(partitions);

client1.subscribeAll().get();
client1.unsubscribeAll();
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
Expand Down

0 comments on commit fd9fa02

Please sign in to comment.