Skip to content

Commit

Permalink
Extract Flink converter common logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 21, 2025
1 parent 6fb45d6 commit bb82992
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class IcebergPropertiesUtils {
// will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will
// change it to `catalogType` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;
public static final Map<String, String> ICEBERG_CATALOG_CONFIG_TO_GRAVITINO;

static {
Map<String, String> map = new HashMap();
Expand Down Expand Up @@ -65,6 +66,14 @@ public class IcebergPropertiesUtils {
AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY);
GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);

Map<String, String> icebergCatalogConfigToGravitino = new HashMap<>();
map.forEach(
(key, value) -> {
icebergCatalogConfigToGravitino.put(value, key);
});
ICEBERG_CATALOG_CONFIG_TO_GRAVITINO =
Collections.unmodifiableMap(icebergCatalogConfigToGravitino);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.gravitino.flink.connector;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;

/**
* PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino
Expand All @@ -32,23 +34,76 @@ public interface PropertiesConverter {

/**
* Converts properties from application provided properties and Flink connector properties to
* Gravitino properties.
* Gravitino properties. This method processes the Flink configuration and transforms it into a
* format suitable for the Gravitino catalog.
*
* @param flinkConf The configuration provided by Flink.
* @return properties for the Gravitino catalog.
* @param flinkConf The Flink configuration containing connector properties. This includes both
* Flink-specific properties and any user-provided properties.
* @return A map of properties converted for use in the Gravitino catalog. The returned map
* includes both directly transformed properties and bypass properties prefixed with {@link
* #FLINK_PROPERTY_PREFIX}.
*/
default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
return flinkConf.toMap();
Map<String, String> gravitinoProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = transformPropertyToGravitinoCatalog(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}
return gravitinoProperties;
}

/**
* Converts properties from Gravitino properties to Flink connector properties.
* Converts properties from Gravitino catalog properties to Flink connector properties. This
* method processes the Gravitino properties and transforms them into a format suitable for the
* Flink connector.
*
* @param gravitinoProperties The properties provided by Gravitino.
* @return properties for the Flink connector.
* @param gravitinoProperties The properties provided by the Gravitino catalog. This includes both
* Gravitino-specific properties and any bypass properties prefixed with {@link
* #FLINK_PROPERTY_PREFIX}.
* @return A map of properties converted for use in the Flink connector. The returned map includes
* both transformed properties and the Flink catalog type.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
Map<String, String> all = Maps.newHashMap();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);
});
Map<String, String> allProperties = transformPropertiesToFlinkCatalog(all);
allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType());
return allProperties;
}

/**
 * Transforms a Flink configuration key to a corresponding Gravitino catalog property key. This
 * method is used to map Flink-specific configuration keys to Gravitino catalog properties.
 *
 * @param configKey The Flink configuration key to be transformed.
 * @return The corresponding Gravitino catalog property key, or {@code null} if this key has no
 *     Gravitino mapping; callers treat unmapped keys as bypass properties and store them under
 *     the {@link #FLINK_PROPERTY_PREFIX} prefix.
 */
String transformPropertyToGravitinoCatalog(String configKey);

/**
 * Transforms a map of properties from Gravitino catalog properties to Flink connector properties.
 * This method is used to convert properties that are specific to Gravitino into a format that can
 * be understood by the Flink connector.
 *
 * <p>The default implementation returns the input map unchanged; converters override this method
 * to rename backend-specific keys.
 *
 * @param allProperties A map of properties to be transformed (bypass prefixes already stripped by
 *     the caller).
 * @return A map of properties transformed into Flink connector properties.
 */
default Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
  return allProperties;
}

/**
Expand Down Expand Up @@ -90,4 +145,12 @@ default Map<String, String> toFlinkTableProperties(Map<String, String> gravitino
default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
return flinkProperties;
}

/**
 * Retrieves the Flink catalog type associated with this converter. This method is used to
 * determine the type of Flink catalog that this converter is designed for.
 *
 * <p>The returned identifier is set as the Flink {@code CommonCatalogOptions.CATALOG_TYPE} option
 * when building Flink catalog properties.
 *
 * @return The Flink catalog type identifier.
 */
String getFlinkCatalogType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -34,46 +32,24 @@ public class HivePropertiesConverter implements PropertiesConverter {
private HivePropertiesConverter() {}

public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();

private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, HiveConstants.METASTORE_URIS);
private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();

for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}

return gravitinoProperties;
/**
 * Maps a Flink/Hive configuration key to its Gravitino catalog property name via
 * {@code HIVE_CATALOG_CONFIG_TO_GRAVITINO} (e.g. the Hive metastore URIs key).
 *
 * @param configKey the Flink configuration key
 * @return the Gravitino property key, or {@code null} if the key has no mapping
 */
public String transformPropertyToGravitinoCatalog(String configKey) {
  return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> flinkCatalogProperties = Maps.newHashMap();
flinkCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoHiveCatalogFactoryOptions.IDENTIFIER);

gravitinoProperties.forEach(
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
flinkCatalogProperties.put(
GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, flinkConfigKey), value);
all.put(GRAVITINO_CONFIG_TO_HIVE.getOrDefault(key, key), value);
});
return flinkCatalogProperties;
return all;
}

@Override
Expand All @@ -95,4 +71,9 @@ public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoP
properties.put("connector", "hive");
return properties;
}

@Override
public String getFlinkCatalogType() {
  // Identifier of the Gravitino Hive catalog factory; used as Flink's catalog "type" option.
  return GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.apache.gravitino.flink.connector.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -38,36 +37,23 @@ private IcebergPropertiesConverter() {}
IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Preconditions.checkArgument(
gravitinoProperties != null, "Iceberg Catalog properties should not be null.");

Map<String, String> all = new HashMap<>();
if (gravitinoProperties != null) {
gravitinoProperties.forEach(
(k, v) -> {
if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
all.put(newKey, v);
}
});
}
Map<String, String> transformedProperties =
IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
public String transformPropertyToGravitinoCatalog(String configKey) {
return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

if (transformedProperties != null) {
all.putAll(transformedProperties);
}
all.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
// Map "catalog-backend" to "catalog-type".
// TODO If catalog backend is CUSTOM, we need special compatibility logic.
GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
if (all.containsKey(key)) {
String config = all.remove(key);
all.put(value, config);
String icebergConfigKey = key;
if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(key);
}
if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(key)) {
icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(key);
}
all.put(icebergConfigKey, value);
});
return all;
}
Expand All @@ -78,7 +64,7 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
}

@Override
public Map<String, String> toFlinkTableProperties(Map<String, String> properties) {
return new HashMap<>(properties);
public String getFlinkCatalogType() {
return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

package org.apache.gravitino.flink.connector.paimon;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -36,45 +32,27 @@ public class PaimonPropertiesConverter implements PropertiesConverter {
private PaimonPropertiesConverter() {}

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();
Map<String, String> flinkConfMap = flinkConf.toMap();
for (Map.Entry<String, String> entry : flinkConfMap.entrySet()) {
String gravitinoKey =
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
public String transformPropertyToGravitinoCatalog(String configKey) {
if (configKey.equalsIgnoreCase(PaimonConstants.METASTORE)) {
return PaimonConstants.CATALOG_BACKEND;
}
gravitinoProperties.put(
PaimonConstants.CATALOG_BACKEND,
flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER));
return gravitinoProperties;
return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> all = new HashMap<>();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);
});
Map<String, String> paimonCatalogProperties =
PaimonPropertiesUtils.toPaimonCatalogProperties(all);
paimonCatalogProperties.put(
PaimonConstants.METASTORE,
paimonCatalogProperties.getOrDefault(
PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER));
paimonCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
return paimonCatalogProperties;
/**
 * Converts Gravitino catalog properties into the form expected by the Flink Paimon catalog.
 *
 * <p>Keys are first renamed through {@code PaimonPropertiesUtils.toPaimonCatalogProperties}. If
 * the Gravitino "catalog-backend" property is present, its value is additionally exposed under
 * Paimon's "metastore" key.
 *
 * @param allProperties Gravitino catalog properties (bypass prefixes already stripped by caller).
 * @return properties transformed for the Flink Paimon catalog.
 */
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
  Map<String, String> converted = PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties);
  if (allProperties.containsKey(PaimonConstants.CATALOG_BACKEND)) {
    // Inside this guard the key is known to be present, so a getOrDefault fallback would be
    // unreachable dead code; mirror the backend value under the "metastore" key directly.
    converted.put(
        PaimonConstants.METASTORE, allProperties.get(PaimonConstants.CATALOG_BACKEND));
  }
  return converted;
}

@Override
public String getFlinkCatalogType() {
  // Identifier of the Gravitino Paimon catalog factory; used as Flink's catalog "type" option.
  return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testCatalogPropertiesWithHiveBackend() {
"hive-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"hive-warehouse",
"key1",
"flink.bypass.key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
Expand All @@ -50,7 +50,9 @@ void testCatalogPropertiesWithHiveBackend() {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"hive-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"hive-warehouse"),
"hive-warehouse",
"key1",
"value1"),
properties);
}

Expand All @@ -65,7 +67,7 @@ void testCatalogPropertiesWithRestBackend() {
"rest-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse",
"key1",
"flink.bypass.key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
Expand All @@ -76,7 +78,9 @@ void testCatalogPropertiesWithRestBackend() {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"rest-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse"),
"rest-warehouse",
"key1",
"value1"),
properties);
}
}
Loading

0 comments on commit bb82992

Please sign in to comment.