[DVC] Add RequestBasedMetaRepository to enable metadata retrieval directly from server #1461

Closed

Commits (28), showing changes from all commits:
8ebc77b: add store properties to metadata retrieval endpoint (pthirun, Dec 2, 2024)
568d0c8: populate store properties payload (pthirun, Dec 3, 2024)
79c8c75: rename endpoint response from MetadataByClient to MetadataWithStorePr… (pthirun, Dec 3, 2024)
59fca27: Controller changes to register the new schema to the corresponding sy… (pthirun, Dec 4, 2024)
babbbd6: use local ssl cert for integration test (pthirun, Dec 4, 2024)
6e0ee3e: refactor request based store properties into its own endpoint (pthirun, Dec 5, 2024)
c1f1a58: move test request logic from admin tool to test (pthirun, Dec 5, 2024)
b74dc51: create new test for store properties endpoint (pthirun, Dec 6, 2024)
6618645: add gradle-wrapper.jar (pthirun, Dec 6, 2024)
3431789: fix spotbugs (pthirun, Dec 6, 2024)
b400718: getStoreProperties: add null checks for store version (pthirun, Dec 9, 2024)
918d845: refactor (pthirun, Dec 10, 2024)
0628c0c: Merge pull request #1 from linkedin/main (pthirun, Dec 11, 2024)
39bf9b3: update cloneStoreProperties to add new fields (pthirun, Dec 11, 2024)
a369110: add unit test for cloneStoreProperties (pthirun, Dec 12, 2024)
12c97f0: add random object to unit test (pthirun, Dec 12, 2024)
92a7960: fix server store acl handler (pthirun, Dec 12, 2024)
09414c3: add nested object checks in unit test (pthirun, Dec 12, 2024)
2323177: add integration test to cover store changes (pthirun, Dec 17, 2024)
3e09dc2: Merge pull request #2 from linkedin/main (pthirun, Dec 17, 2024)
8dd8fdb: add support for largest_known_schema_id to endpoint; add test (pthirun, Dec 21, 2024)
c0972aa: add unit test for getStoreProperties (pthirun, Dec 23, 2024)
2760fee: Merge branch 'main' into main (pthirun, Dec 23, 2024)
d45d106: Merge pull request #3 from linkedin/main (pthirun, Jan 6, 2025)
600afaf: clean up debug logs (pthirun, Jan 6, 2025)
13c8d28: Merge pull request #4 from linkedin/main (pthirun, Jan 7, 2025)
3cba91b: add requestbasedmetarepository to dvc (pthirun, Jan 13, 2025)
dd6680f: Merge pull request #5 from linkedin/main (pthirun, Jan 17, 2025)
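
The commits above land one user-facing switch: a client config flag that makes NativeMetadataRepository.getInstance() return the new RequestBasedMetaRepository instead of the ThinClientMetaStoreBasedRepository. A minimal opt-in sketch follows; the setter name, the exact getInstance() parameter order, and the discovery service name are assumptions, since the diff only shows the factory reading clientConfig.isUseRequestBasedMetaRepository().

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.repository.NativeMetadataRepository;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.VeniceProperties;

public class RequestBasedMetaRepoOptIn {
  // Sketch only: the setter below mirrors the getter isUseRequestBasedMetaRepository()
  // consulted by the reworked factory, and is assumed rather than shown in this diff.
  public static NativeMetadataRepository build(D2Client d2Client, ICProvider icProvider) {
    ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig("my_store")
        .setD2Client(d2Client)                   // an already-started D2 client
        .setD2ServiceName("venice-discovery")    // hypothetical discovery D2 service name
        .setUseRequestBasedMetaRepository(true); // assumed setter matching the new flag
    // With the flag set, the factory hands back a RequestBasedMetaRepository instead of
    // the ThinClientMetaStoreBasedRepository.
    return NativeMetadataRepository.getInstance(clientConfig, VeniceProperties.empty(), icProvider);
  }
}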
com/linkedin/davinci/repository/NativeMetadataRepository.java
@@ -1,8 +1,6 @@
package com.linkedin.davinci.repository;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_SCHEMA_ID;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.stats.NativeMetadataRepositoryStats;
@@ -25,17 +23,11 @@
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -70,7 +62,7 @@ public abstract class NativeMetadataRepository
private final Map<String, StoreConfig> storeConfigMap = new VeniceConcurrentHashMap<>();
// Local cache for key/value schemas. SchemaData supports one key schema per store only, which may need to be changed
// for key schema evolvability.
private final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
protected final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final Set<StoreDataChangedListener> listeners = new CopyOnWriteArraySet<>();
private final AtomicLong totalStoreReadQuota = new AtomicLong();
@@ -128,8 +120,12 @@ public static NativeMetadataRepository getInstance(
LOGGER.info(
"Initializing {} with {}",
NativeMetadataRepository.class.getSimpleName(),
ThinClientMetaStoreBasedRepository.class.getSimpleName());
return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
RequestBasedMetaRepository.class.getSimpleName());
if (clientConfig.isUseRequestBasedMetaRepository()) {
return new RequestBasedMetaRepository(clientConfig, backendConfig);
} else {
return new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
}
}

@Override
@@ -171,20 +167,14 @@ public boolean hasStore(String storeName) {
@Override
public Store refreshOneStore(String storeName) {
try {
getAndSetStoreConfigFromSystemStore(storeName);
StoreConfig storeConfig = storeConfigMap.get(storeName);
StoreConfig storeConfig = cacheStoreConfigFromRemote(storeName);
if (storeConfig == null) {
throw new VeniceException("StoreConfig is missing unexpectedly for store: " + storeName);
}
Store newStore = getStoreFromSystemStore(storeName, storeConfig.getCluster());
// isDeleting check to detect deleted store is only supported by meta system store based implementation.
if (newStore != null && !storeConfig.isDeleting()) {
putStore(newStore);
getAndCacheSchemaDataFromSystemStore(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
} else {
removeStore(storeName);
}
Store newStore = fetchStoreFromRemote(storeName, storeConfig.getCluster());
putStore(newStore);
getAndCacheSchemaData(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
return newStore;
} catch (ServiceDiscoveryException | MissingKeyInStoreMetadataException e) {
throw new VeniceNoStoreException(storeName, e);
@@ -393,74 +383,17 @@ public void clear() {
* Get the store cluster config from system store and update the local cache with it. Different implementation will
* get the data differently but should all populate the store cluster config map.
*/
protected void getAndSetStoreConfigFromSystemStore(String storeName) {
storeConfigMap.put(storeName, getStoreConfigFromSystemStore(storeName));
protected StoreConfig cacheStoreConfigFromRemote(String storeName) {
StoreConfig storeConfig = fetchStoreConfigFromRemote(storeName);
storeConfigMap.put(storeName, storeConfig);
return storeConfig;
}

protected abstract StoreConfig getStoreConfigFromSystemStore(String storeName);
protected abstract StoreConfig fetchStoreConfigFromRemote(String storeName);

protected abstract Store getStoreFromSystemStore(String storeName, String clusterName);
protected abstract Store fetchStoreFromRemote(String storeName, String clusterName);

protected abstract StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key);

// Helper function with common code for retrieving StoreConfig from meta system store.
protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) {
StoreClusterConfig clusterConfig = getStoreMetaValue(
storeName,
MetaStoreDataType.STORE_CLUSTER_CONFIG
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName))).storeClusterConfig;
return new StoreConfig(clusterConfig);
}

// Helper function with common code for retrieving SchemaData from meta system store.
protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) {
SchemaData schemaData = schemaMap.get(storeName);
SchemaEntry keySchema;
if (schemaData == null) {
// Retrieve the key schema and initialize SchemaData only if it's not cached yet.
StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> keySchemaMap =
getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap;
if (keySchemaMap.isEmpty()) {
throw new VeniceException("No key schema found for store: " + storeName);
}
Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
keySchema =
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString());
schemaData = new SchemaData(storeName, keySchema);
}
StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> valueSchemaMap =
getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap;
// Check the value schema string, if it's empty then try to query the other key space for individual value schema.
for (Map.Entry<CharSequence, CharSequence> entry: valueSchemaMap.entrySet()) {
// Check if we already have the corresponding value schema
int valueSchemaId = Integer.parseInt(entry.getKey().toString());
if (schemaData.getValueSchema(valueSchemaId) != null) {
continue;
}
if (entry.getValue().toString().isEmpty()) {
// The value schemas might be too large to be stored in a single K/V.
StoreMetaKey individualValueSchemaKey =
MetaStoreDataType.STORE_VALUE_SCHEMA.getStoreMetaKey(new HashMap<String, String>() {
{
put(KEY_STRING_STORE_NAME, storeName);
put(KEY_STRING_SCHEMA_ID, entry.getKey().toString());
}
});
// Empty string is not a valid value schema therefore it's safe to throw exceptions if we also cannot find it in
// the individual value schema key space.
String valueSchema =
getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString();
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema));
} else {
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString()));
}
}
return schemaData;
}
protected abstract SchemaData getSchemaData(String storeName);

protected Store putStore(Store newStore) {
// Workaround to make old metadata compatible with new fields
@@ -516,11 +449,11 @@ protected void notifyStoreChanged(Store store) {
}
}

protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
protected SchemaData getAndCacheSchemaData(String storeName) {
if (!hasStore(storeName)) {
throw new VeniceNoStoreException(storeName);
}
SchemaData schemaData = getSchemaDataFromSystemStore(storeName);
SchemaData schemaData = getSchemaData(storeName);
schemaMap.put(storeName, schemaData);
return schemaData;
}
@@ -532,7 +465,7 @@ protected SchemaData getAndCacheSchemaDataFromSystemStore(String storeName) {
private SchemaData getSchemaDataFromReadThroughCache(String storeName) throws VeniceNoStoreException {
SchemaData schemaData = schemaMap.get(storeName);
if (schemaData == null) {
schemaData = getAndCacheSchemaDataFromSystemStore(storeName);
schemaData = getAndCacheSchemaData(storeName);
}
return schemaData;
}
@@ -545,8 +478,6 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) {
return schemaData.getValueSchema(id);
}

protected abstract SchemaData getSchemaDataFromSystemStore(String storeName);

/**
* This function is used to remove schema entry for the given store from local cache,
* and related listeners as well.
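
Taken together, the changes above replace the meta-system-store helpers with a slimmer remote-fetch contract: subclasses now provide fetchStoreConfigFromRemote, fetchStoreFromRemote, and getSchemaData, while caching and listener bookkeeping stay in the base class. The skeleton below is a sketch of that contract as it appears in this diff; it is declared abstract and its bodies are placeholders, since the base class may keep other abstract members outside the hunks shown.

package com.linkedin.davinci.repository; // same package, so the base constructor is accessible

import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.utils.VeniceProperties;

public abstract class ExampleRemoteMetaRepository extends NativeMetadataRepository {
  protected ExampleRemoteMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) {
    super(clientConfig, backendConfig);
  }

  @Override
  protected StoreConfig fetchStoreConfigFromRemote(String storeName) {
    // Resolve which cluster serves the store (e.g. via service discovery);
    // cacheStoreConfigFromRemote() in the base class stores the result in storeConfigMap.
    throw new UnsupportedOperationException("placeholder");
  }

  @Override
  protected Store fetchStoreFromRemote(String storeName, String clusterName) {
    // Fetch the full store definition; refreshOneStore() puts it into the local cache.
    throw new UnsupportedOperationException("placeholder");
  }

  @Override
  protected SchemaData getSchemaData(String storeName) {
    // Return the key schema plus all known value schemas for the store.
    throw new UnsupportedOperationException("placeholder");
  }
}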
com/linkedin/davinci/repository/RequestBasedMetaRepository.java (new file)
@@ -0,0 +1,154 @@
package com.linkedin.davinci.repository;

import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.D2ServiceDiscovery;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.systemstore.schemas.StoreProperties;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import org.apache.avro.Schema;


public class RequestBasedMetaRepository extends NativeMetadataRepository {

// cluster -> client
private final Map<String, D2TransportClient> d2TransportClientMap = new VeniceConcurrentHashMap<>();

// storeName -> T
private final Map<String, SchemaData> storeSchemaMap = new VeniceConcurrentHashMap<>();

private final D2TransportClient d2DiscoveryTransportClient;
private D2ServiceDiscovery d2ServiceDiscovery;

public RequestBasedMetaRepository(ClientConfig clientConfig, VeniceProperties backendConfig) {
super(clientConfig, backendConfig);
this.d2ServiceDiscovery = new D2ServiceDiscovery();
this.d2DiscoveryTransportClient =
new D2TransportClient(clientConfig.getD2ServiceName(), clientConfig.getD2Client());
}

@Override
public void clear() {
super.clear();

// Clear cache
d2TransportClientMap.clear();
storeSchemaMap.clear();
}

@Override
protected StoreConfig fetchStoreConfigFromRemote(String storeName) {
// Create StoreConfig from D2
D2TransportClient d2TransportClient = getD2TransportClient(storeName);

StoreClusterConfig storeClusterConfig = new StoreClusterConfig();
storeClusterConfig.setStoreName(storeName);
storeClusterConfig.setCluster(d2TransportClient.getServiceName());

return new StoreConfig(storeClusterConfig);
}

@Override
protected Store fetchStoreFromRemote(String storeName, String clusterName) {
// Fetch store, bypass cache
StorePropertiesResponseRecord record = fetchAndCacheStorePropertiesResponseRecord(storeName);
StoreProperties storeProperties = record.storeMetaValue.storeProperties;
return new ZKStore(storeProperties);
}

@Override
protected SchemaData getSchemaData(String storeName) {
if (!storeSchemaMap.containsKey(storeName)) {
// Cache miss
fetchAndCacheStorePropertiesResponseRecord(storeName);
}

return storeSchemaMap.get(storeName);
}

private StorePropertiesResponseRecord fetchAndCacheStorePropertiesResponseRecord(String storeName) {

// Request
int maxValueSchemaId = getMaxValueSchemaId(storeName);
D2TransportClient d2TransportClient = getD2TransportClient(storeName);
String requestBasedStorePropertiesURL = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName;
if (maxValueSchemaId > SchemaData.UNKNOWN_SCHEMA_ID) {
requestBasedStorePropertiesURL += "/" + maxValueSchemaId;
}

TransportClientResponse response;
try {
response = d2TransportClient.get(requestBasedStorePropertiesURL).get();
} catch (Exception e) {
throw new RuntimeException(
"Encountered exception while trying to send store properties request to " + requestBasedStorePropertiesURL
+ ": " + e);
}

// Deserialize
Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$;
RecordDeserializer<StorePropertiesResponseRecord> recordDeserializer = FastSerializerDeserializerFactory
.getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class);
StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody());

// Cache
cacheStoreSchema(storeName, record);

return record;
}

D2TransportClient getD2TransportClient(String storeName) {
synchronized (this) {
// Get cluster for store
String serverD2ServiceName =
d2ServiceDiscovery.find(d2DiscoveryTransportClient, storeName, true).getServerD2Service();
if (d2TransportClientMap.containsKey(serverD2ServiceName)) {
return d2TransportClientMap.get(serverD2ServiceName);
}
D2TransportClient d2TransportClient = new D2TransportClient(serverD2ServiceName, clientConfig.getD2Client());
d2TransportClientMap.put(serverD2ServiceName, d2TransportClient);
return d2TransportClient;
}
}

private int getMaxValueSchemaId(String storeName) {
if (!schemaMap.containsKey(storeName)) {
return SchemaData.UNKNOWN_SCHEMA_ID;
}
return schemaMap.get(storeName).getMaxValueSchemaId();
}

private void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {

if (!storeSchemaMap.containsKey(storeName)) {
// New schema data
Map.Entry<CharSequence, CharSequence> keySchemaEntry =
record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
SchemaData schemaData = new SchemaData(
storeName,
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
storeSchemaMap.put(storeName, schemaData);
}

// Store Value Schemas
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
.getStoreValueSchemas()
.getValueSchemaMap()
.entrySet()) {
storeSchemaMap.get(storeName)
.addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString()));
}
}
}
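
In short, a single refreshOneStore() call against this repository resolves the owning cluster via D2 discovery, issues one GET to the server's store_properties endpoint (optionally suffixed with the largest locally known value schema id), and caches both the store and its schemas from the same response. A hypothetical walk-through, assuming a ClientConfig that carries a started D2 client:

package com.linkedin.davinci.repository; // placed alongside the class above for illustration

import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.utils.VeniceProperties;

public class RequestBasedRefreshWalkthrough {
  public static Store refresh(ClientConfig clientConfig) {
    RequestBasedMetaRepository repo = new RequestBasedMetaRepository(clientConfig, VeniceProperties.empty());
    // refreshOneStore() in the base class now drives, in order:
    //  1. fetchStoreConfigFromRemote(): D2 service discovery maps the store to its cluster's D2 service.
    //  2. fetchStoreFromRemote(): one GET to store_properties/<store>[/<largest known value schema id>],
    //     deserialized into a StorePropertiesResponseRecord and wrapped as a ZKStore.
    //  3. getAndCacheSchemaData(): key and value schemas from the same response populate storeSchemaMap.
    return repo.refreshOneStore("my_store");
  }
}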