Skip to content

Commit

Permalink
memoize "implicit" snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy committed Dec 4, 2024
1 parent c3d26a1 commit 134a13c
Show file tree
Hide file tree
Showing 18 changed files with 325 additions and 333 deletions.
5 changes: 0 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ as necessary. Empty sections will not end in the release notes.

### Changes

- Nessie now reports a "bad request" for Iceberg REST register-table for table metadata with more than
one snapshot. This is a safeguard to prevent running into snapshot validation errors when using Iceberg.
While older Nessie versions accepted registrations of table metadata with more than one snapshot, it
was not particularly safe.

### Deprecations

### Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void close() throws IOException {
.defaultSpecId(schemaGenerator.getIcebergPartitionSpec().specId())
.defaultSortOrderId(IcebergSortOrder.UNSORTED_ORDER.orderId())
.lastSequenceNumber(1L)
.snapshots(singletonList(snapshotWithManifestList))
.addSnapshot(snapshotWithManifestList)
.build();
metadataConsumer.accept(icebergMetadataWithManifestList);
return writer.write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkState;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonView;
Expand All @@ -30,6 +31,8 @@
import javax.annotation.Nullable;
import org.immutables.value.Value;
import org.projectnessie.catalog.formats.iceberg.IcebergSpec;
import org.projectnessie.catalog.model.snapshot.ImmutableImplicitIcebergSnapshot;
import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot;
import org.projectnessie.nessie.immutables.NessieImmutable;

@NessieImmutable
Expand Down Expand Up @@ -62,6 +65,31 @@ static IcebergSnapshot snapshot(
schemaId);
}

/**
 * Converts a memoized {@link ImplicitIcebergSnapshot} back into an {@link IcebergSnapshot}.
 *
 * <p>Every attribute is copied from {@code implicit}; only the parent snapshot ID comes from the
 * separate argument, because {@link ImplicitIcebergSnapshot} does not carry one.
 *
 * @param implicit source of all attributes except the parent snapshot ID
 * @param parentSnapshotId parent snapshot ID to set on the result, passed through unchanged (may
 *     be {@code null})
 * @return the snapshot created via {@code ImmutableIcebergSnapshot.of(...)}
 */
static IcebergSnapshot fromImplicitIcebergSnapshot(
    ImplicitIcebergSnapshot implicit, Long parentSnapshotId) {
  // Name each attribute first so the positional factory call below is easy to audit.
  var sequenceNumber = implicit.sequenceNumber();
  var snapshotId = implicit.snapshotId();
  var timestampMs = implicit.timestampMs();
  var summary = implicit.summary();
  var manifests = implicit.manifests();
  var manifestList = implicit.manifestList();
  var schemaId = implicit.schemaId();

  return ImmutableIcebergSnapshot.of(
      sequenceNumber,
      snapshotId,
      parentSnapshotId,
      timestampMs,
      summary,
      manifests,
      manifestList,
      schemaId);
}

/**
 * Returns the attributes of this snapshot as an {@link ImplicitIcebergSnapshot}.
 *
 * <p>The parent snapshot ID is not part of the result. The method is annotated with {@link
 * JsonIgnore} so it is not treated as a JSON property.
 *
 * @return an {@link ImplicitIcebergSnapshot} carrying this snapshot's ID, sequence number,
 *     timestamp, summary, manifests, manifest list and schema ID
 */
@JsonIgnore
default ImplicitIcebergSnapshot asImplicitIcebergSnapshot() {
  return ImmutableImplicitIcebergSnapshot.builder()
      .snapshotId(snapshotId())
      .sequenceNumber(sequenceNumber())
      .timestampMs(timestampMs())
      .summary(summary())
      .manifests(manifests())
      .manifestList(manifestList())
      .schemaId(schemaId())
      .build();
}

@Nullable
@jakarta.annotation.Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static java.util.Collections.emptyMap;

import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -56,7 +55,7 @@ public class IcebergTableMetadataUpdateState {
private int lastAddedSchemaId = -1;
private int lastAddedSpecId = -1;
private int lastAddedOrderId = -1;
private final List<IcebergSnapshot> addedSnapshots = new ArrayList<>();
private IcebergSnapshot previouslyAddedSnapshot = null;
private final Set<Integer> addedSchemaIds = new HashSet<>();
private final Set<Integer> addedSpecIds = new HashSet<>();
private final Set<Integer> addedOrderIds = new HashSet<>();
Expand Down Expand Up @@ -86,12 +85,12 @@ public NessieTableSnapshot snapshot() {
return snapshot;
}

public List<IcebergSnapshot> addedSnapshots() {
return addedSnapshots;
public IcebergSnapshot previouslyAddedSnapshot() {
return previouslyAddedSnapshot;
}

public void snapshotAdded(IcebergSnapshot snapshot) {
addedSnapshots.add(snapshot);
previouslyAddedSnapshot = snapshot;
}

public int lastAddedSchemaId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import org.projectnessie.catalog.model.schema.types.NessieTimestampTypeSpec;
import org.projectnessie.catalog.model.schema.types.NessieType;
import org.projectnessie.catalog.model.schema.types.NessieTypeSpec;
import org.projectnessie.catalog.model.snapshot.ImmutableImplicitIcebergSnapshot;
import org.projectnessie.catalog.model.snapshot.ImplicitIcebergSnapshot;
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.model.snapshot.NessieViewRepresentation;
Expand Down Expand Up @@ -512,7 +514,7 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie(
Function<IcebergSnapshot, String> manifestListLocation) {
NessieTableSnapshot.Builder snapshot = NessieTableSnapshot.builder().id(snapshotId);
if (previous != null) {
snapshot.from(previous);
snapshot.from(previous).implicitIcebergSnapshots(Map.of());

String previousLocation = previous.icebergLocation();
if (previousLocation != null && !previousLocation.equals(iceberg.location())) {
Expand Down Expand Up @@ -623,10 +625,6 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie(
.ifPresent(
currentSnapshot -> {
snapshot.icebergSnapshotId(currentSnapshot.snapshotId());
currentSnapshot
.parentSnapshotId(); // TODO Can we leave this unset, as we do not return previous
// Iceberg
// snapshots??
Integer schemaId = currentSnapshot.schemaId();
if (schemaId != null) {
// TODO this overwrites the "current schema ID" with the schema ID of the current
Expand Down Expand Up @@ -654,10 +652,23 @@ public static NessieTableSnapshot icebergTableSnapshotToNessie(
currentSnapshot.manifests(); // TODO
});

iceberg.snapshotLog().stream()
.map(IcebergSnapshotLogEntry::snapshotId)
.filter(snapId -> !snapId.equals(iceberg.currentSnapshotId()))
.forEach(snapshot::addPreviousIcebergSnapshotId);
// Record every snapshot-log entry except the current snapshot as a "previous" Iceberg snapshot.
// When no previous Nessie snapshot was supplied, also memoize the full snapshot object, which
// must then be present in the table-metadata.
for (var logEntry : iceberg.snapshotLog()) {
  var loggedId = logEntry.snapshotId();
  if (loggedId != iceberg.currentSnapshotId()) {
    if (previous == null) {
      var memoized =
          iceberg
              .snapshotById(loggedId)
              .orElseThrow(
                  () ->
                      new IllegalArgumentException(
                          "Invalid Iceberg table-metadata: snapshot-log references snapshot with ID "
                              + loggedId
                              + ", which is not present as a snapshot object"))
              .asImplicitIcebergSnapshot();
      snapshot.putImplicitIcebergSnapshot(loggedId, memoized);
    }
    snapshot.addPreviousIcebergSnapshotId(loggedId);
  }
}

for (IcebergStatisticsFile statisticsFile : iceberg.statistics()) {
if (statisticsFile.snapshotId() == iceberg.currentSnapshotId()) {
Expand Down Expand Up @@ -942,7 +953,7 @@ public static IcebergViewMetadata nessieViewSnapshotToIceberg(

public static IcebergTableMetadata nessieTableSnapshotToIceberg(
NessieTableSnapshot nessie,
List<NessieEntitySnapshot<?>> history,
List<ImplicitIcebergSnapshot> history,
Optional<IcebergSpec> requestedSpecVersion,
Consumer<Map<String, String>> tablePropertiesTweak) {
NessieTable entity = nessie.entity();
Expand Down Expand Up @@ -1031,6 +1042,19 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg(
? nessie.icebergManifestFileLocations()
: emptyList();

// Replay the memoized history in the given order: each entry becomes an Iceberg snapshot whose
// parent is the entry before it; the first entry has no parent.
Long parentSnapshotId = null;

for (ImplicitIcebergSnapshot ancestor : history) {
  IcebergSnapshot converted =
      IcebergSnapshot.fromImplicitIcebergSnapshot(ancestor, parentSnapshotId);
  metadata.addSnapshot(converted);
  metadata.addSnapshotLog(
      IcebergSnapshotLogEntry.snapshotLogEntry(ancestor.timestampMs(), ancestor.snapshotId()));
  // TODO we don't include the metadata location yet - we could potentially do that later
  //  via metadata.addMetadataLog(...)

  // The snapshot just added is the parent of the next one (and of the current snapshot).
  parentSnapshotId = converted.snapshotId();
}

IcebergSnapshot.Builder snapshot =
IcebergSnapshot.builder()
.snapshotId(snapshotId)
Expand All @@ -1039,25 +1063,13 @@ public static IcebergTableMetadata nessieTableSnapshotToIceberg(
.manifestList(manifestListLocation)
.summary(nessie.icebergSnapshotSummary())
.timestampMs(timestampMs)
.sequenceNumber(nessie.icebergSnapshotSequenceNumber());
.sequenceNumber(nessie.icebergSnapshotSequenceNumber())
.parentSnapshotId(parentSnapshotId);
metadata.addSnapshots(snapshot.build());

metadata.putRef(
"main", IcebergSnapshotRef.builder().snapshotId(snapshotId).type("branch").build());

for (NessieEntitySnapshot<?> previous : history) {
var previousTmd =
nessieTableSnapshotToIceberg(
(NessieTableSnapshot) previous, List.of(), requestedSpecVersion, m -> {});
var previousSnap = previousTmd.currentSnapshot().orElseThrow();
metadata.addSnapshot(previousSnap);
metadata.addSnapshotLog(
IcebergSnapshotLogEntry.snapshotLogEntry(
previousSnap.timestampMs(), previousSnap.snapshotId()));
// TODO we don't include the metadata location yet - we could potentially do that later
// metadata.addMetadataLog(IcebergHistoryEntry.historyEntry(previousSnap.timestampMs(), ));
}

metadata.addSnapshotLog(
IcebergSnapshotLogEntry.builder()
.snapshotId(snapshotId)
Expand Down Expand Up @@ -1614,6 +1626,23 @@ public static void addSnapshot(AddSnapshot u, IcebergTableMetadataUpdateState st
.icebergManifestListLocation(icebergSnapshot.manifestList())
.icebergManifestFileLocations(icebergSnapshot.manifests());

// If another Iceberg snapshot was added earlier in this same update-transaction, memoize that
// previous snapshot so we can return the full snapshot history.
var previouslyAddedSnapshot = state.previouslyAddedSnapshot();
if (previouslyAddedSnapshot != null) {
  // Use the shared IcebergSnapshot -> ImplicitIcebergSnapshot conversion instead of copying each
  // attribute by hand, so both memoization paths stay in sync when attributes change.
  snapshotBuilder.putImplicitIcebergSnapshot(
      previouslyAddedSnapshot.snapshotId(), previouslyAddedSnapshot.asImplicitIcebergSnapshot());
}

state.snapshotAdded(icebergSnapshot);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.model.snapshot;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.projectnessie.catalog.model.schema.NessieSchema;
import org.projectnessie.nessie.immutables.NessieImmutable;

/**
 * Attributes of a single Iceberg snapshot, without a parent snapshot ID.
 *
 * <p>NOTE(review): the positional {@code ImmutableImplicitIcebergSnapshot.of(...)} factory
 * presumably follows the declaration order of the accessors below - keep that order stable.
 */
@NessieImmutable
@JsonSerialize(as = ImmutableImplicitIcebergSnapshot.class)
@JsonDeserialize(as = ImmutableImplicitIcebergSnapshot.class)
public interface ImplicitIcebergSnapshot {
  /**
   * Builds an {@link ImplicitIcebergSnapshot} from the Iceberg related attributes of a Nessie
   * table snapshot.
   *
   * <p>The schema ID is the Iceberg ID of the table snapshot's current schema object, or {@link
   * NessieSchema#NO_SCHEMA_ID} if there is none. The timestamp is the snapshot-created timestamp
   * in epoch milliseconds.
   *
   * @param tableSnapshot Nessie table snapshot to read the attributes from
   * @return the resulting {@link ImplicitIcebergSnapshot}
   */
  static ImplicitIcebergSnapshot implicitIcebergSnapshot(NessieTableSnapshot tableSnapshot) {
    var schemaId =
        tableSnapshot
            .currentSchemaObject()
            .map(NessieSchema::icebergId)
            .orElse(NessieSchema.NO_SCHEMA_ID);
    var createdAtMillis = tableSnapshot.snapshotCreatedTimestamp().toEpochMilli();

    return ImmutableImplicitIcebergSnapshot.of(
        tableSnapshot.icebergSnapshotId(),
        tableSnapshot.icebergSnapshotSequenceNumber(),
        createdAtMillis,
        tableSnapshot.icebergSnapshotSummary(),
        tableSnapshot.icebergManifestFileLocations(),
        tableSnapshot.icebergManifestListLocation(),
        schemaId);
  }

  /** Iceberg snapshot ID. */
  long snapshotId();

  /** Iceberg snapshot sequence number, omitted from JSON when {@code null}. */
  @Nullable
  @jakarta.annotation.Nullable
  @JsonInclude(JsonInclude.Include.NON_NULL)
  Long sequenceNumber();

  /** Snapshot timestamp in epoch milliseconds. */
  long timestampMs();

  /** Iceberg snapshot summary. */
  Map<String, String> summary();

  /** Manifest file locations, omitted from JSON when empty. */
  @JsonInclude(JsonInclude.Include.NON_EMPTY)
  List<String> manifests();

  /** Manifest list location, omitted from JSON when {@code null}. */
  @JsonInclude(JsonInclude.Include.NON_NULL)
  @Nullable
  @jakarta.annotation.Nullable
  String manifestList();

  /** Iceberg schema ID, omitted from JSON when {@code null}. */
  @Nullable
  @jakarta.annotation.Nullable
  @JsonInclude(JsonInclude.Include.NON_NULL)
  Integer schemaId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ default Optional<NessieSortDefinition> sortDefinitionByIcebergId(int orderId) {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
List<Long> previousIcebergSnapshotIds();

/**
* Contains Iceberg snapshots for which no explicit Nessie commit exists. We need to memoize those
* snapshots here in case a single Iceberg transaction adds multiple snapshots.
*/
Map<Long, ImplicitIcebergSnapshot> implicitIcebergSnapshots();

@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@jakarta.annotation.Nullable
Expand Down Expand Up @@ -288,6 +294,18 @@ interface Builder extends NessieEntitySnapshot.Builder<Builder> {
@CanIgnoreReturnValue
Builder addAllPreviousIcebergSnapshotIds(Iterable<Long> elements);

@CanIgnoreReturnValue
Builder putImplicitIcebergSnapshot(long key, ImplicitIcebergSnapshot value);

@CanIgnoreReturnValue
Builder putImplicitIcebergSnapshot(Map.Entry<Long, ? extends ImplicitIcebergSnapshot> entry);

@CanIgnoreReturnValue
Builder implicitIcebergSnapshots(Map<Long, ? extends ImplicitIcebergSnapshot> entries);

@CanIgnoreReturnValue
Builder putAllImplicitIcebergSnapshots(Map<Long, ? extends ImplicitIcebergSnapshot> entries);

@CanIgnoreReturnValue
Builder icebergLastSequenceNumber(@Nullable Long icebergLastSequenceNumber);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.projectnessie.catalog.service.api;

import jakarta.annotation.Nullable;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,7 +25,6 @@
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.projectnessie.api.v2.params.ParsedReference;
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.service.config.WarehouseConfig;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieNotFoundException;
Expand Down Expand Up @@ -78,10 +76,7 @@ CompletionStage<Stream<SnapshotResponse>> commit(
ApiContext apiContext)
throws BaseNessieClientServerException;

interface CatalogUriResolver {
URI icebergSnapshot(
Reference effectiveReference, ContentKey key, NessieEntitySnapshot<?> snapshot);
}
boolean checkIcebergSnapshotPresent(String metadataLocation, long versionId);

Optional<String> validateStorageLocation(String location);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ default ObjType type() {
TaskObj.taskDefaultCacheExpire(),
c -> ObjType.NOT_CACHED);

/**
 * Returns the snapshot object ID for the given metadata location and version ID.
 *
 * <p>Thin wrapper that forwards both arguments unchanged to {@code snapshotIdForContent}.
 *
 * @param metadataLocation metadata location forwarded to {@code snapshotIdForContent}
 * @param versionId version ID forwarded to {@code snapshotIdForContent}
 * @return the ID returned by {@code snapshotIdForContent}
 */
static ObjId snapshotObjIdForContent(String metadataLocation, long versionId) {
  ObjId snapshotObjId = snapshotIdForContent(metadataLocation, versionId);
  return snapshotObjId;
}

static ObjId snapshotObjIdForContent(Content content) {
ObjId id = snapshotIdForContent(content);
if (id != null) {
Expand Down
Loading

0 comments on commit 134a13c

Please sign in to comment.