Skip to content

Commit

Permalink
Catalog: return Iceberg snapshot log based on Nessie commit history
Browse files Browse the repository at this point in the history
The current behavior of Nessie's Iceberg REST is to return only the most recent Iceberg snapshot. However, this seems to conflict with some Iceberg operations, which are not only maintenance operations, but rather related to "merge on read" / (equality) deletes.

This change changes Nessie's behavior by returning older snapshots from a load-table and update-table operations.

Register-table operation however do not change, because only the latest snapshot is actually imported. The behavior does change by returning an error if the table to be registered has more than 1 snapshots.

Fixes #10013
Fixes #9969
  • Loading branch information
snazy committed Dec 3, 2024
1 parent 5137dd7 commit 614f88c
Show file tree
Hide file tree
Showing 25 changed files with 965 additions and 105 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ as necessary. Empty sections will not end in the release notes.

### Highlights

- Nessie now returns the snapshot history in the `snapshots` and `snapshot-log` attributes of an Iceberg
table-metadata retrieved via Iceberg REST for table changes that have been committed via Iceberg REST
to Nessie 0.101.0 or newer. Commits made using older Nessie versions will not return older snapshots.
- Generally, not only for Nessie, It is recommended to keep the number of snapshots maintained in an
Iceberg table-metadata as low as possible. Use the maintenance operations provided by Iceberg.

### Upgrade notes

### Breaking changes
Expand All @@ -16,6 +22,11 @@ as necessary. Empty sections will not end in the release notes.

### Changes

- Nessie now reports a "bad request" for Iceberg REST register-table for table metadata with more than
one snapshot. This is a safeguard to prevent running into snapshot validation errors when using Iceberg.
While older Nessie versions accepted registrations of table metadata with more than one snapshot, it
was not particularly safe.

### Deprecations

### Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,55 @@ public static IcebergTableMetadata.Builder tableMetadataSimple() {
IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345678L).build());
}

public static IcebergTableMetadata.Builder tableMetadataThreeSnapshots() {
IcebergSchema schemaAllTypes = icebergSchemaAllTypes();

return IcebergTableMetadata.builder()
.tableUuid(UUID.randomUUID().toString())
.lastUpdatedMs(111111111L)
.location("table-location")
.currentSnapshotId(13)
.lastColumnId(schemaAllTypes.fields().get(schemaAllTypes.fields().size() - 1).id())
.lastPartitionId(INITIAL_PARTITION_ID)
.lastSequenceNumber(INITIAL_SEQUENCE_NUMBER)
.currentSchemaId(schemaAllTypes.schemaId())
.defaultSortOrderId(INITIAL_SORT_ORDER_ID)
.defaultSpecId(INITIAL_SPEC_ID)
.putProperty("prop", "value")
.addSchemas(schemaAllTypes)
.addSnapshots(
IcebergSnapshot.builder()
.snapshotId(11)
.schemaId(schemaAllTypes.schemaId())
.putSummary("operation", "testing1")
.sequenceNumber(123L)
.timestampMs(12345676L)
.build())
.addSnapshots(
IcebergSnapshot.builder()
.snapshotId(12)
.schemaId(schemaAllTypes.schemaId())
.putSummary("operation", "testing2")
.sequenceNumber(124L)
.timestampMs(12345677L)
.build())
.addSnapshots(
IcebergSnapshot.builder()
.snapshotId(13)
.schemaId(schemaAllTypes.schemaId())
.putSummary("operation", "testing3")
.sequenceNumber(125L)
.timestampMs(12345678L)
.build())
.putRef("main", IcebergSnapshotRef.builder().type("branch").snapshotId(13).build())
.addSnapshotLog(
IcebergSnapshotLogEntry.builder().snapshotId(11).timestampMs(12345676L).build())
.addSnapshotLog(
IcebergSnapshotLogEntry.builder().snapshotId(12).timestampMs(12345677L).build())
.addSnapshotLog(
IcebergSnapshotLogEntry.builder().snapshotId(13).timestampMs(12345678L).build());
}

public static IcebergViewMetadata.Builder viewMetadataSimple() {
IcebergSchema schemaAllTypes = icebergSchemaAllTypes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum CatalogOps {
META_SET_SNAPSHOT_REF,
META_REMOVE_SNAPSHOT_REF,
META_UPGRADE_FORMAT_VERSION,
META_REMOVE_SNAPSHOTS,

// Catalog operations
CATALOG_CREATE_ENTITY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,10 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie(
currentSnapshot.manifests(); // TODO
});

for (IcebergSnapshotLogEntry logEntry : iceberg.snapshotLog()) {
// TODO ??
logEntry.snapshotId();
logEntry.timestampMs();
}
iceberg.snapshotLog().stream()
.map(IcebergSnapshotLogEntry::snapshotId)
.filter(snapId -> !snapId.equals(iceberg.currentSnapshotId()))
.forEach(snapshot::addPreviousIcebergSnapshotId);

for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) {
if (statisticsFile.snapshotId() == iceberg.currentSnapshotId()) {
Expand Down Expand Up @@ -943,6 +942,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg(

public static IcebergTableMetadata nessieTableSnapshotToIceberg(
NessieTableSnapshot nessie,
List<NessieEntitySnapshot<?>> history,
Optional<IcebergSpec> requestedSpecVersion,
Consumer<Map<String, String>> tablePropertiesTweak) {
NessieTable entity = nessie.entity();
Expand Down Expand Up @@ -1045,6 +1045,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg(
metadata.putRef(
"main", IcebergSnapshotRef.builder().snapshotId(snapshotId).type("branch").build());

for (NessieEntitySnapshot<?> previous : history) {
var previousTmd =
nessieTableSnapshotToIceberg(
(NessieTableSnapshot) previous, List.of(), requestedSpecVersion, m -> {});
var previousSnap = previousTmd.currentSnapshot().orElseThrow();
metadata.addSnapshot(previousSnap);
metadata.addSnapshotLog(
IcebergSnapshotLogEntry.snapshotLogEntry(
previousSnap.timestampMs(), previousSnap.snapshotId()));
// TODO we don't include the metadata location yet - we could potentially do that later
// metadata.addMetadataLog(IcebergHistoryEntry.historyEntry(previousSnap.timestampMs(), ));
}

metadata.addSnapshotLog(
IcebergSnapshotLogEntry.builder()
.snapshotId(snapshotId)
Expand Down Expand Up @@ -1080,9 +1093,6 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg(
partitionStatisticsFile.fileSizeInBytes()));
}

// metadata.addMetadataLog();
// metadata.addSnapshotLog();

return metadata.build();
}

Expand Down Expand Up @@ -1577,13 +1587,18 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st
IcebergSnapshot icebergSnapshot = u.snapshot();
Integer schemaId = icebergSnapshot.schemaId();
NessieTableSnapshot snapshot = state.snapshot();
NessieTableSnapshot.Builder snapshotBuilder = state.builder();
if (schemaId != null) {
Optional<NessieSchema> schema = snapshot.schemaByIcebergId(schemaId);
schema.ifPresent(s -> state.builder().currentSchemaId(s.id()));
schema.ifPresent(s -> snapshotBuilder.currentSchemaId(s.id()));
}

state
.builder()
var currentIcebergSnapshotId = snapshot.icebergSnapshotId();
if (currentIcebergSnapshotId != null && currentIcebergSnapshotId != -1L) {
snapshotBuilder.addPreviousIcebergSnapshotId(currentIcebergSnapshotId);
}

snapshotBuilder
.icebergSnapshotId(icebergSnapshot.snapshotId())
.icebergSnapshotSequenceNumber(icebergSnapshot.sequenceNumber())
.icebergLastSequenceNumber(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.immutables.value.Value;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
Expand Down Expand Up @@ -156,8 +157,14 @@ interface RemoveSnapshots extends IcebergMetadataUpdate {

@Override
default void applyToTable(IcebergTableMetadataUpdateState state) {
throw new UnsupportedOperationException(
"Nessie Catalog does not allow external snapshot management");
state.addCatalogOp(CatalogOps.META_REMOVE_SNAPSHOTS);
var ids = new HashSet<>(snapshotIds());
state
.builder()
.previousIcebergSnapshotIds(
state.snapshot().previousIcebergSnapshotIds().stream()
.filter(id -> !ids.contains(id))
.collect(Collectors.toList()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBare;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataBareWithSchema;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataSimple;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataThreeSnapshots;
import static org.projectnessie.catalog.formats.iceberg.fixtures.IcebergFixtures.tableMetadataWithStatistics;
import static org.projectnessie.catalog.formats.iceberg.meta.IcebergNestedField.nestedField;
import static org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionField.partitionField;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshot;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSnapshotLogEntry;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortField;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergSortOrder;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata;
Expand Down Expand Up @@ -243,24 +245,50 @@ public void icebergTableMetadata(IcebergTableMetadata icebergTableMetadata) thro
NessieTableSnapshot nessie =
NessieModelIceberg.icebergTableSnapshotToNessie(
snapshotId, null, table, icebergTableMetadata, IcebergSnapshot::manifestList);

soft.assertThat(nessie.previousIcebergSnapshotIds())
.hasSize(Math.max(icebergTableMetadata.snapshotLog().size() - 1, 0))
.containsExactlyElementsOf(
icebergTableMetadata.snapshotLog().stream()
.map(IcebergSnapshotLogEntry::snapshotId)
.filter(id -> id != icebergTableMetadata.currentSnapshotId())
.collect(Collectors.toList()));

soft.assertThat(icebergJsonSerializeDeserialize(nessie, NessieTableSnapshot.class))
.isEqualTo(nessie);

IcebergTableMetadata iceberg =
NessieModelIceberg.nessieTableSnapshotToIceberg(nessie, Optional.empty(), properties -> {});
NessieModelIceberg.nessieTableSnapshotToIceberg(
nessie, List.of(), Optional.empty(), properties -> {});
IcebergTableMetadata icebergWithCatalogProps =
IcebergTableMetadata.builder()
.from(icebergTableMetadata)
.putAllProperties(
iceberg.properties().entrySet().stream()
.filter(e -> e.getKey().startsWith("nessie."))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.snapshots(
iceberg.snapshots().stream()
.filter(s -> s.snapshotId() == iceberg.currentSnapshotId())
.collect(Collectors.toList()))
.snapshotLog(
iceberg.snapshotLog().stream()
.filter(s -> s.snapshotId() == iceberg.currentSnapshotId())
.collect(Collectors.toList()))
.schema(
icebergTableMetadata.formatVersion() > 1
? null
: iceberg.schemas().isEmpty() ? null : iceberg.schemas().get(0))
.build();
soft.assertThat(iceberg).isEqualTo(icebergWithCatalogProps);
IcebergTableMetadata icebergCurrentSnapshotOnly =
IcebergTableMetadata.builder()
.from(iceberg)
.snapshots(
iceberg.snapshots().stream()
.filter(s -> s.snapshotId() == iceberg.currentSnapshotId())
.collect(Collectors.toList()))
.build();
soft.assertThat(icebergCurrentSnapshotOnly).isEqualTo(icebergWithCatalogProps);

NessieTableSnapshot nessieAgain =
NessieModelIceberg.icebergTableSnapshotToNessie(
Expand All @@ -278,7 +306,9 @@ static Stream<IcebergTableMetadata> icebergTableMetadata() {
// snapshot
tableMetadataSimple(),
// statistics
tableMetadataWithStatistics())
tableMetadataWithStatistics(),
// 3 snapshots
tableMetadataThreeSnapshots())
.flatMap(
builder ->
Stream.of(
Expand Down Expand Up @@ -513,7 +543,8 @@ public void icebergNested(IcebergSchema schema, IcebergSchema expected, int expe
.isEqualTo(expectedLastColumnId);

IcebergTableMetadata icebergMetadata =
NessieModelIceberg.nessieTableSnapshotToIceberg(snapshot, Optional.empty(), m -> {});
NessieModelIceberg.nessieTableSnapshotToIceberg(
snapshot, List.of(), Optional.empty(), m -> {});
soft.assertThat(icebergMetadata)
.extracting(IcebergTableMetadata::lastColumnId)
.isEqualTo(expectedLastColumnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ default Optional<NessieSortDefinition> sortDefinitionByIcebergId(int orderId) {
@jakarta.annotation.Nullable
Long icebergSnapshotId();

/**
* List of <em>previous</em> snapshot IDs, in the same order as Iceberg's {@code
* TableMetadata.snapshotLog}, which is oldest first, but without the current snapshot ID.
*/
@JsonInclude(JsonInclude.Include.NON_EMPTY)
List<Long> previousIcebergSnapshotIds();

@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@jakarta.annotation.Nullable
Expand Down Expand Up @@ -269,6 +276,18 @@ interface Builder extends NessieEntitySnapshot.Builder<Builder> {
@CanIgnoreReturnValue
Builder icebergSnapshotId(@Nullable Long icebergSnapshotId);

@CanIgnoreReturnValue
Builder addPreviousIcebergSnapshotId(long element);

@CanIgnoreReturnValue
Builder addPreviousIcebergSnapshotIds(long... elements);

@CanIgnoreReturnValue
Builder previousIcebergSnapshotIds(Iterable<Long> elements);

@CanIgnoreReturnValue
Builder addAllPreviousIcebergSnapshotIds(Iterable<Long> elements);

@CanIgnoreReturnValue
Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ public enum SnapshotFormat {
* The Nessie Catalog main native format includes the entity snapshot information with schemas,
* partition definitions and sort definitions.
*/
NESSIE_SNAPSHOT,
NESSIE_SNAPSHOT(false),
/** Iceberg table metadata. */
ICEBERG_TABLE_METADATA,
ICEBERG_TABLE_METADATA(true),
;

private final boolean includeOldSnapshots;

SnapshotFormat(boolean includeOldSnapshots) {
this.includeOldSnapshots = includeOldSnapshots;
}

public boolean includeOldSnapshots() {
return includeOldSnapshots;
}
}
Loading

0 comments on commit 614f88c

Please sign in to comment.