From dfff9aa3a6edddccb4e7979b440bcc5519a20a6c Mon Sep 17 00:00:00 2001 From: youze Liang <525672876@qq.com> Date: Wed, 22 Jan 2025 11:37:34 +0800 Subject: [PATCH] resolve conversation --- docs/spark-connector/spark-catalog-jdbc.md | 3 +++ docs/spark-connector/spark-connector.md | 2 +- spark-connector/spark-common/build.gradle.kts | 3 --- .../jdbc/SparkJdbcTypeConverter.java | 2 ++ .../integration/test/SparkCommonIT.java | 10 +++++----- .../test/hive/SparkHiveCatalogIT.java | 2 +- .../test/iceberg/SparkIcebergCatalogIT.java | 4 +--- ...ogIT.java => SparkJdbcMysqlCatalogIT.java} | 4 ++-- .../test/jdbc/SparkJdbcTableInfoChecker.java | 1 + .../test/paimon/SparkPaimonCatalogIT.java | 2 +- ...33.java => SparkJdbcMysqlCatalogIT33.java} | 2 +- .../jdbc/SparkJdbcTypeConverter34.java | 2 ++ ...34.java => SparkJdbcMysqlCatalogIT34.java} | 2 +- .../jdbc/GravitinoJdbcCatalogSpark35.java | 19 ++++++++++++++++++- ...35.java => SparkJdbcMysqlCatalogIT35.java} | 2 +- 15 files changed, 40 insertions(+), 20 deletions(-) rename spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/{SparkJdbcCatalogIT.java => SparkJdbcMysqlCatalogIT.java} (95%) rename spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/{SparkJdbcCatalogIT33.java => SparkJdbcMysqlCatalogIT33.java} (94%) rename spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/{SparkJdbcCatalogIT34.java => SparkJdbcMysqlCatalogIT34.java} (94%) rename spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/{SparkJdbcCatalogIT35.java => SparkJdbcMysqlCatalogIT35.java} (94%) diff --git a/docs/spark-connector/spark-catalog-jdbc.md b/docs/spark-connector/spark-catalog-jdbc.md index 3b1a3a7616c..7805d80266f 100644 --- a/docs/spark-connector/spark-catalog-jdbc.md +++ b/docs/spark-connector/spark-catalog-jdbc.md @@ -9,6 +9,8 @@ The Apache Gravitino Spark 
connector offers the capability to read JDBC tables, ## Capabilities +Supports MySQL and PostgreSQL. OceanBase, which is compatible with the MySQL dialect, can be used with the MySQL driver and the MySQL dialect as a workaround. Doris, which does not support the MySQL dialect, is not currently supported. + #### Support DML and DDL operations: - `CREATE TABLE` @@ -22,6 +24,7 @@ The Apache Gravitino Spark connector offers the capability to read JDBC tables, ::: #### Not supported operations: + - `UPDATE` - `DELETE` - `TRUNCATE` diff --git a/docs/spark-connector/spark-connector.md b/docs/spark-connector/spark-connector.md index a7c47d51f22..a982313433a 100644 --- a/docs/spark-connector/spark-connector.md +++ b/docs/spark-connector/spark-connector.md @@ -11,7 +11,7 @@ The Apache Gravitino Spark connector leverages the Spark DataSourceV2 interface ## Capabilities -1. Supports [Hive catalog](spark-catalog-hive.md), [Iceberg catalog](spark-catalog-iceberg.md) and [Paimon catalog](spark-catalog-paimon.md). +1. Supports [Hive catalog](spark-catalog-hive.md), [Iceberg catalog](spark-catalog-iceberg.md), [Paimon catalog](spark-catalog-paimon.md) and [JDBC catalog](spark-catalog-jdbc.md). 2. Supports federation query. 3. Supports most DDL and DML SQLs. 
diff --git a/spark-connector/spark-common/build.gradle.kts b/spark-connector/spark-common/build.gradle.kts index 2d1cccfd19e..06e0077d21e 100644 --- a/spark-connector/spark-common/build.gradle.kts +++ b/spark-connector/spark-common/build.gradle.kts @@ -39,9 +39,6 @@ val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat dependencies { implementation(project(":catalogs:catalog-common")) - implementation(project(":catalogs:catalog-jdbc-common")) { - exclude("org.apache.logging.log4j") - } implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java index a73d592c7d6..56e2734a7f4 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter.java @@ -29,6 +29,8 @@ public class SparkJdbcTypeConverter extends SparkTypeConverter { @Override public DataType toSparkType(Type gravitinoType) { + // if spark version lower than 3.4.4, using VarCharType will throw an exception: Unsupported + // type varchar. 
if (gravitinoType instanceof Types.VarCharType) { return DataTypes.StringType; } else { diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java index 0bea3c21910..2eb9e7b9b5a 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java @@ -119,7 +119,7 @@ private static String getRowLevelDeleteTableSql( protected abstract boolean supportsReplaceColumns(); - protected abstract boolean supportsProperties(); + protected abstract boolean supportsSchemaAndTableProperties(); protected abstract boolean supportsComplexType(); @@ -197,7 +197,7 @@ void testLoadCatalogs() { } @Test - @EnabledIf("supportsProperties") + @EnabledIf("supportsSchemaAndTableProperties") protected void testCreateAndLoadSchema() { String testDatabaseName = "t_create1"; dropDatabaseIfExists(testDatabaseName); @@ -227,7 +227,7 @@ protected void testCreateAndLoadSchema() { } @Test - @EnabledIf("supportsProperties") + @EnabledIf("supportsSchemaAndTableProperties") protected void testAlterSchema() { String testDatabaseName = "t_alter"; dropDatabaseIfExists(testDatabaseName); @@ -405,7 +405,7 @@ void testListTable() { } @Test - @EnabledIf("supportsProperties") + @EnabledIf("supportsSchemaAndTableProperties") void testAlterTableSetAndRemoveProperty() { String tableName = "test_property"; dropTableIfExists(tableName); @@ -807,7 +807,7 @@ protected void deleteDirIfExists(String path) { } @Test - @EnabledIf("supportsProperties") + @EnabledIf("supportsSchemaAndTableProperties") void testTableOptions() { String tableName = "options_table"; dropTableIfExists(tableName); diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java index d472d2a3d09..6ed8e12d647 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java @@ -85,7 +85,7 @@ protected boolean supportsReplaceColumns() { } @Override - protected boolean supportsProperties() { + protected boolean supportsSchemaAndTableProperties() { return true; } diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java index e3c9792fcf8..291f8f25dbf 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java @@ -61,12 +61,10 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -@Tag("gravitino-docker-test") public abstract class SparkIcebergCatalogIT extends SparkCommonIT { private static final String ICEBERG_FORMAT_VERSION = "format-version"; @@ -112,7 +110,7 @@ protected boolean supportsReplaceColumns() { } @Override - protected boolean supportsProperties() { + protected boolean 
supportsSchemaAndTableProperties() { return true; } diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT.java similarity index 95% rename from spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java rename to spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT.java index 3fbf97f291f..65ed937be3a 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Tag; @Tag("gravitino-docker-test") -public abstract class SparkJdbcCatalogIT extends SparkCommonIT { +public abstract class SparkJdbcMysqlCatalogIT extends SparkCommonIT { @Override protected boolean supportsSparkSQLClusteredBy() { return false; @@ -53,7 +53,7 @@ protected boolean supportsReplaceColumns() { } @Override - protected boolean supportsProperties() { + protected boolean supportsSchemaAndTableProperties() { return false; } diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java index 9240e051bca..32a66923cbe 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java +++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcTableInfoChecker.java @@ -31,6 +31,7 @@ public static SparkJdbcTableInfoChecker create() { return new SparkJdbcTableInfoChecker(); } + // Spark jdbc table cannot distinguish between comment=null and comment="" @Override public SparkTableInfoChecker withColumns(List columns) { getExpectedTableInfo() diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java index fff31d13efe..40afa060859 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/paimon/SparkPaimonCatalogIT.java @@ -64,7 +64,7 @@ protected boolean supportsSchemaEvolution() { } @Override - protected boolean supportsProperties() { + protected boolean supportsSchemaAndTableProperties() { return true; } diff --git a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java similarity index 94% rename from spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java rename to spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java index acef94e8bf5..cf190cfd4fb 100644 --- a/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT33.java +++ 
b/spark-connector/v3.3/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT33.java @@ -23,7 +23,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class SparkJdbcCatalogIT33 extends SparkJdbcCatalogIT { +public class SparkJdbcMysqlCatalogIT33 extends SparkJdbcMysqlCatalogIT { @Test void testCatalogClassName() { String catalogClass = diff --git a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java index a152ba73cae..bbd32e0225d 100644 --- a/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java +++ b/spark-connector/v3.4/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/SparkJdbcTypeConverter34.java @@ -28,6 +28,8 @@ public class SparkJdbcTypeConverter34 extends SparkTypeConverter34 { @Override public DataType toSparkType(Type gravitinoType) { + // if spark version lower than 3.4.4, using VarCharType will throw an exception: Unsupported + // type varchar. 
if (gravitinoType instanceof Types.VarCharType) { return DataTypes.StringType; } else { diff --git a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT34.java similarity index 94% rename from spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java rename to spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT34.java index a105ed9aff2..9a4038404d8 100644 --- a/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT34.java +++ b/spark-connector/v3.4/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT34.java @@ -22,7 +22,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class SparkJdbcCatalogIT34 extends SparkJdbcCatalogIT { +public class SparkJdbcMysqlCatalogIT34 extends SparkJdbcMysqlCatalogIT { @Test void testCatalogClassName() { String catalogClass = diff --git a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java index 31b8569f94a..1b10d63fa09 100644 --- a/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java +++ b/spark-connector/v3.5/spark/src/main/java/org/apache/gravitino/spark/connector/jdbc/GravitinoJdbcCatalogSpark35.java @@ -19,4 +19,21 @@ package org.apache.gravitino.spark.connector.jdbc; -public class GravitinoJdbcCatalogSpark35 extends GravitinoJdbcCatalogSpark34 {} +import 
org.apache.gravitino.spark.connector.SparkTableChangeConverter; +import org.apache.gravitino.spark.connector.SparkTableChangeConverter34; +import org.apache.gravitino.spark.connector.SparkTypeConverter; +import org.apache.gravitino.spark.connector.SparkTypeConverter34; + +public class GravitinoJdbcCatalogSpark35 extends GravitinoJdbcCatalog { + + @Override + protected SparkTypeConverter getSparkTypeConverter() { + return new SparkTypeConverter34(); + } + + @Override + protected SparkTableChangeConverter getSparkTableChangeConverter( + SparkTypeConverter sparkTypeConverter) { + return new SparkTableChangeConverter34(sparkTypeConverter); + } +} diff --git a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT35.java similarity index 94% rename from spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java rename to spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT35.java index 653dc0c297b..00c14e40d2c 100644 --- a/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcCatalogIT35.java +++ b/spark-connector/v3.5/spark/src/test/java/org/apache/gravitino/spark/connector/integration/test/jdbc/SparkJdbcMysqlCatalogIT35.java @@ -23,7 +23,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class SparkJdbcCatalogIT35 extends SparkJdbcCatalogIT { +public class SparkJdbcMysqlCatalogIT35 extends SparkJdbcMysqlCatalogIT { @Test void testCatalogClassName() { String catalogClass =