Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 20, 2025
1 parent 97bc2a7 commit fcc9d8d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ public interface PropertiesConverter {

/**
* Converts properties from application provided properties and Flink connector properties to
* Gravitino properties.
* Gravitino properties.This method processes the Flink configuration and transforms it into a
* format suitable for the Gravitino catalog.
*
* @param flinkConf The configuration provided by Flink.
* @return properties for the Gravitino catalog.
* @param flinkConf The Flink configuration containing connector properties. This includes both
* Flink-specific properties and any user-provided properties.
* @return A map of properties converted for use in the Gravitino catalog. The returned map
* includes both directly transformed properties and bypass properties prefixed with {@link
* #FLINK_PROPERTY_PREFIX}.
*/
default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey());
String gravitinoKey = transformPropertyToGravitinoCatalog(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
Expand All @@ -55,10 +59,15 @@ default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf
}

/**
* Converts properties from Gravitino properties to Flink connector properties.
* Converts properties from Gravitino catalog properties to Flink connector properties. This
* method processes the Gravitino properties and transforms them into a format suitable for the
* Flink connector.
*
* @param gravitinoProperties The properties provided by Gravitino.
* @return properties for the Flink connector.
* @param gravitinoProperties The properties provided by the Gravitino catalog. This includes both
* Gravitino-specific properties and any bypass properties prefixed with {@link
* #FLINK_PROPERTY_PREFIX}.
* @return A map of properties converted for use in the Flink connector. The returned map includes
* both transformed properties and the Flink catalog type.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> all = Maps.newHashMap();
Expand All @@ -75,8 +84,24 @@ default Map<String, String> toFlinkCatalogProperties(Map<String, String> graviti
return allProperties;
}

String transformPropertiesToGravitinoCatalog(String configKey);
/**
* Transforms a Flink configuration key to a corresponding Gravitino catalog property key. This
* method is used to map Flink-specific configuration keys to Gravitino catalog properties.
*
* @param configKey The Flink configuration key to be transformed.
* @return The corresponding Gravitino catalog property key, or {@code null} if no transformation
* is needed.
*/
String transformPropertyToGravitinoCatalog(String configKey);

/**
* Transforms a map of properties from Gravitino catalog properties to Flink connector properties.
* This method is used to convert properties that are specific to Gravitino into a format that can
* be understood by the Flink connector.
*
* @param allProperties A map of properties to be transformed.
* @return A map of properties transformed into Flink connector properties.
*/
default Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return allProperties;
}
Expand Down Expand Up @@ -121,5 +146,11 @@ default Map<String, String> toGravitinoTableProperties(Map<String, String> flink
return flinkProperties;
}

/**
* Retrieves the Flink catalog type associated with this converter. This method is used to
* determine the type of Flink catalog that this converter is designed for.
*
* @return The Flink catalog type.
*/
String getFlinkCatalogType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private HivePropertiesConverter() {}
ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
public String transformPropertyToGravitinoCatalog(String configKey) {
return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private IcebergPropertiesConverter() {}
IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
public String transformPropertyToGravitinoCatalog(String configKey) {
return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf)
}

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
public String transformPropertyToGravitinoCatalog(String configKey) {
return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

Expand Down

0 comments on commit fcc9d8d

Please sign in to comment.