Skip to content

Commit

Permalink
Merge branch 'main' of github.com:apache/gravitino into minor
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Jan 15, 2025
2 parents 7e9d01a + a13d435 commit ea949f2
Show file tree
Hide file tree
Showing 17 changed files with 76 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ public RangerClientExtension(String hostName, String authType, String username,
}
}

@Override
public RangerPolicy createPolicy(RangerPolicy policy) throws RangerServiceException {
  // A policy without any resource definitions would never match anything in Ranger,
  // so reject it before issuing the remote call. Use isEmpty() (idiomatic) rather
  // than size() > 0; NPE behavior on a null resource map is unchanged.
  Preconditions.checkArgument(
      !policy.getResources().isEmpty(), "Ranger policy resources can not be empty!");
  return super.createPolicy(policy);
}

@Override
public RangerPolicy updatePolicy(long policyId, RangerPolicy policy)
throws RangerServiceException {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public VXGroup() {
*
* @return the formatted string representation of this object
*/
@Override
public String toString() {
String str = "VXGroup={";
str += super.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public String getName() {
*
* @return the formatted string representation of this object
*/
@Override
public String toString() {
String str = "VXUser={";
str += super.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ protected Map<String, String> getTableProperties(Connection connection, String t
}
}

@Override
protected void correctJdbcTableFields(
Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,15 @@ public void initAuthorizationPluginInstance(IsolatedClassLoader classLoader) {
try {
BaseAuthorization<?> authorization =
BaseAuthorization.createAuthorization(classLoader, authorizationProvider);

// Load the authorization plugin with the class loader of the catalog.
// Because the JDBC authorization plugin may load JDBC driver using the class loader.
authorizationPlugin =
authorization.newPlugin(entity.namespace().level(0), provider(), this.conf);
classLoader.withClassLoader(
cl ->
authorization.newPlugin(
entity.namespace().level(0), provider(), this.conf));

} catch (Exception e) {
LOG.error("Failed to load authorization with class loader", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
dispatcher.disableMetalake(ident);
}

@Override
public boolean dropMetalake(NameIdentifier ident) {
// For metalake, we don't clear all the privileges of catalog authorization plugin.
// we just remove metalake.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public TableInfo createTableRequest() {
return createTableRequest;
}

@Override
public OperationType operationType() {
  // This request always represents a CREATE TABLE operation.
  return OperationType.CREATE_TABLE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public String insertCatalogMetaOnDuplicateKeyUpdate(CatalogPO catalogPO) {
+ " deleted_at = #{catalogMeta.deletedAt}";
}

@Override
public String updateCatalogMeta(
@Param("newCatalogMeta") CatalogPO newCatalogPO,
@Param("oldCatalogMeta") CatalogPO oldCatalogPO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public String insertMetalakeMetaOnDuplicateKeyUpdate(MetalakePO metalakePO) {
+ " deleted_at = #{metalakeMeta.deletedAt}";
}

@Override
public String updateMetalakeMeta(
@Param("newMetalakeMeta") MetalakePO newMetalakePO,
@Param("oldMetalakeMeta") MetalakePO oldMetalakePO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ibatis.annotations.Param;

public class SecurableObjectPostgreSQLProvider extends SecurableObjectBaseSQLProvider {
@Override
public String batchSoftDeleteSecurableObjects(
@Param("securableObjects") List<SecurableObjectPO> securableObjectPOs) {
return "<script>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public String insertTagMetaOnDuplicateKeyUpdate(TagPO tagPO) {
+ " deleted_at = #{tagMeta.deletedAt}";
}

@Override
public String updateTagMeta(
@Param("newTagMeta") TagPO newTagPO, @Param("oldTagMeta") TagPO oldTagPO) {
return "UPDATE "
Expand Down
8 changes: 4 additions & 4 deletions docs/flink-connector/flink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ This capability allows users to perform federation queries, accessing data from

Set the Flink configuration in `flink-conf.yaml`.
```yaml
table.catalog-store.kind=gravitino
table.catalog-store.gravitino.gravitino.metalake=test
table.catalog-store.gravitino.gravitino.uri=http://localhost:8080
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: test
table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
```
Alternatively, you can set the Flink configuration programmatically via the `TableEnvironment`.
```java
final Configuration configuration = new Configuration();
configuration.setString("table.catalog-store.kind", "gravitino");
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "test");
configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8080");
configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090");
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().withConfiguration(configuration);
TableEnvironment tableEnv = TableEnvironment.create(builder.inBatchMode().build());
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,11 +656,11 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u
return schemaChanges.toArray(new SchemaChange[0]);
}

private Catalog catalog() {
protected Catalog catalog() {
return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
}

private String catalogName() {
protected String catalogName() {
return getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@

package org.apache.gravitino.flink.connector.paimon;

import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.paimon.flink.FlinkTableFactory;

/**
* The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to
Expand All @@ -45,4 +52,21 @@ protected GravitinoPaimonCatalog(
protected AbstractCatalog realCatalog() {
  // Expose the underlying Paimon Flink catalog that this Gravitino wrapper delegates to.
  return paimonCatalog;
}

/**
 * Drops the table identified by {@code tablePath} through the Gravitino table catalog.
 *
 * <p>NOTE(review): the drop is routed through {@code purgeTable}, which presumably removes the
 * table's data as well as its metadata — confirm this matches the intended Flink
 * {@code DROP TABLE} semantics for Paimon tables.
 *
 * @param tablePath database/table path of the table to drop
 * @param ignoreIfNotExists when true, a missing table is silently ignored
 * @throws TableNotExistException if the table was not dropped and {@code ignoreIfNotExists} is
 *     false
 * @throws CatalogException on other catalog failures
 */
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException {
  // purgeTable returns whether the table actually existed and was removed.
  boolean dropped =
      catalog()
          .asTableCatalog()
          .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
  if (!dropped && !ignoreIfNotExists) {
    throw new TableNotExistException(catalogName(), tablePath);
  }
}

@Override
public Optional<Factory> getFactory() {
  // Supply Paimon's Flink table factory so Flink can build dynamic table sources/sinks
  // for tables served through this catalog. A new factory instance is created per call.
  return Optional.of(new FlinkTableFactory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.gravitino.flink.connector.integration.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
Expand Down Expand Up @@ -159,17 +158,14 @@ protected TableResult sql(@FormatString String sql, Object... args) {
return tableEnv.executeSql(String.format(sql, args));
}

protected static void doWithSchema(
protected void doWithSchema(
Catalog catalog, String schemaName, Consumer<Catalog> action, boolean dropSchema) {
Preconditions.checkNotNull(catalog);
Preconditions.checkNotNull(schemaName);
try {
tableEnv.useCatalog(catalog.name());
if (!catalog.asSchemas().schemaExists(schemaName)) {
catalog
.asSchemas()
.createSchema(
schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName));
catalog.asSchemas().createSchema(schemaName, null, null);
}
tableEnv.useDatabase(schemaName);
action.accept(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
Expand Down Expand Up @@ -586,4 +587,28 @@ public void testGetHiveTable() {
protected org.apache.gravitino.Catalog currentCatalog() {
return hiveCatalog;
}

/**
 * Runs {@code action} against {@code catalog} with {@code schemaName} selected as the current
 * Flink database, creating the schema first if it does not already exist.
 *
 * <p>The schema, when created, is given a {@code location} property of
 * {@code warehouse + "/" + schemaName}. If {@code dropSchema} is true the schema is dropped in a
 * finally block, even when {@code action} throws.
 *
 * @param catalog the Gravitino catalog to operate on; must not be null
 * @param schemaName the schema (database) name to use; must not be null
 * @param action callback invoked once the schema is the current database
 * @param dropSchema whether to drop the schema after {@code action} completes
 */
protected void doWithSchema(
    org.apache.gravitino.Catalog catalog,
    String schemaName,
    Consumer<org.apache.gravitino.Catalog> action,
    boolean dropSchema) {
  Preconditions.checkNotNull(catalog);
  Preconditions.checkNotNull(schemaName);
  try {
    // Switch Flink to this catalog before touching schemas so useDatabase resolves correctly.
    tableEnv.useCatalog(catalog.name());
    if (!catalog.asSchemas().schemaExists(schemaName)) {
      catalog
          .asSchemas()
          .createSchema(
              schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName));
    }
    tableEnv.useDatabase(schemaName);
    action.accept(catalog);
  } finally {
    if (dropSchema) {
      // NOTE(review): assumes the boolean argument is "cascade", removing any tables the
      // action created — confirm against the SupportsSchemas#dropSchema contract.
      catalog.asSchemas().dropSchema(schemaName, true);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog catalog;

@Override
protected boolean supportColumnOperation() {
return false;
}

@Override
protected boolean supportTableOperation() {
return false;
}

@Override
protected boolean supportSchemaOperationWithCommentAndOptions() {
return false;
Expand Down

0 comments on commit ea949f2

Please sign in to comment.