From 9ca88e0b06a75366c680610397f136519e8890f4 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Wed, 15 Jan 2025 18:20:25 +0800 Subject: [PATCH 01/31] [#5194] feat(flink): Support basic table DDL Operation for paimon-catalog (#6224) ### What changes were proposed in this pull request? Support basic table DDL Operation for paimon-catalog ### Why are the changes needed? Fix: #5194 ### Does this PR introduce _any_ user-facing change? None. ### How was this patch tested? org.apache.gravitino.flink.connector.integration.test.paimon.FlinkPaimonCatalogIT --- .../flink/connector/catalog/BaseCatalog.java | 4 +-- .../paimon/GravitinoPaimonCatalog.java | 24 ++++++++++++++++++ .../integration/test/FlinkEnvIT.java | 8 ++---- .../test/hive/FlinkHiveCatalogIT.java | 25 +++++++++++++++++++ .../test/paimon/FlinkPaimonCatalogIT.java | 10 -------- 5 files changed, 53 insertions(+), 18 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 1496742177f..fd8e118ee49 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -656,11 +656,11 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u return schemaChanges.toArray(new SchemaChange[0]); } - private Catalog catalog() { + protected Catalog catalog() { return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName()); } - private String catalogName() { + protected String catalogName() { return getName(); } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java index 017ac6e7085..c22e00fa122 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -19,10 +19,17 @@ package org.apache.gravitino.flink.connector.paimon; +import java.util.Optional; import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.paimon.flink.FlinkTableFactory; /** * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to @@ -45,4 +52,21 @@ protected GravitinoPaimonCatalog( protected AbstractCatalog realCatalog() { return paimonCatalog; } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + boolean dropped = + catalog() + .asTableCatalog() + .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + if (!dropped && !ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } + } + + @Override + public Optional getFactory() { + return Optional.of(new FlinkTableFactory()); + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 5ae8847c6c1..f56b5297e17 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -19,7 +19,6 @@ package org.apache.gravitino.flink.connector.integration.test; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; @@ -159,17 +158,14 @@ protected TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } - protected static void doWithSchema( + protected void doWithSchema( Catalog catalog, String schemaName, Consumer action, boolean dropSchema) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + catalog.asSchemas().createSchema(schemaName, null, null); } tableEnv.useDatabase(schemaName); action.accept(catalog); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 333aa83f0b6..bb7b25f6b20 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -586,4 +587,28 @@ public void testGetHiveTable() { protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } + + protected void doWithSchema( + org.apache.gravitino.Catalog catalog, + String schemaName, + Consumer action, + boolean dropSchema) { + Preconditions.checkNotNull(catalog); + Preconditions.checkNotNull(schemaName); + try { + tableEnv.useCatalog(catalog.name()); + if (!catalog.asSchemas().schemaExists(schemaName)) { + catalog + .asSchemas() + .createSchema( + schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + } + tableEnv.useDatabase(schemaName); + action.accept(catalog); + } finally { + if (dropSchema) { + catalog.asSchemas().dropSchema(schemaName, true); + } + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 10fab3567a3..57a17c2a114 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog catalog; - @Override - protected boolean supportColumnOperation() { - return false; - } - - @Override - protected boolean supportTableOperation() { - return false; - } - @Override protected boolean supportSchemaOperationWithCommentAndOptions() { return false; From 24c9076acf55915bdfdae5dcb3892cd8dd83d0af Mon Sep 17 00:00:00 2001 From: Xiaojian Sun Date: Wed, 15 Jan 2025 19:08:01 +0800 Subject: [PATCH 02/31] [#6237]fix: add missing @override annotations (#6244) ### What changes were proposed in this pull request? Add missing `@override` annotations ### Why are the changes needed? Fix: https://github.com/apache/gravitino/issues/6237 ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested? N/A --- .../gravitino/authorization/ranger/RangerClientExtension.java | 2 ++ .../gravitino/authorization/ranger/reference/VXGroup.java | 1 + .../apache/gravitino/authorization/ranger/reference/VXUser.java | 1 + .../catalog/oceanbase/operation/OceanBaseTableOperations.java | 1 + .../java/org/apache/gravitino/hook/MetalakeHookDispatcher.java | 1 + .../gravitino/listener/api/event/CreateTablePreEvent.java | 1 + .../provider/postgresql/CatalogMetaPostgreSQLProvider.java | 1 + .../provider/postgresql/MetalakeMetaPostgreSQLProvider.java | 1 + .../provider/postgresql/SecurableObjectPostgreSQLProvider.java | 1 + .../mapper/provider/postgresql/TagMetaPostgreSQLProvider.java | 1 + 10 files changed, 11 insertions(+) diff --git a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerClientExtension.java b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerClientExtension.java index a554559ea5c..e1e9f6955d2 100644 --- a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerClientExtension.java +++ b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerClientExtension.java @@ -100,12 +100,14 @@ public RangerClientExtension(String hostName, String authType, String username, } } + @Override public RangerPolicy createPolicy(RangerPolicy policy) throws RangerServiceException { Preconditions.checkArgument( policy.getResources().size() > 0, "Ranger policy resources can not be empty!"); return super.createPolicy(policy); } + @Override public RangerPolicy updatePolicy(long policyId, RangerPolicy policy) throws RangerServiceException { Preconditions.checkArgument( diff --git a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXGroup.java b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXGroup.java index 3a58f5c95a0..611127ec3f2 100644 --- a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXGroup.java +++ b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXGroup.java @@ -60,6 +60,7 @@ public VXGroup() { * * @return formatedStr */ + @Override public String toString() { String str = "VXGroup={"; str += super.toString(); diff --git a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXUser.java b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXUser.java index f605d987de0..3dbc2b0236b 100644 --- a/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXUser.java +++ b/authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/reference/VXUser.java @@ -75,6 +75,7 @@ public String getName() { * * @return formatedStr */ + @Override public String toString() { String str = "VXUser={"; str += super.toString(); diff --git a/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java b/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java index 77c97290927..98f2d174f1a 100644 --- a/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java +++ b/catalogs/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java @@ -185,6 +185,7 @@ protected Map getTableProperties(Connection connection, String t } } + @Override protected void correctJdbcTableFields( Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder) throws SQLException { diff --git a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java index 26f31a88396..aa53b8800f8 100644 --- a/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/hook/MetalakeHookDispatcher.java @@ -116,6 +116,7 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException dispatcher.disableMetalake(ident); } + @Override public boolean dropMetalake(NameIdentifier ident) { // For metalake, we don't clear all the privileges of catalog authorization plugin. // we just remove metalake. diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTablePreEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTablePreEvent.java index 6c01d614f3c..dd6b8cc123b 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTablePreEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTablePreEvent.java @@ -43,6 +43,7 @@ public TableInfo createTableRequest() { return createTableRequest; } + @Override public OperationType operationType() { return OperationType.CREATE_TABLE; } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/CatalogMetaPostgreSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/CatalogMetaPostgreSQLProvider.java index abaf2c59af9..77bf3c4e285 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/CatalogMetaPostgreSQLProvider.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/CatalogMetaPostgreSQLProvider.java @@ -76,6 +76,7 @@ public String insertCatalogMetaOnDuplicateKeyUpdate(CatalogPO catalogPO) { + " deleted_at = #{catalogMeta.deletedAt}"; } + @Override public String updateCatalogMeta( @Param("newCatalogMeta") CatalogPO newCatalogPO, @Param("oldCatalogMeta") CatalogPO oldCatalogPO) { diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/MetalakeMetaPostgreSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/MetalakeMetaPostgreSQLProvider.java index a95d7f09fe3..06dde29751c 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/MetalakeMetaPostgreSQLProvider.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/MetalakeMetaPostgreSQLProvider.java @@ -62,6 +62,7 @@ public String insertMetalakeMetaOnDuplicateKeyUpdate(MetalakePO metalakePO) { + " deleted_at = #{metalakeMeta.deletedAt}"; } + @Override public String updateMetalakeMeta( @Param("newMetalakeMeta") MetalakePO newMetalakePO, @Param("oldMetalakeMeta") MetalakePO oldMetalakePO) { diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java index 92352bcd95a..6de57dbdc48 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java @@ -32,6 +32,7 @@ import org.apache.ibatis.annotations.Param; public class SecurableObjectPostgreSQLProvider extends SecurableObjectBaseSQLProvider { + @Override public String batchSoftDeleteSecurableObjects( @Param("securableObjects") List securableObjectPOs) { return "