From fd26b565aef1527fcd126b89c4e110180fa00a83 Mon Sep 17 00:00:00 2001 From: Xiaojian Sun Date: Thu, 16 Jan 2025 15:56:55 +0800 Subject: [PATCH] [#3515] feat(flink-connector): Support flink iceberg catalog (#5914) ### What changes were proposed in this pull request? Support flink iceberg catalog ### Why are the changes needed? Fix: [#3515](https://github.com/apache/gravitino/issues/3515) ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? FlinkIcebergCatalogIT FlinkIcebergHiveCatalogIT --- docs/flink-connector/flink-catalog-iceberg.md | 78 +++ flink-connector/flink/build.gradle.kts | 6 + .../catalog/GravitinoCatalogManager.java | 11 - .../iceberg/GravitinoIcebergCatalog.java | 67 +++ .../GravitinoIcebergCatalogFactory.java | 95 ++++ ...GravitinoIcebergCatalogFactoryOptions.java | 33 ++ .../iceberg/IcebergPropertiesConstants.java | 49 ++ .../iceberg/IcebergPropertiesConverter.java | 84 +++ .../org.apache.flink.table.factories.Factory | 3 +- .../TestIcebergPropertiesConverter.java | 82 +++ .../integration/test/FlinkCommonIT.java | 62 ++- .../integration/test/FlinkEnvIT.java | 37 +- .../test/hive/FlinkHiveCatalogIT.java | 42 +- .../test/iceberg/FlinkIcebergCatalogIT.java | 501 ++++++++++++++++++ .../iceberg/FlinkIcebergHiveCatalogIT.java | 46 ++ .../test/paimon/FlinkPaimonCatalogIT.java | 21 +- .../integration/test/utils/TestUtils.java | 3 +- 17 files changed, 1152 insertions(+), 68 deletions(-) create mode 100644 docs/flink-connector/flink-catalog-iceberg.md create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java diff --git a/docs/flink-connector/flink-catalog-iceberg.md b/docs/flink-connector/flink-catalog-iceberg.md new file mode 100644 index 00000000000..54d7c0879fb --- /dev/null +++ b/docs/flink-connector/flink-catalog-iceberg.md @@ -0,0 +1,78 @@ +--- +title: "Flink connector Iceberg catalog" +slug: /flink-connector/flink-catalog-iceberg +keyword: flink connector iceberg catalog +license: "This software is licensed under the Apache License version 2." +--- + +The Apache Gravitino Flink connector can be used to read and write Iceberg tables, with the metadata managed by the Gravitino server. +To enable the Flink connector, you must download the Iceberg Flink runtime JAR and place it in the Flink classpath. + +## Capabilities + +#### Supported DML and DDL operations: + +- `CREATE CATALOG` +- `CREATE DATABASE` +- `CREATE TABLE` +- `DROP TABLE` +- `ALTER TABLE` +- `INSERT INTO & OVERWRITE` +- `SELECT` + +#### Operations not supported: + +- Partition operations +- View operations +- Metadata tables, like: + - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}&snapshots` +- Query UDF +- `UPDATE` clause +- `DELETE` clause +- `CREATE TABLE LIKE` clause + +## SQL example +```sql + +-- Suppose iceberg_a is the Iceberg catalog name managed by Gravitino + +USE iceberg_a; + +CREATE DATABASE IF NOT EXISTS mydatabase; +USE mydatabase; + +CREATE TABLE sample ( + id BIGINT COMMENT 'unique id', + data STRING NOT NULL +) PARTITIONED BY (data) +WITH ('format-version'='2'); + +INSERT INTO sample +VALUES (1, 'A'), (2, 'B'); + +SELECT * FROM sample WHERE data = 'B'; + +``` + +## Catalog properties + +The Gravitino Flink connector transforms the following properties in a catalog to Flink connector configuration. + + +| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version | +|---------------------------------|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------| +| `catalog-backend` | `catalog-type` | Catalog backend type, currently, only `Hive` Catalog is supported, `JDBC` and `Rest` in Continuous Validation | 0.8.0-incubating | +| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating | +| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating | +| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating | +| `oss-endpoint` | `oss.endpoint` | The endpoint of Aliyun OSS service. | 0.8.0-incubating | +| `oss-access-key-id` | `client.access-key-id` | The static access key ID used to access OSS data. | 0.8.0-incubating | +| `oss-secret-access-key` | `client.access-key-secret` | The static secret access key used to access OSS data. | 0.8.0-incubating | + +Gravitino catalog property names with the prefix `flink.bypass.` are passed to Flink iceberg connector. For example, using `flink.bypass.clients` to pass the `clients` to the Flink iceberg connector. + +## Storage + +### OSS + +Additionally, you need download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip), and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` to the Flink classpath. diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index f137a3eae1b..4c9bd036ae9 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -30,6 +30,8 @@ var paimonVersion: String = libs.versions.paimon.get() val flinkVersion: String = libs.versions.flink.get() val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") +val icebergVersion: String = libs.versions.iceberg.get() + // The Flink only support scala 2.12, and all scala api will be removed in a future version. // You can find more detail at the following issues: // https://issues.apache.org/jira/browse/FLINK-23986, @@ -44,6 +46,8 @@ dependencies { implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) + + compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion") compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") @@ -88,7 +92,9 @@ dependencies { testImplementation(libs.testcontainers) testImplementation(libs.testcontainers.junit.jupiter) testImplementation(libs.testcontainers.mysql) + testImplementation(libs.metrics.core) + testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion") testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") testImplementation("org.apache.flink:flink-table-common:$flinkVersion") testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion") diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java index 7693e5d4c9a..0b0b89f3a5a 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java @@ -35,12 +35,10 @@ public class GravitinoCatalogManager { private static GravitinoCatalogManager gravitinoCatalogManager; private volatile boolean isClosed = false; - private final String metalakeName; private final GravitinoMetalake metalake; private final GravitinoAdminClient gravitinoClient; private GravitinoCatalogManager(String gravitinoUri, String metalakeName) { - this.metalakeName = metalakeName; this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build(); this.metalake = gravitinoClient.loadMetalake(metalakeName); } @@ -99,15 +97,6 @@ public Catalog getGravitinoCatalogInfo(String name) { return catalog; } - /** - * Get the metalake. - * - * @return the metalake name. - */ - public String getMetalakeName() { - return metalakeName; - } - /** * Create catalog in Gravitino. * diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java new file mode 100644 index 00000000000..30fac96bbc8 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalog.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.iceberg; + +import java.util.Map; +import java.util.Optional; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.iceberg.flink.FlinkCatalog; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +/** Gravitino Iceberg Catalog. */ +public class GravitinoIcebergCatalog extends BaseCatalog { + + private final FlinkCatalog icebergCatalog; + + protected GravitinoIcebergCatalog( + String catalogName, + String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter, + Map properties) { + super(catalogName, defaultDatabase, propertiesConverter, partitionConverter); + FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory(); + this.icebergCatalog = (FlinkCatalog) flinkCatalogFactory.createCatalog(catalogName, properties); + } + + @Override + public void open() throws CatalogException { + icebergCatalog.open(); + } + + @Override + public void close() throws CatalogException { + icebergCatalog.close(); + } + + @Override + public Optional getFactory() { + return icebergCatalog.getFactory(); + } + + @Override + protected AbstractCatalog realCatalog() { + return icebergCatalog; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java new file mode 100644 index 00000000000..ad0363d9867 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.iceberg; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; + +public class GravitinoIcebergCatalogFactory implements BaseCatalogFactory { + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtils.createCatalogFactoryHelper(this, context); + return new GravitinoIcebergCatalog( + context.getName(), + helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE), + propertiesConverter(), + partitionConverter(), + context.getOptions()); + } + + @Override + public String factoryIdentifier() { + return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + /** + * Define gravitino catalog provider. + * + * @return + */ + @Override + public String gravitinoCatalogProvider() { + return "lakehouse-iceberg"; + } + + /** + * Define gravitino catalog type. + * + * @return + */ + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + /** + * Define properties converter. + * + * @return + */ + @Override + public PropertiesConverter propertiesConverter() { + return IcebergPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java new file mode 100644 index 00000000000..95e1a21de85 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactoryOptions.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.iceberg; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +public class GravitinoIcebergCatalogFactoryOptions { + + public static final String IDENTIFIER = "gravitino-iceberg"; + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(FlinkCatalogFactory.DEFAULT_DATABASE) + .stringType() + .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java new file mode 100644 index 00000000000..163cfac882e --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.flink.FlinkCatalogFactory; + +public class IcebergPropertiesConstants { + @VisibleForTesting + public static String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND; + + public static final String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE; + + public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE; + + public static final String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION; + + public static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI; + + public static final String ICEBERG_CATALOG_URI = CatalogProperties.URI; + + @VisibleForTesting + public static String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; + + public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; + + @VisibleForTesting + public static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST; +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java new file mode 100644 index 00000000000..7684d3eadbb --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public class IcebergPropertiesConverter implements PropertiesConverter { + public static IcebergPropertiesConverter INSTANCE = new IcebergPropertiesConverter(); + + private IcebergPropertiesConverter() {} + + private static final Map GRAVITINO_CONFIG_TO_FLINK_ICEBERG = + ImmutableMap.of( + IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE); + + @Override + public Map toFlinkCatalogProperties(Map gravitinoProperties) { + Preconditions.checkArgument( + gravitinoProperties != null, "Iceberg Catalog properties should not be null."); + + Map all = new HashMap<>(); + if (gravitinoProperties != null) { + gravitinoProperties.forEach( + (k, v) -> { + if (k.startsWith(FLINK_PROPERTY_PREFIX)) { + String newKey = k.substring(FLINK_PROPERTY_PREFIX.length()); + all.put(newKey, v); + } + }); + } + Map transformedProperties = + IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties); + + if (transformedProperties != null) { + all.putAll(transformedProperties); + } + all.put( + CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); + // Map "catalog-backend" to "catalog-type". + // TODO If catalog backend is CUSTOM, we need special compatibility logic. + GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach( + (key, value) -> { + if (all.containsKey(key)) { + String config = all.remove(key); + all.put(value, config); + } + }); + return all; + } + + @Override + public Map toGravitinoTableProperties(Map properties) { + return new HashMap<>(properties); + } + + @Override + public Map toFlinkTableProperties(Map properties) { + return new HashMap<>(properties); + } +} diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a535afb6dc2..45ff2512e73 100644 --- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -19,4 +19,5 @@ org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory -org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory \ No newline at end of file +org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory +org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactory diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java new file mode 100644 index 00000000000..d6de522f396 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.iceberg; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestIcebergPropertiesConverter { + private static final IcebergPropertiesConverter CONVERTER = IcebergPropertiesConverter.INSTANCE; + + @Test + void testCatalogPropertiesWithHiveBackend() { + Map properties = + CONVERTER.toFlinkCatalogProperties( + ImmutableMap.of( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, + "hive-uri", + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, + "hive-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + CommonCatalogOptions.CATALOG_TYPE.key(), + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE, + IcebergPropertiesConstants.ICEBERG_CATALOG_URI, + "hive-uri", + IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, + "hive-warehouse"), + properties); + } + + @Test + void testCatalogPropertiesWithRestBackend() { + Map properties = + CONVERTER.toFlinkCatalogProperties( + ImmutableMap.of( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST, + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, + "rest-uri", + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, + "rest-warehouse", + "key1", + "value1")); + Assertions.assertEquals( + ImmutableMap.of( + CommonCatalogOptions.CATALOG_TYPE.key(), + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST, + IcebergPropertiesConstants.ICEBERG_CATALOG_URI, + "rest-uri", + IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, + "rest-warehouse"), + properties); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 5a363e4e51b..b45e5f46ec2 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -72,6 +72,14 @@ protected boolean supportSchemaOperationWithCommentAndOptions() { return true; } + protected boolean supportGetSchemaWithoutCommentAndOption() { + return true; + } + + protected abstract String getProvider(); + + protected abstract boolean supportDropCascade(); + @Test public void testCreateSchema() { doWithCatalog( @@ -83,13 +91,14 @@ public void testCreateSchema() { TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); catalog.asSchemas().schemaExists(schema); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); } @Test + @EnabledIf("supportGetSchemaWithoutCommentAndOption") public void testGetSchemaWithoutCommentAndOption() { doWithCatalog( currentCatalog(), @@ -134,12 +143,11 @@ public void testGetSchemaWithCommentAndOptions() { Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); Assertions.assertEquals(comment, loadedSchema.comment()); - Assertions.assertEquals(2, loadedSchema.properties().size()); Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); Assertions.assertEquals( location, loadedSchema.properties().get(HiveConstants.LOCATION)); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); @@ -177,9 +185,9 @@ public void testListSchema() { Assertions.assertEquals(schema2, schemas[2]); Assertions.assertEquals(schema3, schemas[3]); } finally { - catalog.asSchemas().dropSchema(schema, true); - catalog.asSchemas().dropSchema(schema2, true); - catalog.asSchemas().dropSchema(schema3, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); + catalog.asSchemas().dropSchema(schema2, supportDropCascade()); + catalog.asSchemas().dropSchema(schema3, supportDropCascade()); Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); } }); @@ -204,7 +212,6 @@ public void testAlterSchemaWithCommentAndOptions() { Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); Assertions.assertEquals("test comment", loadedSchema.comment()); - Assertions.assertEquals(3, loadedSchema.properties().size()); Assertions.assertEquals("value1", loadedSchema.properties().get("key1")); Assertions.assertEquals("value2", loadedSchema.properties().get("key2")); Assertions.assertNotNull(loadedSchema.properties().get("location")); @@ -215,11 +222,10 @@ public void testAlterSchemaWithCommentAndOptions() { Schema reloadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, reloadedSchema.name()); Assertions.assertEquals("test comment", reloadedSchema.comment()); - Assertions.assertEquals(4, reloadedSchema.properties().size()); Assertions.assertEquals("new-value", reloadedSchema.properties().get("key1")); Assertions.assertEquals("value3", reloadedSchema.properties().get("key3")); } finally { - catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); } }); } @@ -270,7 +276,8 @@ public void testCreateSimpleTable() { Row.of("A", 1.0), Row.of("B", 2.0)); }, - true); + true, + supportDropCascade()); } @Test @@ -303,7 +310,8 @@ public void testListTables() { Row.of("test_table1"), Row.of("test_table2")); }, - true); + true, + supportDropCascade()); } @Test @@ -320,12 +328,11 @@ public void testDropTable() { NameIdentifier identifier = NameIdentifier.of(databaseName, tableName); catalog.asTableCatalog().createTable(identifier, columns, "comment1", ImmutableMap.of()); Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier)); - - TableResult result = sql("DROP TABLE %s", tableName); - TestUtils.assertTableResult(result, ResultKind.SUCCESS); + sql("DROP TABLE IF EXISTS %s", tableName); Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier)); }, - true); + true, + supportDropCascade()); } @Test @@ -379,7 +386,8 @@ public void testGetSimpleTable() { fail(e); } }, - true); + true, + supportDropCascade()); } @Test @@ -415,7 +423,8 @@ public void testRenameColumn() { }; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -466,6 +475,7 @@ public void testAlterTableComment() { .asTableCatalog() .loadTable(NameIdentifier.of(databaseName, tableName)); Assertions.assertEquals(newComment, gravitinoTable.comment()); + } catch (DatabaseNotExistException | TableAlreadyExistException | TableNotExistException e) { @@ -475,7 +485,8 @@ public void testAlterTableComment() { fail("Catalog doesn't exist"); } }, - true); + true, + supportDropCascade()); } @Test @@ -511,7 +522,8 @@ public void testAlterTableAddColumn() { }; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -542,7 +554,8 @@ public void testAlterTableDropColumn() { new Column[] {Column.of("order_amount", Types.IntegerType.get(), "ORDER_AMOUNT")}; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -584,7 +597,8 @@ public void testAlterColumnTypeAndChangeOrder() { }; assertColumns(expected, actual); }, - true); + true, + supportDropCascade()); } @Test @@ -612,7 +626,8 @@ public void testRenameTable() { Assertions.assertTrue( catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, newTableName))); }, - true); + true, + supportDropCascade()); } @Test @@ -655,6 +670,7 @@ public void testAlterTableProperties() { Assertions.assertEquals("value1", properties.get("key")); Assertions.assertNull(properties.get("key2")); }, - true); + true, + supportDropCascade()); } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index f56b5297e17..959123f3362 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -19,19 +19,25 @@ package org.apache.gravitino.flink.connector.integration.test; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Consumer; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.types.Row; import org.apache.gravitino.Catalog; import org.apache.gravitino.client.GravitinoMetalake; import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions; import org.apache.gravitino.integration.test.container.ContainerSuite; import org.apache.gravitino.integration.test.container.HiveContainer; @@ -154,31 +160,56 @@ private static void stopFlinkEnv() { } @FormatMethod - protected TableResult sql(@FormatString String sql, Object... args) { + protected static TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } protected void doWithSchema( Catalog catalog, String schemaName, Consumer action, boolean dropSchema) { + doWithSchema(catalog, schemaName, action, dropSchema, true); + } + + protected void doWithSchema( + Catalog catalog, + String schemaName, + Consumer action, + boolean dropSchema, + boolean cascade) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog.asSchemas().createSchema(schemaName, null, null); + catalog.asSchemas().createSchema(schemaName, null, getCreateSchemaProps(schemaName)); } tableEnv.useDatabase(schemaName); action.accept(catalog); } finally { if (dropSchema) { - catalog.asSchemas().dropSchema(schemaName, true); + clearTableInSchema(); + catalog.asSchemas().dropSchema(schemaName, cascade); } } } + protected Map getCreateSchemaProps(String schemaName) { + return null; + } + protected static void doWithCatalog(Catalog catalog, Consumer action) { Preconditions.checkNotNull(catalog); tableEnv.useCatalog(catalog.name()); action.accept(catalog); } + + /** Iceberg requires deleting the table first, then deleting the schema. */ + protected static void clearTableInSchema() { + TableResult result = sql("SHOW TABLES"); + List rows = Lists.newArrayList(result.collect()); + for (Row row : rows) { + String tableName = row.getField(0).toString(); + TableResult deleteResult = sql("DROP TABLE IF EXISTS %s", tableName); + TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS); + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index bb7b25f6b20..7792068e249 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -29,7 +29,6 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -73,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog hiveCatalog; @BeforeAll - static void hiveStartUp() { + void hiveStartUp() { initDefaultHiveCatalog(); } @@ -83,13 +82,13 @@ static void hiveStop() { metalake.dropCatalog(DEFAULT_HIVE_CATALOG, true); } - protected static void initDefaultHiveCatalog() { + protected void initDefaultHiveCatalog() { Preconditions.checkNotNull(metalake); hiveCatalog = metalake.createCatalog( DEFAULT_HIVE_CATALOG, org.apache.gravitino.Catalog.Type.RELATIONAL, - "hive", + getProvider(), null, ImmutableMap.of("metastore.uris", hiveMetastoreUri)); } @@ -583,32 +582,23 @@ public void testGetHiveTable() { true); } + @Override + protected Map getCreateSchemaProps(String schemaName) { + return ImmutableMap.of("location", warehouse + "/" + schemaName); + } + @Override protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } - protected void doWithSchema( - org.apache.gravitino.Catalog catalog, - String schemaName, - Consumer action, - boolean dropSchema) { - Preconditions.checkNotNull(catalog); - Preconditions.checkNotNull(schemaName); - try { - tableEnv.useCatalog(catalog.name()); - if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); - } - tableEnv.useDatabase(schemaName); - action.accept(catalog); - } finally { - if (dropSchema) { - catalog.asSchemas().dropSchema(schemaName, true); - } - } + @Override + protected String getProvider() { + return "hive"; + } + + @Override + protected boolean supportDropCascade() { + return true; } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java new file mode 100644 index 00000000000..0834def90b7 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.integration.test.iceberg; + +import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; +import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; +import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.DefaultCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalog; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT { + + private static final String DEFAULT_ICEBERG_CATALOG = "flink_iceberg_catalog"; + + private static org.apache.gravitino.Catalog icebergCatalog; + + @BeforeAll + public void before() { + Preconditions.checkNotNull(metalake); + icebergCatalog = + metalake.createCatalog( + DEFAULT_ICEBERG_CATALOG, + org.apache.gravitino.Catalog.Type.RELATIONAL, + getProvider(), + null, + getCatalogConfigs()); + } + + protected abstract Map getCatalogConfigs(); + + @Test + public void testCreateGravitinoIcebergCatalog() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + + // Create a new catalog. + String catalogName = "gravitino_iceberg_catalog"; + Configuration configuration = Configuration.fromMap(getCatalogConfigs()); + configuration.set( + CommonCatalogOptions.CATALOG_TYPE, GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); + + CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, configuration); + tableEnv.createCatalog(catalogName, catalogDescriptor); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + // Check the catalog properties. + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI)); + + // Get the created catalog. + Optional catalog = tableEnv.getCatalog(catalogName); + Assertions.assertTrue(catalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get()); + + // List catalogs. + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog."); + + Assertions.assertEquals( + DEFAULT_CATALOG, + tableEnv.getCurrentCatalog(), + "Current catalog should be default_catalog in flink"); + + // Change the current catalog to the new created catalog. + tableEnv.useCatalog(catalogName); + Assertions.assertEquals( + catalogName, + tableEnv.getCurrentCatalog(), + "Current catalog should be the one that is created just now."); + + // Drop the catalog. Only support drop catalog by SQL. + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + + Optional droppedCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + @Test + public void testCreateGravitinoIcebergUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + + // Create a new catalog. + String catalogName = "gravitino_iceberg_using_sql"; + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='%s', " + + "'catalog-backend'='%s'," + + "'uri'='%s'," + + "'warehouse'='%s'" + + ")", + catalogName, + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + getCatalogBackend(), + hiveMetastoreUri, + warehouse)); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + // Check the properties of the created catalog. + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI)); + + Assertions.assertEquals( + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, + properties.get(CommonCatalogOptions.CATALOG_TYPE.key())); + + // Get the created catalog. + Optional catalog = tableEnv.getCatalog(catalogName); + Assertions.assertTrue(catalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, catalog.get()); + + // List catalogs. + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue( + Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog."); + + // Use SQL to list catalogs. + TableResult result = tableEnv.executeSql("show catalogs"); + Assertions.assertEquals( + numCatalogs + 1, Lists.newArrayList(result.collect()).size(), "Should have 2 catalogs"); + + Assertions.assertEquals( + DEFAULT_CATALOG, + tableEnv.getCurrentCatalog(), + "Current catalog should be default_catalog in flink"); + + // Change the current catalog to the new created catalog. + tableEnv.useCatalog(catalogName); + Assertions.assertEquals( + catalogName, + tableEnv.getCurrentCatalog(), + "Current catalog should be the one that is created just now."); + + // Drop the catalog. Only support using SQL to drop catalog. + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + + Optional droppedCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped"); + } + + @Test + public void testGetCatalogFromGravitino() { + // list catalogs. + int numCatalogs = tableEnv.listCatalogs().length; + + // create a new catalog. + String catalogName = "iceberg_catalog_in_gravitino"; + org.apache.gravitino.Catalog gravitinoCatalog = + metalake.createCatalog( + catalogName, + org.apache.gravitino.Catalog.Type.RELATIONAL, + getProvider(), + null, + getCatalogConfigs()); + Assertions.assertNotNull(gravitinoCatalog); + Assertions.assertEquals(catalogName, gravitinoCatalog.name()); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + Assertions.assertEquals( + numCatalogs + 1, tableEnv.listCatalogs().length, "Should create a new catalog"); + + // get the catalog from Gravitino. + Optional flinkIcebergCatalog = + tableEnv.getCatalog(catalogName); + Assertions.assertTrue(flinkIcebergCatalog.isPresent()); + Assertions.assertInstanceOf(GravitinoIcebergCatalog.class, flinkIcebergCatalog.get()); + + // drop the catalog. + tableEnv.useCatalog(DEFAULT_CATALOG); + tableEnv.executeSql("drop catalog " + catalogName); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + Assertions.assertEquals( + numCatalogs, tableEnv.listCatalogs().length, "The created catalog should be dropped."); + } + + @Test + public void testIcebergTableWithPartition() { + String databaseName = "test_iceberg_table_partition"; + String tableName = "iceberg_table_with_partition"; + String key = "test key"; + String value = "test value"; + + doWithSchema( + icebergCatalog, + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s (" + + " id BIGINT COMMENT 'unique id'," + + " data STRING NOT NULL" + + " ) PARTITIONED BY (data)" + + " WITH (" + + "'%s' = '%s')", + tableName, key, value); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Table table = + catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(value, table.properties().get(key)); + Column[] columns = + new Column[] { + Column.of("id", Types.LongType.get(), "unique id", true, false, null), + Column.of("data", Types.StringType.get(), null, false, false, null) + }; + assertColumns(columns, table.columns()); + Transform[] partitions = new Transform[] {Transforms.identity("data")}; + Assertions.assertArrayEquals(partitions, table.partitioning()); + + // load flink catalog + try { + org.apache.flink.table.catalog.Catalog flinkCatalog = + tableEnv.getCatalog(currentCatalog().name()).get(); + CatalogBaseTable flinkTable = + flinkCatalog.getTable(ObjectPath.fromString(databaseName + "." + tableName)); + DefaultCatalogTable catalogTable = (DefaultCatalogTable) flinkTable; + Assertions.assertTrue(catalogTable.isPartitioned()); + Assertions.assertArrayEquals( + new String[] {"data"}, catalogTable.getPartitionKeys().toArray()); + } catch (Exception e) { + Assertions.fail("Table should be exist", e); + } + + // write and read. + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1L)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s ORDER BY data", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, "A"), + Row.of(2, "B")); + }, + true, + supportDropCascade()); + } + + @Test + public void testCreateIcebergTable() { + String databaseName = "test_create_iceberg_table"; + String tableName = "iceberg_table"; + String comment = "test table comment"; + String key = "test key"; + String value = "test value"; + + doWithSchema( + metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s (" + + " string_type STRING COMMENT 'string_type', " + + " double_type DOUBLE COMMENT 'double_type'," + + " int_type INT COMMENT 'int_type'," + + " varchar_type VARCHAR COMMENT 'varchar_type'," + + " boolean_type BOOLEAN COMMENT 'boolean_type'," + + " binary_type VARBINARY(10) COMMENT 'binary_type'," + + " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type'," + + " bigint_type BIGINT COMMENT 'bigint_type'," + + " float_type FLOAT COMMENT 'float_type'," + + " date_type DATE COMMENT 'date_type'," + + " timestamp_type TIMESTAMP COMMENT 'timestamp_type'," + + " array_type ARRAY COMMENT 'array_type'," + + " map_type MAP COMMENT 'map_type'," + + " struct_type ROW" + + " ) COMMENT '%s' WITH (" + + "'%s' = '%s')", + tableName, comment, key, value); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + + Table table = + catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(comment, table.comment()); + Assertions.assertEquals(value, table.properties().get(key)); + Column[] columns = + new Column[] { + Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), + Column.of("double_type", Types.DoubleType.get(), "double_type"), + Column.of("int_type", Types.IntegerType.get(), "int_type"), + Column.of("varchar_type", Types.StringType.get(), "varchar_type"), + Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"), + Column.of("binary_type", Types.BinaryType.get(), "binary_type"), + Column.of("decimal_type", Types.DecimalType.of(10, 2), "decimal_type"), + Column.of("bigint_type", Types.LongType.get(), "bigint_type"), + Column.of("float_type", Types.FloatType.get(), "float_type"), + Column.of("date_type", Types.DateType.get(), "date_type"), + Column.of( + "timestamp_type", Types.TimestampType.withoutTimeZone(), "timestamp_type"), + Column.of( + "array_type", Types.ListType.of(Types.IntegerType.get(), true), "array_type"), + Column.of( + "map_type", + Types.MapType.of(Types.IntegerType.get(), Types.StringType.get(), true), + "map_type"), + Column.of( + "struct_type", + Types.StructType.of( + Types.StructType.Field.nullableField("k1", Types.IntegerType.get()), + Types.StructType.Field.nullableField("k2", Types.StringType.get())), + null) + }; + assertColumns(columns, table.columns()); + Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning()); + }, + true, + supportDropCascade()); + } + + @Test + public void testGetIcebergTable() { + Column[] columns = + new Column[] { + Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), + Column.of("double_type", Types.DoubleType.get(), "double_type"), + Column.of("int_type", Types.IntegerType.get(), "int_type"), + Column.of("varchar_type", Types.StringType.get(), "varchar_type"), + Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"), + Column.of("binary_type", Types.BinaryType.get(), "binary_type"), + Column.of("decimal_type", Types.DecimalType.of(10, 2), "decimal_type"), + Column.of("bigint_type", Types.LongType.get(), "bigint_type"), + Column.of("float_type", Types.FloatType.get(), "float_type"), + Column.of("date_type", Types.DateType.get(), "date_type"), + Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(), "timestamp_type"), + Column.of("array_type", Types.ListType.of(Types.IntegerType.get(), true), "array_type"), + Column.of( + "map_type", + Types.MapType.of(Types.IntegerType.get(), Types.StringType.get(), true), + "map_type"), + Column.of( + "struct_type", + Types.StructType.of( + Types.StructType.Field.nullableField("k1", Types.IntegerType.get()), + Types.StructType.Field.nullableField("k2", Types.StringType.get())), + null) + }; + + String databaseName = "test_get_iceberg_table"; + doWithSchema( + metalake.loadCatalog(DEFAULT_ICEBERG_CATALOG), + databaseName, + catalog -> { + String tableName = "test_desc_table"; + String comment = "comment1"; + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of(databaseName, "test_desc_table"), + columns, + comment, + ImmutableMap.of("k1", "v1")); + + Optional flinkCatalog = + tableEnv.getCatalog(DEFAULT_ICEBERG_CATALOG); + Assertions.assertTrue(flinkCatalog.isPresent()); + try { + CatalogBaseTable table = + flinkCatalog.get().getTable(new ObjectPath(databaseName, tableName)); + Assertions.assertNotNull(table); + Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE, table.getTableKind()); + Assertions.assertEquals(comment, table.getComment()); + + org.apache.flink.table.catalog.Column[] expected = + new org.apache.flink.table.catalog.Column[] { + org.apache.flink.table.catalog.Column.physical("string_type", DataTypes.STRING()) + .withComment("string_type"), + org.apache.flink.table.catalog.Column.physical("double_type", DataTypes.DOUBLE()) + .withComment("double_type"), + org.apache.flink.table.catalog.Column.physical("int_type", DataTypes.INT()) + .withComment("int_type"), + org.apache.flink.table.catalog.Column.physical( + "varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .withComment("varchar_type"), + org.apache.flink.table.catalog.Column.physical( + "boolean_type", DataTypes.BOOLEAN()) + .withComment("boolean_type"), + org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES()) + .withComment("binary_type"), + org.apache.flink.table.catalog.Column.physical( + "decimal_type", DataTypes.DECIMAL(10, 2)) + .withComment("decimal_type"), + org.apache.flink.table.catalog.Column.physical("bigint_type", DataTypes.BIGINT()) + .withComment("bigint_type"), + org.apache.flink.table.catalog.Column.physical("float_type", DataTypes.FLOAT()) + .withComment("float_type"), + org.apache.flink.table.catalog.Column.physical("date_type", DataTypes.DATE()) + .withComment("date_type"), + org.apache.flink.table.catalog.Column.physical( + "timestamp_type", DataTypes.TIMESTAMP()) + .withComment("timestamp_type"), + org.apache.flink.table.catalog.Column.physical( + "array_type", DataTypes.ARRAY(DataTypes.INT())) + .withComment("array_type"), + org.apache.flink.table.catalog.Column.physical( + "map_type", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())) + .withComment("map_type"), + org.apache.flink.table.catalog.Column.physical( + "struct_type", + DataTypes.ROW( + DataTypes.FIELD("k1", DataTypes.INT()), + DataTypes.FIELD("k2", DataTypes.STRING()))) + }; + org.apache.flink.table.catalog.Column[] actual = + toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns()); + Assertions.assertArrayEquals(expected, actual); + + CatalogTable catalogTable = (CatalogTable) table; + Assertions.assertFalse(catalogTable.isPartitioned()); + } catch (TableNotExistException e) { + Assertions.fail(e); + } + }, + true, + supportDropCascade()); + } + + @Override + protected Catalog currentCatalog() { + return icebergCatalog; + } + + @Override + protected String getProvider() { + return "lakehouse-iceberg"; + } + + @Override + protected boolean supportGetSchemaWithoutCommentAndOption() { + return false; + } + + @Override + protected boolean supportDropCascade() { + return false; + } + + protected abstract String getCatalogBackend(); +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java new file mode 100644 index 00000000000..fc21ce2c247 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.integration.test.iceberg; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants; +import org.junit.jupiter.api.Tag; + +@Tag("gravitino-docker-test") +public class FlinkIcebergHiveCatalogIT extends FlinkIcebergCatalogIT { + + @Override + protected Map getCatalogConfigs() { + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, + IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE); + catalogProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse); + catalogProperties.put( + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, hiveMetastoreUri); + return catalogProperties; + } + + protected String getCatalogBackend() { + return "hive"; + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 57a17c2a114..a03b4a198e1 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -47,12 +47,22 @@ protected boolean supportSchemaOperationWithCommentAndOptions() { return false; } + @Override + protected String getProvider() { + return "lakehouse-paimon"; + } + + @Override + protected boolean supportDropCascade() { + return true; + } + protected Catalog currentCatalog() { return catalog; } @BeforeAll - static void setup() { + void setup() { initPaimonCatalog(); } @@ -62,13 +72,13 @@ static void stop() { metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true); } - private static void initPaimonCatalog() { + private void initPaimonCatalog() { Preconditions.checkNotNull(metalake); catalog = metalake.createCatalog( DEFAULT_PAIMON_CATALOG, org.apache.gravitino.Catalog.Type.RELATIONAL, - "lakehouse-paimon", + getProvider(), null, ImmutableMap.of( PaimonConstants.CATALOG_BACKEND, @@ -98,4 +108,9 @@ public void testCreateGravitinoPaimonCatalogUsingSQL() { Map properties = gravitinoCatalog.properties(); Assertions.assertEquals(warehouse, properties.get("warehouse")); } + + @Override + protected Map getCreateSchemaProps(String schemaName) { + return null; + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java index ba16a9c07b1..02710bcfb35 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java @@ -42,7 +42,8 @@ public static void assertTableResult( Row expectedRow = expected[i]; Row actualRow = actualRows.get(i); Assertions.assertEquals(expectedRow.getKind(), actualRow.getKind()); - Assertions.assertEquals(expectedRow, actualRow); + // Only compare string value. + Assertions.assertEquals(expectedRow.toString(), actualRow.toString()); } } }