Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 20, 2025
1 parent 97bc2a7 commit c2526d0
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ public interface PropertiesConverter {

/**
* Converts properties from application provided properties and Flink connector properties to
* Gravitino properties.
* Gravitino properties.This method processes the Flink configuration and transforms it into a
* format suitable for the Gravitino catalog.
*
* @param flinkConf The configuration provided by Flink.
* @return properties for the Gravitino catalog.
* @param flinkConf The Flink configuration containing connector properties. This includes both
* Flink-specific properties and any user-provided properties.
* @return A map of properties converted for use in the Gravitino catalog. The returned map
* includes both directly transformed properties and bypass properties prefixed with {@link
* #FLINK_PROPERTY_PREFIX}.
*/
default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey());
String gravitinoKey = transformPropertyToGravitinoCatalog(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
Expand All @@ -55,10 +59,15 @@ default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf
}

/**
* Converts properties from Gravitino properties to Flink connector properties.
* Converts properties from Gravitino catalog properties to Flink connector properties. This
* method processes the Gravitino properties and transforms them into a format suitable for the
* Flink connector.
*
* @param gravitinoProperties The properties provided by Gravitino.
* @return properties for the Flink connector.
* @param gravitinoProperties The properties provided by the Gravitino catalog. This includes both
* Gravitino-specific properties and any bypass properties prefixed with {@link
* #FLINK_PROPERTY_PREFIX}.
* @return A map of properties converted for use in the Flink connector. The returned map includes
* both transformed properties and the Flink catalog type.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> all = Maps.newHashMap();
Expand All @@ -75,8 +84,24 @@ default Map<String, String> toFlinkCatalogProperties(Map<String, String> graviti
return allProperties;
}

String transformPropertiesToGravitinoCatalog(String configKey);
/**
* Transforms a Flink configuration key to a corresponding Gravitino catalog property key. This
* method is used to map Flink-specific configuration keys to Gravitino catalog properties.
*
* @param configKey The Flink configuration key to be transformed.
* @return The corresponding Gravitino catalog property key, or {@code null} if no transformation
* is needed.
*/
String transformPropertyToGravitinoCatalog(String configKey);

/**
* Transforms a map of properties from Gravitino catalog properties to Flink connector properties.
* This method is used to convert properties that are specific to Gravitino into a format that can
* be understood by the Flink connector.
*
* @param allProperties A map of properties to be transformed.
* @return A map of properties transformed into Flink connector properties.
*/
default Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return allProperties;
}
Expand Down Expand Up @@ -121,5 +146,11 @@ default Map<String, String> toGravitinoTableProperties(Map<String, String> flink
return flinkProperties;
}

/**
* Retrieves the Flink catalog type associated with this converter. This method is used to
* determine the type of Flink catalog that this converter is designed for.
*
* @return The Flink catalog type.
*/
String getFlinkCatalogType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private HivePropertiesConverter() {}
ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
public String transformPropertyToGravitinoCatalog(String configKey) {
return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private IcebergPropertiesConverter() {}
IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
public String transformPropertyToGravitinoCatalog(String configKey) {
return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.gravitino.flink.connector.paimon;

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -33,35 +32,23 @@ public class PaimonPropertiesConverter implements PropertiesConverter {
private PaimonPropertiesConverter() {}

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties =
PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf);
Map<String, String> flinkConfMap = flinkConf.toMap();
gravitinoProperties.put(
PaimonConstants.CATALOG_BACKEND,
flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER));
return gravitinoProperties;
}

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
public String transformPropertyToGravitinoCatalog(String configKey) {
if (configKey.equalsIgnoreCase(PaimonConstants.METASTORE)) {
return PaimonConstants.CATALOG_BACKEND;
}
return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> paimonCatalogProperties =
PropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties);
paimonCatalogProperties.put(
PaimonConstants.METASTORE,
paimonCatalogProperties.getOrDefault(
PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER));
return paimonCatalogProperties;
}

@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties);
Map<String, String> converted = PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties);
if (allProperties.containsKey(PaimonConstants.CATALOG_BACKEND)) {
converted.put(
PaimonConstants.METASTORE,
allProperties.getOrDefault(
PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER));
}
return converted;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@
import org.apache.gravitino.Catalog;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

@Tag("gravitino-docker-test")
Expand Down Expand Up @@ -98,7 +93,7 @@ public void testCreateGravitinoPaimonCatalogUsingSQL() {
"create catalog %s with ("
+ "'type'='gravitino-paimon', "
+ "'warehouse'='%s',"
+ "'catalog.backend'='filesystem'"
+ "'metastore'='filesystem'"
+ ")",
catalogName, warehouse));
String[] catalogs = tableEnv.listCatalogs();
Expand Down

0 comments on commit c2526d0

Please sign in to comment.