diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java index 82d7bd407..9b6bd7074 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java @@ -314,15 +314,14 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient } /** Invalid the bucket metadata for the given physical table paths. */ - public void invalidPhysicalTableBucketMeta( - Collection physicalTablesToInvalid) { + public void invalidPhysicalTableBucketMeta(Set physicalTablesToInvalid) { if (!physicalTablesToInvalid.isEmpty()) { cluster = cluster.invalidPhysicalTableBucketMeta(physicalTablesToInvalid); } } /** Get the table physical paths by table ids and partition ids. */ - public Collection getPhysicalTablePathByIds( + public Set getPhysicalTablePathByIds( @Nullable Collection tableId, @Nullable Collection tablePartitions) { Set physicalTablePaths = new HashSet<>(); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java index 665ff091c..0088dabd8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java @@ -59,7 +59,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -270,7 +269,7 @@ private synchronized void handleFetchLogResponse( // if is invalid metadata exception, we need to clear table bucket meta // to enable another round of log fetch to request new medata if (e instanceof InvalidMetadataException) { - Collection physicalTablePaths = + Set physicalTablePaths = metadataUpdater.getPhysicalTablePathByIds( tableOrPartitionsInFetchRequest.tableIds, tableOrPartitionsInFetchRequest.tablePartitions); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/cluster/BucketLocation.java b/fluss-common/src/main/java/com/alibaba/fluss/cluster/BucketLocation.java index 08d63c099..369cda594 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/cluster/BucketLocation.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/cluster/BucketLocation.java @@ -22,6 +22,9 @@ import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Objects; + /** This is used to describe per-bucket location information. */ @Internal public final class BucketLocation { @@ -73,6 +76,26 @@ public ServerNode[] getReplicas() { return replicas; } + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (!(object instanceof BucketLocation)) { + return false; + } + BucketLocation that = (BucketLocation) object; + return Objects.equals(physicalTablePath, that.physicalTablePath) + && Objects.equals(tableBucket, that.tableBucket) + && Objects.equals(leader, that.leader) + && Objects.deepEquals(replicas, that.replicas); + } + + @Override + public int hashCode() { + return Objects.hash(physicalTablePath, tableBucket, leader, Arrays.hashCode(replicas)); + } + @Override public String toString() { return String.format( diff --git a/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java b/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java index f6a630cce..93d6f86c1 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java @@ -27,12 +27,12 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; /** * An immutable representation of a subset of the server nodes, tables, and buckets and schemas in @@ -122,12 +122,19 @@ public Cluster( this.pathByTableId = Collections.unmodifiableMap(tempPathByTableId); } - public Cluster invalidPhysicalTableBucketMeta( - Collection physicalTablesToInvalid) { + public Cluster invalidPhysicalTableBucketMeta(Set physicalTablesToInvalid) { + // should remove invalid tables from current availableLocationsByPath Map> newBucketLocationsByPath = - new HashMap<>(availableLocationsByPath); - for (PhysicalTablePath path : physicalTablesToInvalid) { - newBucketLocationsByPath.remove(path); + new HashMap<>(availableLocationsByPath.size() - physicalTablesToInvalid.size()); + // copy the metadata from current availableLocationsByPath to newBucketLocationsByPath + // except for the tables in physicalTablesToInvalid + for (Map.Entry> tablePathAndBucketLocations : + availableLocationsByPath.entrySet()) { + if (!physicalTablesToInvalid.contains(tablePathAndBucketLocations.getKey())) { + newBucketLocationsByPath.put( + tablePathAndBucketLocations.getKey(), + new ArrayList<>(tablePathAndBucketLocations.getValue())); + } } return new Cluster( new HashMap<>(aliveTabletServersById), diff --git a/fluss-common/src/test/java/com/alibaba/fluss/cluster/ClusterTest.java b/fluss-common/src/test/java/com/alibaba/fluss/cluster/ClusterTest.java index 20cfb2c44..56c8fb132 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/cluster/ClusterTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/cluster/ClusterTest.java @@ -68,49 +68,7 @@ void setup() { @Test void testReturnModifiableCollections() { - Map> tablePathToBucketLocations = new HashMap<>(); - tablePathToBucketLocations.put( - DATA1_PHYSICAL_TABLE_PATH, - Arrays.asList( - new BucketLocation( - DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_ID, 0, NODES[0], NODES), - new BucketLocation( - DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_ID, 1, null, NODES), - new BucketLocation( - DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_ID, 2, NODES[2], NODES))); - tablePathToBucketLocations.put( - PhysicalTablePath.of(DATA2_TABLE_PATH), - Arrays.asList( - new BucketLocation( - PhysicalTablePath.of(DATA2_TABLE_PATH), - DATA2_TABLE_ID, - 0, - null, - NODES), - new BucketLocation( - PhysicalTablePath.of(DATA2_TABLE_PATH), - DATA2_TABLE_ID, - 1, - NODES[0], - NODES))); - - Map tablePathToTableId = new HashMap<>(); - tablePathToTableId.put(DATA1_TABLE_PATH, DATA1_TABLE_ID); - tablePathToTableId.put(DATA2_TABLE_PATH, DATA2_TABLE_ID); - - Map tablePathToTableInfo = new HashMap<>(); - tablePathToTableInfo.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO); - tablePathToTableInfo.put(DATA2_TABLE_PATH, DATA2_TABLE_INFO); - - Cluster cluster = - new Cluster( - aliveTabletServersById, - COORDINATOR_SERVER, - tablePathToBucketLocations, - tablePathToTableId, - Collections.emptyMap(), - tablePathToTableInfo); - + Cluster cluster = createCluster(); assertThatThrownBy(() -> cluster.getAliveTabletServers().put(1, NODES[3])) .isInstanceOf(UnsupportedOperationException.class); assertThatThrownBy( @@ -129,6 +87,50 @@ void testReturnModifiableCollections() { @Test void testGetTable() { + Cluster cluster = createCluster(); + assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO); + assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO); + assertThat(cluster.getSchema(DATA1_TABLE_PATH).get()) + .isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1)); + assertThat(cluster.getSchema(DATA2_TABLE_PATH).get()) + .isEqualTo(new SchemaInfo(DATA2_SCHEMA, 1)); + } + + @Test + void testInvalidMetaAndUpdate() { + Cluster cluster = createCluster(); + for (int i = 0; i < 10000; i++) { + // mock invalid meta + cluster = + cluster.invalidPhysicalTableBucketMeta( + Collections.singleton(DATA1_PHYSICAL_TABLE_PATH)); + // mock update meta + cluster = + new Cluster( + aliveTabletServersById, + COORDINATOR_SERVER, + new HashMap<>(cluster.getBucketLocationsByPath()), + new HashMap<>(cluster.getTableIdByPath()), + Collections.emptyMap(), + new HashMap<>(cluster.getTableInfoByPath())); + } + + // verify available buckets + List availableBuckets = + cluster.getAvailableBucketsForPhysicalTablePath( + PhysicalTablePath.of(DATA2_TABLE_PATH)); + assertThat(availableBuckets) + .isEqualTo( + Collections.singletonList( + new BucketLocation( + PhysicalTablePath.of(DATA2_TABLE_PATH), + DATA2_TABLE_ID, + 1, + NODES[0], + NODES))); + } + + private Cluster createCluster() { Map> tablePathToBucketLocations = new HashMap<>(); tablePathToBucketLocations.put( DATA1_PHYSICAL_TABLE_PATH, @@ -163,20 +165,12 @@ void testGetTable() { tablePathToTableInfo.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO); tablePathToTableInfo.put(DATA2_TABLE_PATH, DATA2_TABLE_INFO); - Cluster cluster = - new Cluster( - aliveTabletServersById, - COORDINATOR_SERVER, - tablePathToBucketLocations, - tablePathToTableId, - Collections.emptyMap(), - tablePathToTableInfo); - - assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO); - assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO); - assertThat(cluster.getSchema(DATA1_TABLE_PATH).get()) - .isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1)); - assertThat(cluster.getSchema(DATA2_TABLE_PATH).get()) - .isEqualTo(new SchemaInfo(DATA2_SCHEMA, 1)); + return new Cluster( + aliveTabletServersById, + COORDINATOR_SERVER, + tablePathToBucketLocations, + tablePathToTableId, + Collections.emptyMap(), + tablePathToTableInfo); } }