From 8b20d042d7fec2192ab158d83e1883593b85e6fb Mon Sep 17 00:00:00 2001
From: youze Liang <525672876@qq.com>
Date: Fri, 17 Jan 2025 09:07:31 +0800
Subject: [PATCH] add integration test

---
 docs/spark-connector/spark-catalog-jdbc.md | 4 +-
 .../connector/jdbc/GravitinoJdbcCatalog.java | 25 +++++-
 .../spark/connector/jdbc/SparkJdbcTable.java | 71 +++++++++++++++
 .../jdbc/SparkJdbcTypeConverter.java | 38 ++++++++
 .../connector/version/CatalogNameAdaptor.java | 11 ++-
 .../integration/test/SparkCommonIT.java | 37 +++++---
 .../integration/test/SparkEnvIT.java | 15 ++++
 .../test/hive/SparkHiveCatalogIT.java | 10 +++
 .../test/iceberg/SparkIcebergCatalogIT.java | 12 +++
 .../test/jdbc/SparkJdbcCatalogIT.java | 89 +++++++++++++++++++
 .../test/jdbc/SparkJdbcTableInfoChecker.java | 54 +++++++++++
 .../test/paimon/SparkPaimonCatalogIT.java | 10 +++
 .../integration/test/util/SparkTableInfo.java | 3 +
 .../test/util/SparkTableInfoChecker.java | 6 +-
 .../jdbc/GravitinoJdbcCatalogSpark33.java | 22 +++++
 .../test/jdbc/SparkJdbcCatalogIT33.java | 36 ++++++++
 .../jdbc/GravitinoJdbcCatalogSpark34.java | 38 ++++++++
 .../jdbc/SparkJdbcTypeConverter34.java | 37 ++++++++
 .../test/jdbc/SparkJdbcCatalogIT34.java | 35 ++++++++
 .../jdbc/GravitinoJdbcCatalogSpark35.java | 22 +++++
 .../test/jdbc/SparkJdbcCatalogIT35.java | 36 ++++++++
 21 files changed, 592 insertions(+), 19 deletions(-)
 create mode 100644 spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTable.java
 create mode 100644 spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java
 create mode 100644 spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java
 create mode 100644 spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java
 create mode 100644 spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark33.java
 create mode 100644 spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java
 create mode 100644 spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark34.java
 create mode 100644 spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java
 create mode 100644 spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java
 create mode 100644 spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
 create mode 100644 spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java

diff --git a/docs/spark-connector/spark-catalog-jdbc.md b/docs/spark-connector/spark-catalog-jdbc.md
index 6989ff2808f..3b1a3a7616c 100644
--- a/docs/spark-connector/spark-catalog-jdbc.md
+++ b/docs/spark-connector/spark-catalog-jdbc.md
@@ -22,7 +22,6 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables,
 :::
 
 #### Not supported operations:
-- `CREATE DATABASE`
 - `UPDATE`
 - `DELETE`
 - `TRUNCATE`
@@ -32,7 +31,8 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables,
 ```sql
 -- Suppose mysql_a is the mysql catalog name managed by Gravitino
 USE mysql_a;
--- Suppose mydatabase is in your mysql
+
+CREATE DATABASE IF NOT EXISTS mydatabase;
 USE mydatabase;
 
 CREATE TABLE IF NOT EXISTS employee (
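The doc change above reflects that `CREATE DATABASE` is no longer an unsupported operation. A minimal sketch of the same flow driven from Java, assuming the connector's documented `spark.plugins` / `spark.sql.gravitino.*` settings; the URI, metalake, and catalog names are placeholders for your deployment:

```java
import org.apache.spark.sql.SparkSession;

public class JdbcCatalogQuickstart {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("gravitino-jdbc-quickstart")
            .config("spark.plugins",
                "org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
            .config("spark.sql.gravitino.uri", "http://localhost:8090") // placeholder
            .config("spark.sql.gravitino.metalake", "test")             // placeholder
            .getOrCreate();

    spark.sql("USE mysql_a");
    // Newly supported by this patch: CREATE DATABASE is forwarded to the JDBC backend.
    spark.sql("CREATE DATABASE IF NOT EXISTS mydatabase");
    spark.sql("USE mydatabase");
    spark.sql("SHOW TABLES").show();
  }
}
```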
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalog.java
index c079d4376a3..60fbc6aa83f 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalog.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalog.java
@@ -19,14 +19,18 @@
 
 package org.apache.gravitino.spark.connector.jdbc;
 
+import com.google.common.collect.Maps;
 import java.util.Map;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
 import org.apache.gravitino.spark.connector.catalog.BaseCatalog;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTable;
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
@@ -51,7 +55,14 @@ protected Table createSparkTable(
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter,
       SparkTypeConverter sparkTypeConverter) {
-    return sparkTable;
+    return new SparkJdbcTable(
+        identifier,
+        gravitinoTable,
+        (JDBCTable) sparkTable,
+        (JDBCTableCatalog) sparkCatalog,
+        propertiesConverter,
+        sparkTransformConverter,
+        sparkTypeConverter);
   }
 
   @Override
@@ -63,4 +74,16 @@ protected PropertiesConverter getPropertiesConverter() {
   protected SparkTransformConverter getSparkTransformConverter() {
     return new SparkTransformConverter(false);
   }
+
+  @Override
+  protected SparkTypeConverter getSparkTypeConverter() {
+    return new SparkJdbcTypeConverter();
+  }
+
+  @Override
+  public void createNamespace(String[] namespace, Map<String, String> metadata)
+      throws NamespaceAlreadyExistsException {
+    super.createNamespace(
+        namespace, Maps.filterKeys(metadata, key -> key.equals(SupportsNamespaces.PROP_COMMENT)));
+  }
 }
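The `createNamespace` override above drops every namespace property except the comment, since Spark may attach extra metadata (an owner, a location) that JDBC backends reject. A standalone sketch of the Guava call, assuming only Guava and Spark's catalog API on the classpath:

```java
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;

public class NamespaceMetadataFilterDemo {
  public static void main(String[] args) {
    Map<String, String> metadata =
        ImmutableMap.of(SupportsNamespaces.PROP_COMMENT, "demo db", "owner", "someone");
    // Keep only the "comment" key; everything else is silently discarded.
    Map<String, String> filtered =
        Maps.filterKeys(metadata, key -> key.equals(SupportsNamespaces.PROP_COMMENT));
    System.out.println(filtered); // {comment=demo db}
  }
}
```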
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTable.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTable.java
new file mode 100644
index 00000000000..3de807c3685
--- /dev/null
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.jdbc;
+
+import java.util.Map;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.spark.connector.PropertiesConverter;
+import org.apache.gravitino.spark.connector.SparkTransformConverter;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTable;
+import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog;
+import org.apache.spark.sql.types.StructType;
+
+public class SparkJdbcTable extends JDBCTable {
+
+  private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+
+  public SparkJdbcTable(
+      Identifier identifier,
+      Table gravitinoTable,
+      JDBCTable jdbcTable,
+      JDBCTableCatalog jdbcTableCatalog,
+      PropertiesConverter propertiesConverter,
+      SparkTransformConverter sparkTransformConverter,
+      SparkTypeConverter sparkTypeConverter) {
+    super(identifier, jdbcTable.schema(), jdbcTable.jdbcOptions());
+    this.gravitinoTableInfoHelper =
+        new GravitinoTableInfoHelper(
+            false,
+            identifier,
+            gravitinoTable,
+            propertiesConverter,
+            sparkTransformConverter,
+            sparkTypeConverter);
+  }
+
+  @Override
+  public String name() {
+    return gravitinoTableInfoHelper.name();
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public StructType schema() {
+    return gravitinoTableInfoHelper.schema();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return gravitinoTableInfoHelper.properties();
+  }
+}
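The table above answers `name()`, `schema()`, and `properties()` from Gravitino metadata while keeping `JDBCTable`'s scan and write machinery. A plain-JDK sketch of that delegation pattern; all names here are illustrative, not connector API:

```java
interface TableMetadata {
  String name();
}

// Stand-in for Spark's JDBCTable: owns the physical data path and its own metadata.
class PhysicalJdbcTable implements TableMetadata {
  @Override
  public String name() {
    return "jdbc_physical_name";
  }
}

// Stand-in for SparkJdbcTable: inherits the data path, delegates metadata elsewhere.
class GravitinoFacade extends PhysicalJdbcTable {
  private final TableMetadata gravitinoView = () -> "catalog.schema.logical_name";

  @Override
  public String name() {
    return gravitinoView.name(); // delegate metadata instead of inheriting it
  }
}

public class DelegationDemo {
  public static void main(String[] args) {
    System.out.println(new GravitinoFacade().name()); // catalog.schema.logical_name
  }
}
```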
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java
new file mode 100644
index 00000000000..a73d592c7d6
--- /dev/null
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.jdbc;
+
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+public class SparkJdbcTypeConverter extends SparkTypeConverter {
+
+  @Override
+  public DataType toSparkType(Type gravitinoType) {
+    if (gravitinoType instanceof Types.VarCharType) {
+      return DataTypes.StringType;
+    } else {
+      return super.toSparkType(gravitinoType);
+    }
+  }
+}
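A quick check of the mapping above, assuming gravitino-api and this module on the classpath: `VARCHAR(n)` is widened to `StringType`, presumably because Spark's JDBC v2 path handles `StringType` more reliably than `VarcharType`, while every other type falls through to the base converter.

```java
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.spark.connector.jdbc.SparkJdbcTypeConverter;

public class VarcharMappingDemo {
  public static void main(String[] args) {
    SparkJdbcTypeConverter converter = new SparkJdbcTypeConverter();
    System.out.println(converter.toSparkType(Types.VarCharType.of(255))); // StringType
    System.out.println(converter.toSparkType(Types.IntegerType.get()));   // IntegerType
  }
}
```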
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
index 41e769ca2b4..9d8594b9124 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/version/CatalogNameAdaptor.java
@@ -46,13 +46,22 @@ public class CatalogNameAdaptor {
           "lakehouse-paimon-3.5",
           "org.apache.gravitino.spark.connector.paimon.GravitinoPaimonCatalogSpark35");
 
+  private static final Map<String, String> jdbcCatalogNames =
+      ImmutableMap.of(
+          "3.3",
+          "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark33",
+          "3.4",
+          "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark34",
+          "3.5",
+          "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark35");
+
   private static String sparkVersion() {
     return package$.MODULE$.SPARK_VERSION();
   }
 
   private static String getCatalogName(String provider, int majorVersion, int minorVersion) {
     if (provider.startsWith("jdbc")) {
-      return "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalog";
+      return jdbcCatalogNames.get(String.format("%d.%d", majorVersion, minorVersion));
     }
     String key =
         String.format("%s-%d.%d", provider.toLowerCase(Locale.ROOT), majorVersion, minorVersion);
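How the version dispatch above resolves, in isolation (Guava only). Note that Spark versions outside the map, such as 3.2, yield `null` rather than a fallback class:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class JdbcCatalogLookupDemo {
  public static void main(String[] args) {
    Map<String, String> jdbcCatalogNames =
        ImmutableMap.of(
            "3.3", "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark33",
            "3.4", "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark34",
            "3.5", "org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark35");
    // Same key construction as getCatalogName: "major.minor".
    System.out.println(jdbcCatalogNames.get(String.format("%d.%d", 3, 5))); // ...Spark35
    System.out.println(jdbcCatalogNames.get(String.format("%d.%d", 3, 2))); // null
  }
}
```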
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index c7517a3bf82..0bea3c21910 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -119,6 +119,14 @@ private static String getRowLevelDeleteTableSql(
 
   protected abstract boolean supportsReplaceColumns();
 
+  protected abstract boolean supportsProperties();
+
+  protected abstract boolean supportsComplexType();
+
+  protected SparkTableInfoChecker getTableInfoChecker() {
+    return SparkTableInfoChecker.create();
+  }
+
   // Use a custom database not the original default database because SparkCommonIT couldn't
   // read&write data to tables in default database. The main reason is default database location is
   // determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address
@@ -189,6 +197,7 @@ void testLoadCatalogs() {
   }
 
   @Test
+  @EnabledIf("supportsProperties")
   protected void testCreateAndLoadSchema() {
     String testDatabaseName = "t_create1";
     dropDatabaseIfExists(testDatabaseName);
@@ -218,6 +227,7 @@ protected void testCreateAndLoadSchema() {
   }
 
   @Test
+  @EnabledIf("supportsProperties")
   protected void testAlterSchema() {
     String testDatabaseName = "t_alter";
     dropDatabaseIfExists(testDatabaseName);
@@ -266,7 +276,7 @@ void testCreateSimpleTable() {
     SparkTableInfo tableInfo = getTableInfo(tableName);
 
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create()
+        getTableInfoChecker()
             .withName(tableName)
             .withColumns(getSimpleTableColumn())
             .withComment(null);
@@ -287,7 +297,7 @@ void testCreateTableWithDatabase() {
     createSimpleTable(tableIdentifier);
     SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create().withName(tableName).withColumns(getSimpleTableColumn());
+        getTableInfoChecker().withName(tableName).withColumns(getSimpleTableColumn());
     checker.check(tableInfo);
     checkTableReadWrite(tableInfo);
 
@@ -300,8 +310,7 @@ void testCreateTableWithDatabase() {
     dropTableIfExists(tableName);
     createSimpleTable(tableName);
     tableInfo = getTableInfo(tableName);
-    checker =
-        SparkTableInfoChecker.create().withName(tableName).withColumns(getSimpleTableColumn());
+    checker = getTableInfoChecker().withName(tableName).withColumns(getSimpleTableColumn());
     checker.check(tableInfo);
     checkTableReadWrite(tableInfo);
   }
@@ -317,7 +326,7 @@ void testCreateTableWithComment() {
     SparkTableInfo tableInfo = getTableInfo(tableName);
 
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create()
+        getTableInfoChecker()
            .withName(tableName)
            .withColumns(getSimpleTableColumn())
            .withComment(tableComment);
@@ -396,6 +405,7 @@ void testListTable() {
   }
 
   @Test
+  @EnabledIf("supportsProperties")
   void testAlterTableSetAndRemoveProperty() {
     String tableName = "test_property";
     dropTableIfExists(tableName);
@@ -425,8 +435,7 @@ void testAlterTableUpdateComment() {
             "ALTER TABLE %s SET TBLPROPERTIES('%s'='%s')",
             tableName, ConnectorConstants.COMMENT, comment));
     SparkTableInfo tableInfo = getTableInfo(tableName);
-    SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create().withName(tableName).withComment(comment);
+    SparkTableInfoChecker checker = getTableInfoChecker().withName(tableName).withComment(comment);
     checker.check(tableInfo);
   }
@@ -593,6 +602,7 @@ protected void testAlterTableReplaceColumns() {
   }
 
   @Test
+  @EnabledIf("supportsComplexType")
   void testComplexType() {
     String tableName = "complex_type_table";
     dropTableIfExists(tableName);
@@ -632,7 +642,7 @@ void testCreateDatasourceFormatPartitionTable() {
     sql(createTableSQL);
     SparkTableInfo tableInfo = getTableInfo(tableName);
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create()
+        getTableInfoChecker()
             .withName(tableName)
             .withColumns(getSimpleTableColumn())
             .withIdentifyPartition(Arrays.asList("name", "age"));
@@ -652,7 +662,7 @@ void testCreateBucketTable() {
     sql(createTableSQL);
     SparkTableInfo tableInfo = getTableInfo(tableName);
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create()
+        getTableInfoChecker()
             .withName(tableName)
             .withColumns(getSimpleTableColumn())
             .withBucket(4, Arrays.asList("id", "name"));
@@ -672,7 +682,7 @@ void testCreateSortBucketTable() {
     sql(createTableSQL);
     SparkTableInfo tableInfo = getTableInfo(tableName);
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create()
+        getTableInfoChecker()
             .withName(tableName)
             .withColumns(getSimpleTableColumn())
             .withBucket(4, Arrays.asList("id", "name"), Arrays.asList("name", "id"));
@@ -695,7 +705,7 @@ void testCreateTableAsSelect() {
     SparkTableInfo newTableInfo = getTableInfo(newTableName);
 
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create().withName(newTableName).withColumns(getSimpleTableColumn());
+        getTableInfoChecker().withName(newTableName).withColumns(getSimpleTableColumn());
     checker.check(newTableInfo);
 
     List<String> tableData = getTableData(newTableName);
@@ -797,6 +807,7 @@ protected void deleteDirIfExists(String path) {
   }
 
   @Test
+  @EnabledIf("supportsProperties")
   void testTableOptions() {
     String tableName = "options_table";
     dropTableIfExists(tableName);
@@ -806,7 +817,7 @@ void testTableOptions() {
 
     SparkTableInfo tableInfo = getTableInfo(tableName);
     SparkTableInfoChecker checker =
-        SparkTableInfoChecker.create()
+        getTableInfoChecker()
             .withName(tableName)
             .withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"));
     checker.check(tableInfo);
@@ -983,7 +994,7 @@ protected void createSimpleTable(String identifier) {
 
   protected void checkTableColumns(
       String tableName, List<SparkColumnInfo> columns, SparkTableInfo tableInfo) {
-    SparkTableInfoChecker.create()
+    getTableInfoChecker()
         .withName(tableName)
         .withColumns(columns)
         .withComment(null)
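The gating pattern used throughout `SparkCommonIT` above, reduced to a self-contained JUnit 5 example: `@EnabledIf` names an instance method, and when that method returns false the test is skipped. This is how the JDBC subclass opts out of the property and complex-type tests.

```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;

class EnabledIfDemo {
  boolean supportsProperties() {
    return false; // flip to true and the test below runs
  }

  @Test
  @EnabledIf("supportsProperties")
  void testOnlyRunsWhenSupported() {
    Assertions.assertTrue(true); // skipped, not failed, while unsupported
  }
}
```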
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
index b534a9772f7..70c2af2becc 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkEnvIT.java
@@ -19,10 +19,12 @@
 
 package org.apache.gravitino.spark.connector.integration.test;
 
+import static org.apache.gravitino.integration.test.util.TestDatabaseName.MYSQL_CATALOG_MYSQL_IT;
 import static org.apache.gravitino.spark.connector.PropertiesConverter.SPARK_PROPERTY_PREFIX;
 import static org.apache.gravitino.spark.connector.iceberg.IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -56,6 +58,10 @@ public abstract class SparkEnvIT extends SparkUtilIT {
   protected String warehouse;
   protected FileSystem hdfs;
   protected String icebergRestServiceUri;
+  protected String mysqlUrl;
+  protected String mysqlUsername;
+  protected String mysqlPassword;
+  protected String mysqlDriver;
 
   private final String metalakeName = "test";
   private SparkSession sparkSession;
@@ -82,6 +88,7 @@ void startUp() throws Exception {
     if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
       initIcebergRestServiceEnv();
     }
+    initMysqlEnv();
     // Start Gravitino server
     super.startIntegrationTest();
     initHdfsFileSystem();
@@ -151,6 +158,14 @@ private void initHiveEnv() {
         HiveContainer.HDFS_DEFAULTFS_PORT);
   }
 
+  private void initMysqlEnv() throws SQLException {
+    containerSuite.startMySQLContainer(MYSQL_CATALOG_MYSQL_IT);
+    mysqlUrl = containerSuite.getMySQLContainer().getJdbcUrl();
+    mysqlUsername = containerSuite.getMySQLContainer().getUsername();
+    mysqlPassword = containerSuite.getMySQLContainer().getPassword();
+    mysqlDriver = containerSuite.getMySQLContainer().getDriverClassName(MYSQL_CATALOG_MYSQL_IT);
+  }
+
   private void initIcebergRestServiceEnv() {
     ignoreIcebergRestService = false;
     Map<String, String> icebergRestServiceConfigs = new HashMap<>();
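What `initMysqlEnv()` captures, exercised directly with plain JDBC. The URL and credentials below are placeholders for the values the MySQL test container hands back at runtime:

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class MysqlEnvSmokeTest {
  public static void main(String[] args) throws Exception {
    String url = "jdbc:mysql://127.0.0.1:3306"; // stand-in for getJdbcUrl()
    String user = "root";                       // stand-in for getUsername()
    String password = "root";                   // stand-in for getPassword()
    try (Connection conn = DriverManager.getConnection(url, user, password)) {
      // If this prints a version string, the catalog configs built from these
      // values should be usable by the Gravitino JDBC catalog as well.
      System.out.println(conn.getMetaData().getDatabaseProductVersion());
    }
  }
}
```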
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index b95882a0d01..d472d2a3d09 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -84,6 +84,16 @@ protected boolean supportsReplaceColumns() {
     return true;
   }
 
+  @Override
+  protected boolean supportsProperties() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsComplexType() {
+    return true;
+  }
+
   @Test
   void testCreateHiveFormatPartitionTable() {
     String tableName = "hive_partition_table";
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index f5fd337a13d..e3c9792fcf8 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -61,10 +61,12 @@
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+@Tag("gravitino-docker-test")
 public abstract class SparkIcebergCatalogIT extends SparkCommonIT {
 
   private static final String ICEBERG_FORMAT_VERSION = "format-version";
@@ -109,6 +111,16 @@ protected boolean supportsReplaceColumns() {
     return true;
   }
 
+  @Override
+  protected boolean supportsProperties() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsComplexType() {
+    return true;
+  }
+
   @Override
   protected String getTableLocation(SparkTableInfo table) {
     return String.join(File.separator, table.getTableLocation(), "data");
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java
new file mode 100644
index 00000000000..3fbf97f291f
--- /dev/null
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.jdbc;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.spark.connector.integration.test.SparkCommonIT;
+import org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
+import org.apache.gravitino.spark.connector.jdbc.JdbcPropertiesConstants;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+public abstract class SparkJdbcCatalogIT extends SparkCommonIT {
+  @Override
+  protected boolean supportsSparkSQLClusteredBy() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsPartition() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsDelete() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsSchemaEvolution() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsReplaceColumns() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsProperties() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportsComplexType() {
+    return false;
+  }
+
+  @Override
+  protected String getCatalogName() {
+    return "jdbc_mysql";
+  }
+
+  @Override
+  protected String getProvider() {
+    return "jdbc-mysql";
+  }
+
+  @Override
+  protected SparkTableInfoChecker getTableInfoChecker() {
+    return SparkJdbcTableInfoChecker.create();
+  }
+
+  @Override
+  protected Map<String, String> getCatalogConfigs() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_URL, mysqlUrl);
+    catalogProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_USER, mysqlUsername);
+    catalogProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, mysqlPassword);
+    catalogProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, mysqlDriver);
+    return catalogProperties;
+  }
+}
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java
new file mode 100644
index 00000000000..9240e051bca
--- /dev/null
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.integration.test.jdbc;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfo;
+import org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfoChecker;
+
+public class SparkJdbcTableInfoChecker extends SparkTableInfoChecker {
+
+  public static SparkJdbcTableInfoChecker create() {
+    return new SparkJdbcTableInfoChecker();
+  }
+
+  @Override
+  public SparkTableInfoChecker withColumns(List<SparkTableInfo.SparkColumnInfo> columns) {
+    getExpectedTableInfo()
+        .setColumns(
+            columns.stream()
+                .peek(
+                    column ->
+                        column.setComment(
+                            StringUtils.isEmpty(column.getComment()) ? null : column.getComment()))
+                .collect(Collectors.toList()));
+    getCheckFields().add(CheckField.COLUMN);
+    return this;
+  }
+
+  @Override
+  public SparkTableInfoChecker withComment(String comment) {
+    getExpectedTableInfo().setComment(StringUtils.isEmpty(comment) ? "" : comment);
+    getCheckFields().add(CheckField.COMMENT);
+    return this;
+  }
+}
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
index 9d036482857..fff31d13efe 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java
@@ -63,6 +63,16 @@ protected boolean supportsSchemaEvolution() {
     return true;
   }
 
+  @Override
+  protected boolean supportsProperties() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsComplexType() {
+    return true;
+  }
+
   @Override
   protected boolean supportsReplaceColumns() {
     // Paimon doesn't support replace columns, because it doesn't support drop all fields in table.
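The JDBC checker above normalizes comments in both directions, apparently because MySQL reports a missing comment as an empty string while other catalogs report null. The underlying helper, shown standalone (commons-lang3 only):

```java
import org.apache.commons.lang3.StringUtils;

public class CommentNormalizationDemo {
  public static void main(String[] args) {
    System.out.println(StringUtils.isEmpty(null)); // true  -> expected column comment becomes null
    System.out.println(StringUtils.isEmpty(""));   // true  -> expected table comment becomes ""
    System.out.println(StringUtils.isEmpty("x"));  // false -> kept as-is
  }
}
```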
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
index 077936c29c5..74b3ea09685 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfo.java
@@ -31,6 +31,7 @@
 import org.apache.gravitino.spark.connector.ConnectorConstants;
 import org.apache.gravitino.spark.connector.hive.SparkHiveTable;
 import org.apache.gravitino.spark.connector.iceberg.SparkIcebergTable;
+import org.apache.gravitino.spark.connector.jdbc.SparkJdbcTable;
 import org.apache.gravitino.spark.connector.paimon.SparkPaimonTable;
 import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
 import org.apache.spark.sql.connector.catalog.Table;
@@ -193,6 +194,8 @@ private static StructType getSchema(Table baseTable) {
       return ((SparkIcebergTable) baseTable).schema();
     } else if (baseTable instanceof SparkPaimonTable) {
       return ((SparkPaimonTable) baseTable).schema();
+    } else if (baseTable instanceof SparkJdbcTable) {
+      return ((SparkJdbcTable) baseTable).schema();
     } else {
       throw new IllegalArgumentException(
           "Doesn't support Spark table: " + baseTable.getClass().getName());
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java
index 33a6a356828..bd7164af786 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import lombok.Data;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.integration.test.util.SparkTableInfo.SparkColumnInfo;
 import org.apache.spark.sql.connector.expressions.Expressions;
@@ -34,17 +35,18 @@
  * To create an expected SparkTableInfo for verifying the SQL execution result, only the explicitly
  * set fields will be checked.
  */
+@Data
 public class SparkTableInfoChecker {
   private SparkTableInfo expectedTableInfo = new SparkTableInfo();
   private Set<CheckField> checkFields = new LinkedHashSet<>();
 
-  private SparkTableInfoChecker() {}
+  protected SparkTableInfoChecker() {}
 
   public static SparkTableInfoChecker create() {
     return new SparkTableInfoChecker();
   }
 
-  private enum CheckField {
+  protected enum CheckField {
     NAME,
     COLUMN,
     PARTITION,
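Why `@Data` was added above: `SparkJdbcTableInfoChecker` lives in a different package and needs accessors for the previously private `expectedTableInfo` and `checkFields` fields. Lombok's `@Data` generates those getters, as this minimal, self-contained example shows:

```java
import lombok.Data;

public class LombokAccessorDemo {
  @Data
  static class Holder {
    private String expected = "value";
  }

  public static void main(String[] args) {
    // getExpected() does not appear in the source; Lombok generates it at compile time,
    // which is exactly how the JDBC checker reaches getExpectedTableInfo()/getCheckFields().
    System.out.println(new Holder().getExpected());
  }
}
```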
diff --git a/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark33.java b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark33.java
new file mode 100644
index 00000000000..d322cd82ca0
--- /dev/null
+++ b/spark-connector/v3.3/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark33.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.jdbc;
+
+public class GravitinoJdbcCatalogSpark33 extends GravitinoJdbcCatalog {}
diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java
new file mode 100644
index 00000000000..acef94e8bf5
--- /dev/null
+++ b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.integration.test.jdbc;
+
+import org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark33;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SparkJdbcCatalogIT33 extends SparkJdbcCatalogIT {
+  @Test
+  void testCatalogClassName() {
+    String catalogClass =
+        getSparkSession()
+            .sessionState()
+            .conf()
+            .getConfString("spark.sql.catalog." + getCatalogName());
+    Assertions.assertEquals(GravitinoJdbcCatalogSpark33.class.getName(), catalogClass);
+  }
+}
diff --git a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark34.java b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark34.java
new file mode 100644
index 00000000000..e9c091c1882
--- /dev/null
+++ b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark34.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.jdbc;
+
+import org.apache.gravitino.spark.connector.SparkTableChangeConverter;
+import org.apache.gravitino.spark.connector.SparkTableChangeConverter34;
+import org.apache.gravitino.spark.connector.SparkTypeConverter;
+
+public class GravitinoJdbcCatalogSpark34 extends GravitinoJdbcCatalog {
+
+  @Override
+  protected SparkTypeConverter getSparkTypeConverter() {
+    return new SparkJdbcTypeConverter34();
+  }
+
+  @Override
+  protected SparkTableChangeConverter getSparkTableChangeConverter(
+      SparkTypeConverter sparkTypeConverter) {
+    return new SparkTableChangeConverter34(sparkTypeConverter);
+  }
+}
diff --git a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java
new file mode 100644
index 00000000000..a152ba73cae
--- /dev/null
+++ b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.jdbc;
+
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.spark.connector.SparkTypeConverter34;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+public class SparkJdbcTypeConverter34 extends SparkTypeConverter34 {
+  @Override
+  public DataType toSparkType(Type gravitinoType) {
+    if (gravitinoType instanceof Types.VarCharType) {
+      return DataTypes.StringType;
+    } else {
+      return super.toSparkType(gravitinoType);
+    }
+  }
+}
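The 3.4 module repeats the VARCHAR rule on top of `SparkTypeConverter34` (the 3.4 variant of the base converter), since Java's single inheritance forces a parallel subclass per converter hierarchy. A hedged parity check, assuming both converters are on the classpath:

```java
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.spark.connector.jdbc.SparkJdbcTypeConverter;
import org.apache.gravitino.spark.connector.jdbc.SparkJdbcTypeConverter34;

public class ConverterParityCheck {
  public static void main(String[] args) {
    SparkJdbcTypeConverter base = new SparkJdbcTypeConverter();
    SparkJdbcTypeConverter34 v34 = new SparkJdbcTypeConverter34();
    // Both hierarchies should widen VARCHAR(n) to the same Spark type.
    System.out.println(
        base.toSparkType(Types.VarCharType.of(64))
            .sameType(v34.toSparkType(Types.VarCharType.of(64)))); // true
  }
}
```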
diff --git a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java
new file mode 100644
index 00000000000..a105ed9aff2
--- /dev/null
+++ b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.spark.connector.integration.test.jdbc;
+
+import org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark34;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SparkJdbcCatalogIT34 extends SparkJdbcCatalogIT {
+  @Test
+  void testCatalogClassName() {
+    String catalogClass =
+        getSparkSession()
+            .sessionState()
+            .conf()
+            .getConfString("spark.sql.catalog." + getCatalogName());
+    Assertions.assertEquals(GravitinoJdbcCatalogSpark34.class.getName(), catalogClass);
+  }
+}
diff --git a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
new file mode 100644
index 00000000000..31b8569f94a
--- /dev/null
+++ b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.jdbc;
+
+public class GravitinoJdbcCatalogSpark35 extends GravitinoJdbcCatalogSpark34 {}
diff --git a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java
new file mode 100644
index 00000000000..653dc0c297b
--- /dev/null
+++ b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.integration.test.jdbc;
+
+import org.apache.gravitino.spark.connector.jdbc.GravitinoJdbcCatalogSpark35;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SparkJdbcCatalogIT35 extends SparkJdbcCatalogIT {
+  @Test
+  void testCatalogClassName() {
+    String catalogClass =
+        getSparkSession()
+            .sessionState()
+            .conf()
+            .getConfString("spark.sql.catalog." + getCatalogName());
+    Assertions.assertEquals(GravitinoJdbcCatalogSpark35.class.getName(), catalogClass);
+  }
+}