[da-vinci][dvc] Flag enablement for automatic subscription of partitions #1332

Open: wants to merge 35 commits into base: main

Changes shown from 23 of the 35 commits.

Commits (35):
d282196  Initial commit (kristyelee, Nov 20, 2024)
e7aca2e  Merge branch 'linkedin:main' into kristy_lee/650_davinci (kristyelee, Nov 20, 2024)
ec82b2c  Test Commit (kristyelee, Nov 25, 2024)
5fa6390  Test Commit (kristyelee, Nov 25, 2024)
6601afd  Test unsubscription of partitions (kristyelee, Nov 26, 2024)
5b18fe9  Rewrite unsubscription of partitions (kristyelee, Nov 26, 2024)
e5e6046  Merge branch 'linkedin:main' into kristy_lee/650_davinci (kristyelee, Nov 26, 2024)
ef49d50  Modify Partition Removal (kristyelee, Nov 26, 2024)
0059955  Partition adjustment in bootstrapping [DaVinciBackend] (kristyelee, Nov 27, 2024)
b32d787  Merge branch 'linkedin:main' into kristy_lee/650_davinci (kristyelee, Nov 29, 2024)
1fa3eba  [dvc] Partition Difference Calculation and Update to Partition Subscr… (kristyelee, Dec 2, 2024)
321fb40  Integration Test [In Writing] (kristyelee, Dec 4, 2024)
52a557c  Partition Removal (kristyelee, Dec 5, 2024)
c5c9f16  Integration Test [In Writing] (kristyelee, Dec 5, 2024)
b520ab8  Integration Test [In Writing] (kristyelee, Dec 6, 2024)
262bbc6  Integration Test [In Writing] (kristyelee, Dec 9, 2024)
699c200  [dvc] Partition Difference Calculation and Update to Partition Subscr… (kristyelee, Dec 9, 2024)
002d55c  [dvc] Partition Difference Calculation and Update to Partition Subscr… (kristyelee, Dec 9, 2024)
fd9fa02  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Dec 11, 2024)
69d619a  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Dec 12, 2024)
ac32242  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Dec 16, 2024)
9554248  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Dec 16, 2024)
7b20b28  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Dec 16, 2024)
9bfe418  [dvc] Config flag enabled for subscribing on disk partition automatic… (kristyelee, Jan 9, 2025)
7ba15f0  [dvc] Config flag enabled for subscribing on disk partition automatic… (kristyelee, Jan 12, 2025)
b194128  Merge branch 'linkedin:main' into kristy_lee/650_davinci (kristyelee, Jan 13, 2025)
a0ba24b  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Jan 13, 2025)
b28feda  Merge branch 'linkedin:main' into kristy_lee/650_davinci (kristyelee, Jan 21, 2025)
2af0a75  Merge branch 'linkedin:main' into kristy_lee/650_davinci (kristyelee, Jan 24, 2025)
6f34d1b  [dvc] Config flag enabled for subscribing on disk partitions automati… (kristyelee, Jan 27, 2025)
15f2fa1  Unit test (kristyelee, Jan 27, 2025)
4cb909f  Unit test (kristyelee, Jan 29, 2025)
c278353  Unit test (kristyelee, Jan 30, 2025)
aee0931  Unit test (kristyelee, Jan 31, 2025)
0e82518  [dvc] Config flag for subscribing on disk partitions automatically. U… (kristyelee, Feb 1, 2025)
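
The net effect of the PR is a new backend config flag, DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, which defaults to true and gates whether a Da Vinci client re-subscribes the partitions it finds on disk during bootstrap. A minimal opt-out sketch, assuming the standard PropertyBuilder/VeniceProperties configuration path that the integration test below also uses:

import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;

import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.VeniceProperties;

// Disable automatic re-subscription of on-disk partitions at bootstrap;
// omitting the key keeps the default (true), i.e. the pre-PR behavior.
VeniceProperties backendConfig =
    new PropertyBuilder().put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false).build();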
@@ -1,5 +1,6 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
@@ -207,6 +208,7 @@ public DaVinciBackend(
String pid = Utils.getPid();
String instanceSuffix =
configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid));
// Current instance name.
String instanceName = Utils.getHostName() + "_" + instanceSuffix;

// Fetch latest update schema's protocol ID for Push Status Store from Router.
@@ -466,16 +468,18 @@ private synchronized void bootstrap() {
configLoader.getVeniceServerConfig());
ingestionBackend.addIngestionNotifier(ingestionListener);

// Subscribe all bootstrap version partitions.
storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
String versionTopic = version.kafkaTopicName();
LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
StoreBackend storeBackend = getStoreOrThrow(storeName);
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
});
if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) {
// Subscribe all bootstrap version partitions.
storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
String versionTopic = version.kafkaTopicName();
LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
StoreBackend storeBackend = getStoreOrThrow(storeName);
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
});
}
}

@Override
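With the flag set to false, bootstrap() skips the subscribe loop above, so after a restart the application must drive subscription itself. A hedged client-side sketch (client creation elided; subscribe and unsubscribeAll are the same DaVinciClient calls exercised by the integration test below):

import java.util.HashSet;
import java.util.Set;

import com.linkedin.davinci.client.DaVinciClient;

// When automatic subscription is disabled, the caller opts partitions in
// explicitly after the client starts, and can drop them all again.
static void manageSubscriptions(DaVinciClient<Integer, Object> client) throws Exception {
  Set<Integer> partitions = new HashSet<>();
  partitions.add(0);
  partitions.add(1);
  client.subscribe(partitions).get(); // block until ingestion is set up
  client.unsubscribeAll();            // backend subscription set becomes empty
}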
@@ -108,6 +108,10 @@ public StoreBackendStats getStats() {
return stats;
}

public ComplementSet<Integer> getSubscription() {
return subscription;
}

public ReferenceCounted<VersionBackend> getDaVinciCurrentVersion() {
return daVinciCurrentVersionRef.get();
}
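The new getSubscription() accessor exposes the store's subscription state, which is what lets the integration test below verify that unsubscribeAll() actually cleared it. A minimal assertion sketch, using only calls that appear elsewhere in this PR:

import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.StoreBackend;
import com.linkedin.venice.utils.ComplementSet;

// After unsubscribeAll(), the backend should be tracking no partitions.
static boolean hasNoSubscriptions(DaVinciBackend backend, String storeName) {
  StoreBackend storeBackend = backend.getStoreOrThrow(storeName);
  ComplementSet<Integer> subscription = storeBackend.getSubscription();
  return subscription.isEmpty();
}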
@@ -10,6 +10,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
@@ -710,6 +711,7 @@ private VeniceConfigLoader buildVeniceConfig() {
.put(CLUSTER_NAME, clusterName)
.put(ZOOKEEPER_ADDRESS, zkAddress)
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
.put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(
@@ -97,6 +97,10 @@ private ConfigKeys() {
public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND =
"da.vinci.current.version.bootstrapping.quota.bytes.per.second";

// On the Da Vinci client, controls whether on-disk partitions are subscribed automatically.
public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY =
"da.vinci.subscribe.on.disk.partitions.automatically";

// Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used.
public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND =
"kafka.fetch.quota.unordered.bytes.per.second";
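The key is read in DaVinciBackend with a default of true, so existing deployments that never set it keep the current auto-subscribe behavior. The read side, restated in isolation (configLoader wiring as in the first hunk):

// Defaults to true when the key is absent from the combined properties.
boolean autoSubscribe = configLoader.getCombinedProperties()
    .getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true);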
@@ -11,6 +11,7 @@
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
@@ -46,7 +47,9 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.DaVinciUserApp;
import com.linkedin.davinci.StoreBackend;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
@@ -83,6 +86,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
@@ -1283,6 +1287,97 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
}
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception {
String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.put(DATA_BASE_PATH, baseDataPath)
.put(PERSISTENCE_TYPE, ROCKS_DB)
.put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true)
.put(PUSH_STATUS_STORE_ENABLED, true)
.put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000)
.put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false)
.build();

MetricsRepository metricsRepository = new MetricsRepository();

// Create a client against the shared backend config (auto-subscription disabled above)
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
metricsRepository,
backendConfig)) {
DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig);

// Test non-existent key access
client1.subscribeAll().get();
assertNull(client1.get(KEY_COUNT + 1).get());

// Test single-get access
Map<Integer, Integer> keyValueMap = new HashMap<>();
for (int k = 0; k < KEY_COUNT; ++k) {
assertEquals(client1.get(k).get(), 1);
keyValueMap.put(k, 1);
}

// Test batch-get access
assertEquals(client1.batchGet(keyValueMap.keySet()).get(), keyValueMap);

// Test automatic new version ingestion
for (int i = 0; i < 2; ++i) {
// Test per-version partitioning parameters
int partitionCount = i + 1;
String iString = String.valueOf(i);
cluster.useControllerClient(controllerClient -> {
ControllerResponse response = controllerClient.updateStore(
storeName1,
new UpdateStoreQueryParams().setPartitionerClass(ConstantVenicePartitioner.class.getName())
.setPartitionCount(partitionCount)
.setPartitionerParams(
Collections.singletonMap(ConstantVenicePartitioner.CONSTANT_PARTITION, iString)));
assertFalse(response.isError(), response.getError());
});

Integer expectedValue = cluster.createVersion(storeName1, KEY_COUNT);
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
for (int k = 0; k < KEY_COUNT; ++k) {
Object readValue = client1.get(k).get();
assertEquals(readValue, expectedValue);
}
});
}
}

// Test managed clients
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
metricsRepository,
backendConfig,
Optional.of(Collections.singleton(storeName1)))) {

DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig);

Set<Integer> partitions = new HashSet<>();

for (int i = 0; i < 5; i++) {
partitions.add(i);
}

client1.subscribe(partitions);
client1.unsubscribeAll();

DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
if (daVinciBackend != null) {
StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1);
ComplementSet<Integer> subscription = storeBackend.getSubscription();
assertTrue(subscription.isEmpty());
}
}
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception {
String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);