diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java index efc6411b951..b47c9dbaaa0 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java @@ -34,15 +34,19 @@ public interface PropertiesConverter { /** * Converts properties from application provided properties and Flink connector properties to - * Gravitino properties. + * Gravitino properties.This method processes the Flink configuration and transforms it into a + * format suitable for the Gravitino catalog. * - * @param flinkConf The configuration provided by Flink. - * @return properties for the Gravitino catalog. + * @param flinkConf The Flink configuration containing connector properties. This includes both + * Flink-specific properties and any user-provided properties. + * @return A map of properties converted for use in the Gravitino catalog. The returned map + * includes both directly transformed properties and bypass properties prefixed with {@link + * #FLINK_PROPERTY_PREFIX}. */ default Map toGravitinoCatalogProperties(Configuration flinkConf) { Map gravitinoProperties = Maps.newHashMap(); for (Map.Entry entry : flinkConf.toMap().entrySet()) { - String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey()); + String gravitinoKey = transformPropertyToGravitinoCatalog(entry.getKey()); if (gravitinoKey != null) { gravitinoProperties.put(gravitinoKey, entry.getValue()); } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { @@ -55,10 +59,15 @@ default Map toGravitinoCatalogProperties(Configuration flinkConf } /** - * Converts properties from Gravitino properties to Flink connector properties. + * Converts properties from Gravitino catalog properties to Flink connector properties. This + * method processes the Gravitino properties and transforms them into a format suitable for the + * Flink connector. * - * @param gravitinoProperties The properties provided by Gravitino. - * @return properties for the Flink connector. + * @param gravitinoProperties The properties provided by the Gravitino catalog. This includes both + * Gravitino-specific properties and any bypass properties prefixed with {@link + * #FLINK_PROPERTY_PREFIX}. + * @return A map of properties converted for use in the Flink connector. The returned map includes + * both transformed properties and the Flink catalog type. */ default Map toFlinkCatalogProperties(Map gravitinoProperties) { Map all = Maps.newHashMap(); @@ -75,8 +84,24 @@ default Map toFlinkCatalogProperties(Map graviti return allProperties; } - String transformPropertiesToGravitinoCatalog(String configKey); + /** + * Transforms a Flink configuration key to a corresponding Gravitino catalog property key. This + * method is used to map Flink-specific configuration keys to Gravitino catalog properties. + * + * @param configKey The Flink configuration key to be transformed. + * @return The corresponding Gravitino catalog property key, or {@code null} if no transformation + * is needed. + */ + String transformPropertyToGravitinoCatalog(String configKey); + /** + * Transforms a map of properties from Gravitino catalog properties to Flink connector properties. + * This method is used to convert properties that are specific to Gravitino into a format that can + * be understood by the Flink connector. + * + * @param allProperties A map of properties to be transformed. + * @return A map of properties transformed into Flink connector properties. + */ default Map transformPropertiesToFlinkCatalog(Map allProperties) { return allProperties; } @@ -121,5 +146,11 @@ default Map toGravitinoTableProperties(Map flink return flinkProperties; } + /** + * Retrieves the Flink catalog type associated with this converter. This method is used to + * determine the type of Flink catalog that this converter is designed for. + * + * @return The Flink catalog type. + */ String getFlinkCatalogType(); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java index fbd17002df0..34eda6d81ed 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -38,7 +38,7 @@ private HivePropertiesConverter() {} ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname); @Override - public String transformPropertiesToGravitinoCatalog(String configKey) { + public String transformPropertyToGravitinoCatalog(String configKey) { return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java index 5b0acd2f78b..d2a237f0ee7 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -37,7 +37,7 @@ private IcebergPropertiesConverter() {} IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE); @Override - public String transformPropertiesToGravitinoCatalog(String configKey) { + public String transformPropertyToGravitinoCatalog(String configKey) { return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 0551c8720d1..2f4defd72cd 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -20,7 +20,6 @@ package org.apache.gravitino.flink.connector.paimon; import java.util.Map; -import org.apache.flink.configuration.Configuration; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; @@ -33,35 +32,23 @@ public class PaimonPropertiesConverter implements PropertiesConverter { private PaimonPropertiesConverter() {} @Override - public Map toGravitinoCatalogProperties(Configuration flinkConf) { - Map gravitinoProperties = - PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf); - Map flinkConfMap = flinkConf.toMap(); - gravitinoProperties.put( - PaimonConstants.CATALOG_BACKEND, - flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER)); - return gravitinoProperties; - } - - @Override - public String transformPropertiesToGravitinoCatalog(String configKey) { + public String transformPropertyToGravitinoCatalog(String configKey) { + if (configKey.equalsIgnoreCase(PaimonConstants.METASTORE)) { + return PaimonConstants.CATALOG_BACKEND; + } return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } - @Override - public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map paimonCatalogProperties = - PropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties); - paimonCatalogProperties.put( - PaimonConstants.METASTORE, - paimonCatalogProperties.getOrDefault( - PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER)); - return paimonCatalogProperties; - } - @Override public Map transformPropertiesToFlinkCatalog(Map allProperties) { - return PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties); + Map converted = PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties); + if (allProperties.containsKey(PaimonConstants.CATALOG_BACKEND)) { + converted.put( + PaimonConstants.METASTORE, + allProperties.getOrDefault( + PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER)); + } + return converted; } @Override diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index a03b4a198e1..a73b257bedb 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -25,11 +25,6 @@ import org.apache.gravitino.Catalog; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @Tag("gravitino-docker-test") @@ -98,7 +93,7 @@ public void testCreateGravitinoPaimonCatalogUsingSQL() { "create catalog %s with (" + "'type'='gravitino-paimon', " + "'warehouse'='%s'," - + "'catalog.backend'='filesystem'" + + "'metastore'='filesystem'" + ")", catalogName, warehouse)); String[] catalogs = tableEnv.listCatalogs();