Skip to content

Commit

Permalink
Clean up blobsSidecar pruner workflow (#9029)
Browse files Browse the repository at this point in the history
* remove unnecessary updater being passed around

Signed-off-by: Gabriel Fukushima <[email protected]>

---------

Signed-off-by: Gabriel Fukushima <[email protected]>
  • Loading branch information
gfukushima authored Jan 29, 2025
1 parent 4d0b4fe commit 8533316
Showing 1 changed file with 42 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -928,9 +928,8 @@ public boolean pruneOldestBlobSidecars(
final int pruneLimit,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter) {
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys =
streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune);
final FinalizedUpdater updater = finalizedUpdater()) {
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, archiveWriter, false);
streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune)) {
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, archiveWriter, false);
}
}

Expand All @@ -940,17 +939,14 @@ public boolean pruneOldestNonCanonicalBlobSidecars(
final int pruneLimit,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter) {
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableNoncanonicalBlobKeys =
streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune);
final FinalizedUpdater updater = finalizedUpdater()) {
return pruneBlobSidecars(
pruneLimit, prunableNoncanonicalBlobKeys, updater, archiveWriter, true);
streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune)) {
return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, archiveWriter, true);
}
}

private boolean pruneBlobSidecars(
final int pruneLimit,
final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys,
final FinalizedUpdater updater,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter,
final boolean nonCanonicalBlobSidecars) {

Expand All @@ -964,49 +960,51 @@ private boolean pruneBlobSidecars(

// pruneLimit is the number of slots to prune, not the number of BlobSidecars
final List<UInt64> slots = prunableMap.keySet().stream().sorted().limit(pruneLimit).toList();
for (final UInt64 slot : slots) {
final List<SlotAndBlockRootAndBlobIndex> keys = prunableMap.get(slot);
try (final FinalizedUpdater updater = finalizedUpdater()) {
for (final UInt64 slot : slots) {
final List<SlotAndBlockRootAndBlobIndex> keys = prunableMap.get(slot);

// Retrieve the BlobSidecars for archiving.
final List<BlobSidecar> blobSidecars =
keys.stream()
.map(
nonCanonicalBlobSidecars
? this::getNonCanonicalBlobSidecar
: this::getBlobSidecar)
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

// Just warn if we failed to find all the BlobSidecars.
if (keys.size() != blobSidecars.size()) {
LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys);
}

// Retrieve the BlobSidecars for archiving.
final List<BlobSidecar> blobSidecars =
keys.stream()
.map(
nonCanonicalBlobSidecars
? this::getNonCanonicalBlobSidecar
: this::getBlobSidecar)
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
// Attempt to archive the BlobSidecars.
final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars);
if (!blobSidecarArchived) {
LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning");
break;
}

// Just warn if we failed to find all the BlobSidecars.
if (keys.size() != blobSidecars.size()) {
LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys);
}
// Remove the BlobSidecars from the database.
for (final SlotAndBlockRootAndBlobIndex key : keys) {
if (nonCanonicalBlobSidecars) {
updater.removeNonCanonicalBlobSidecar(key);
} else {
updater.removeBlobSidecar(key);
earliestBlobSidecarSlot = Optional.of(slot.plus(1));
}
}

// Attempt to archive the BlobSidecars.
final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars);
if (!blobSidecarArchived) {
LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning");
break;
++pruned;
}

// Remove the BlobSidecars from the database.
for (final SlotAndBlockRootAndBlobIndex key : keys) {
if (nonCanonicalBlobSidecars) {
updater.removeNonCanonicalBlobSidecar(key);
} else {
updater.removeBlobSidecar(key);
earliestBlobSidecarSlot = Optional.of(slot.plus(1));
}
if (!nonCanonicalBlobSidecars) {
earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot);
}

++pruned;
}

if (!nonCanonicalBlobSidecars) {
earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot);
updater.commit();
}
updater.commit();

// `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar in a slot
return pruned >= pruneLimit;
Expand Down

0 comments on commit 8533316

Please sign in to comment.