diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java index efc6411b951..b47c9dbaaa0 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java @@ -34,15 +34,19 @@ public interface PropertiesConverter { /** * Converts properties from application provided properties and Flink connector properties to - * Gravitino properties. + * Gravitino properties.This method processes the Flink configuration and transforms it into a + * format suitable for the Gravitino catalog. * - * @param flinkConf The configuration provided by Flink. - * @return properties for the Gravitino catalog. + * @param flinkConf The Flink configuration containing connector properties. This includes both + * Flink-specific properties and any user-provided properties. + * @return A map of properties converted for use in the Gravitino catalog. The returned map + * includes both directly transformed properties and bypass properties prefixed with {@link + * #FLINK_PROPERTY_PREFIX}. */ default Map toGravitinoCatalogProperties(Configuration flinkConf) { Map gravitinoProperties = Maps.newHashMap(); for (Map.Entry entry : flinkConf.toMap().entrySet()) { - String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey()); + String gravitinoKey = transformPropertyToGravitinoCatalog(entry.getKey()); if (gravitinoKey != null) { gravitinoProperties.put(gravitinoKey, entry.getValue()); } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { @@ -55,10 +59,15 @@ default Map toGravitinoCatalogProperties(Configuration flinkConf } /** - * Converts properties from Gravitino properties to Flink connector properties. + * Converts properties from Gravitino catalog properties to Flink connector properties. This + * method processes the Gravitino properties and transforms them into a format suitable for the + * Flink connector. * - * @param gravitinoProperties The properties provided by Gravitino. - * @return properties for the Flink connector. + * @param gravitinoProperties The properties provided by the Gravitino catalog. This includes both + * Gravitino-specific properties and any bypass properties prefixed with {@link + * #FLINK_PROPERTY_PREFIX}. + * @return A map of properties converted for use in the Flink connector. The returned map includes + * both transformed properties and the Flink catalog type. */ default Map toFlinkCatalogProperties(Map gravitinoProperties) { Map all = Maps.newHashMap(); @@ -75,8 +84,24 @@ default Map toFlinkCatalogProperties(Map graviti return allProperties; } - String transformPropertiesToGravitinoCatalog(String configKey); + /** + * Transforms a Flink configuration key to a corresponding Gravitino catalog property key. This + * method is used to map Flink-specific configuration keys to Gravitino catalog properties. + * + * @param configKey The Flink configuration key to be transformed. + * @return The corresponding Gravitino catalog property key, or {@code null} if no transformation + * is needed. + */ + String transformPropertyToGravitinoCatalog(String configKey); + /** + * Transforms a map of properties from Gravitino catalog properties to Flink connector properties. + * This method is used to convert properties that are specific to Gravitino into a format that can + * be understood by the Flink connector. + * + * @param allProperties A map of properties to be transformed. + * @return A map of properties transformed into Flink connector properties. + */ default Map transformPropertiesToFlinkCatalog(Map allProperties) { return allProperties; } @@ -121,5 +146,11 @@ default Map toGravitinoTableProperties(Map flink return flinkProperties; } + /** + * Retrieves the Flink catalog type associated with this converter. This method is used to + * determine the type of Flink catalog that this converter is designed for. + * + * @return The Flink catalog type. + */ String getFlinkCatalogType(); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java index fbd17002df0..34eda6d81ed 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -38,7 +38,7 @@ private HivePropertiesConverter() {} ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname); @Override - public String transformPropertiesToGravitinoCatalog(String configKey) { + public String transformPropertyToGravitinoCatalog(String configKey) { return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java index 5b0acd2f78b..d2a237f0ee7 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -37,7 +37,7 @@ private IcebergPropertiesConverter() {} IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE); @Override - public String transformPropertiesToGravitinoCatalog(String configKey) { + public String transformPropertyToGravitinoCatalog(String configKey) { return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 0551c8720d1..b17619421f5 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -44,7 +44,7 @@ public Map toGravitinoCatalogProperties(Configuration flinkConf) } @Override - public String transformPropertiesToGravitinoCatalog(String configKey) { + public String transformPropertyToGravitinoCatalog(String configKey) { return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); }