Skip to content

Commit

Permalink
[#6309] fix(flink): remove paimon package from GravitinoPaimonCatalog…
Browse files Browse the repository at this point in the history
…Factory (#6313)

### What changes were proposed in this pull request?

remove paimon package from `GravitinoPaimonCatalogFactory`, because
Paimon package may not in the Flink classpath when loading catalog
factories.

### Why are the changes needed?

Fix: #6309 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

tested in local flink cluster
  • Loading branch information
FANNG1 authored Jan 17, 2025
1 parent 6fb45d6 commit 85b919c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkTableFactory;

/**
Expand All @@ -40,12 +42,13 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
private final AbstractCatalog paimonCatalog;

protected GravitinoPaimonCatalog(
String catalogName,
AbstractCatalog paimonCatalog,
CatalogFactory.Context context,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter);
this.paimonCatalog = paimonCatalog;
super(context.getName(), defaultDatabase, propertiesConverter, partitionConverter);
FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;

/**
* Factory for creating instances of {@link GravitinoPaimonCatalog}. It will be created by SPI
Expand All @@ -38,9 +38,12 @@ public class GravitinoPaimonCatalogFactory implements BaseCatalogFactory {

@Override
public Catalog createCatalog(Context context) {
FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context);
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtils.createCatalogFactoryHelper(this, context);
String defaultDatabase =
helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE);
return new GravitinoPaimonCatalog(
context.getName(), catalog, propertiesConverter(), partitionConverter());
context, defaultDatabase, propertiesConverter(), partitionConverter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@

package org.apache.gravitino.flink.connector.paimon;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.paimon.flink.FlinkCatalogOptions;

public class GravitinoPaimonCatalogFactoryOptions {

/** Identifier for the {@link GravitinoPaimonCatalog}. */
public static final String IDENTIFIER = "gravitino-paimon";

public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(FlinkCatalogOptions.DEFAULT_DATABASE.key())
.stringType()
.defaultValue(FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue());
}

0 comments on commit 85b919c

Please sign in to comment.