[LW] Rely on locally written values instead of the cache (#5871)
The lock watch cache no longer attempts to cache writes; instead, reads of locally written cells are served from the write maps already tracked within the transaction (with similar performance but more consistent behaviour around concurrent writes).
Jolyon-S authored Jan 25, 2022
1 parent af51963 commit 7906838
Showing 8 changed files with 124 additions and 55 deletions.
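The behavioural change can be summarised with a minimal sketch. This is an illustration only, using hypothetical stand-in types rather than the real AtlasDB classes: the store stops caching written values and merely records which cells were written, so reads of those cells fall through to the transaction's own write maps.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified model of the transaction-scoped value store; all names here are illustrative.
final class SketchTransactionValueStore {
    private final Map<String, byte[]> cachedReads = new HashMap<>(); // values read remotely this transaction
    private final Set<String> locallyWrittenCells = new HashSet<>(); // cells written this transaction

    // New behaviour: only remember that the cell was written; the written value stays in the
    // transaction's own write buffer. (Previously the written value itself was cached here.)
    void recordRemoteWrite(String cell) {
        locallyWrittenCells.add(cell);
        cachedReads.remove(cell); // a previously cached read must not shadow the local write
    }

    void cacheRemoteRead(String cell, byte[] value) {
        if (!locallyWrittenCells.contains(cell)) {
            cachedReads.putIfAbsent(cell, value);
        }
    }

    // Written cells are never answered from the cache; callers resolve them from the
    // transaction's write maps instead.
    Optional<byte[]> getCached(String cell) {
        if (locallyWrittenCells.contains(cell)) {
            return Optional.empty();
        }
        return Optional.ofNullable(cachedReads.get(cell));
    }
}

Under this model a delete is just another local write from the cache's point of view: the tombstone comes from the transaction's write buffer, which is what the changes to TransactionScopedCacheImpl and SnapshotTransaction below reflect.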
@@ -31,7 +31,7 @@
interface TransactionCacheValueStore {
boolean isWatched(TableReference tableReference);

void cacheRemoteWrite(TableReference tableReference, Cell cell, CacheValue value);
void recordRemoteWrite(TableReference tableReference, Cell cell);

void cacheRemoteReads(TableReference tableReference, Map<Cell, byte[]> remoteReadValues);

@@ -24,6 +24,8 @@
import com.palantir.atlasdb.keyvalue.api.cache.TransactionCacheValueStoreImpl.LocalCacheEntry.Status;
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.watch.CommitUpdate;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -49,9 +51,9 @@ public boolean isWatched(TableReference table) {
}

@Override
public void cacheRemoteWrite(TableReference table, Cell cell, CacheValue value) {
public void recordRemoteWrite(TableReference table, Cell cell) {
CellReference cellReference = CellReference.of(table, cell);
cacheRemoteWriteInternal(cellReference, value);
recordRemoteWriteInternal(cellReference);
}

@Override
@@ -69,8 +71,7 @@ public void cacheEmptyReads(TableReference table, Set<Cell> emptyCells) {
if (snapshot.isWatched(table)) {
emptyCells.stream()
.map(cell -> CellReference.of(table, cell))
.filter(snapshot::isUnlocked)
.forEach(cell -> localUpdates.put(cell, LocalCacheEntry.read(CacheValue.empty())));
.forEach(cell -> cacheRemoteReadInternal(cell, CacheValue.empty()));
}
}

@@ -82,10 +83,10 @@ public TransactionCacheValueStore createWithFilteredSnapshot(CommitUpdate commit
localUpdates.forEach((cell, cacheEntry) -> {
switch (cacheEntry.status()) {
case READ:
newStore.cacheRemoteReadInternal(cell, cacheEntry.value());
newStore.cacheRemoteReadInternal(cell, getReadValue(cacheEntry.value()));
break;
case WRITE:
newStore.cacheRemoteWriteInternal(cell, cacheEntry.value());
newStore.recordRemoteWriteInternal(cell);
break;
case HIT:
default:
@@ -98,18 +99,20 @@

@Override
public Map<Cell, CacheValue> getCachedValues(TableReference table, Set<Cell> cells) {
Map<Cell, CacheValue> locallyCachedValues = getLocallyCachedValues(table, cells);
Set<Cell> locallyWrittenCells = getLocallyWrittenCells(table, cells);
Set<Cell> cacheableCells = Sets.difference(cells, locallyWrittenCells);

Map<Cell, CacheValue> locallyCachedReads = getLocallyCachedReads(table, cacheableCells);

// Filter out which values have not been read yet
Set<Cell> remainingCells = Sets.difference(cells, locallyCachedValues.keySet());
Set<Cell> remainingCells = Sets.difference(cacheableCells, locallyCachedReads.keySet());

// Read values from the snapshot. For the hits, mark as hit in the local map.
Map<Cell, CacheValue> snapshotCachedValues = getSnapshotValues(table, remainingCells);
snapshotCachedValues.forEach(
(cell, value) -> localUpdates.put(CellReference.of(table, cell), LocalCacheEntry.hit(value)));
snapshotCachedValues.forEach((cell, value) -> cacheHitInternal(table, cell, value));

return ImmutableMap.<Cell, CacheValue>builder()
.putAll(locallyCachedValues)
.putAll(locallyCachedReads)
.putAll(snapshotCachedValues)
.build();
}
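The lookup order implemented above, restated as a compact sketch (illustration only; the types and helper names are stand-ins, not the commit's API): locally written cells are excluded from the cache entirely, local reads are served first, and only the remainder is looked up in the shared snapshot, where successful lookups are recorded as hits.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for getCachedValues: written cells are skipped, local reads are served,
// and the remainder falls back to the snapshot (and, in the real code, is recorded as a hit).
final class CachedValueLookupSketch {
    static Map<String, byte[]> lookUp(
            Set<String> requestedCells,
            Set<String> locallyWrittenCells,
            Map<String, byte[]> localReads,
            Map<String, byte[]> snapshotValues) {
        Set<String> cacheable = new HashSet<>(requestedCells);
        cacheable.removeAll(locallyWrittenCells); // step 1: never answer written cells from the cache

        Map<String, byte[]> result = new HashMap<>();
        for (String cell : cacheable) {
            byte[] local = localReads.get(cell); // step 2: values already read in this transaction
            if (local != null) {
                result.put(cell, local);
            }
        }
        for (String cell : cacheable) {
            if (!result.containsKey(cell)) { // step 3: consult the shared snapshot
                byte[] fromSnapshot = snapshotValues.get(cell);
                if (fromSnapshot != null) {
                    result.put(cell, fromSnapshot);
                }
            }
        }
        return result;
    }
}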
@@ -119,6 +122,7 @@ public Map<CellReference, CacheValue> getValueDigest() {
return KeyedStream.stream(localUpdates)
.filter(entry -> entry.status().equals(Status.READ))
.map(LocalCacheEntry::value)
.map(TransactionCacheValueStoreImpl::getReadValue)
.collectToMap();
}

@@ -130,23 +134,49 @@ public Set<CellReference> getHitDigest() {
.collect(Collectors.toSet());
}

private Map<Cell, CacheValue> getLocallyCachedValues(TableReference table, Set<Cell> cells) {
private void cacheHitInternal(TableReference table, Cell cell, CacheValue value) {
localUpdates.compute(CellReference.of(table, cell), (_unused, previousValue) -> {
if (previousValue != null) {
throw new SafeIllegalStateException(
"Should not be attempting to record hits for keys that have been written to or read from the"
+ " KVS",
UnsafeArg.of("table", table),
UnsafeArg.of("cell", cell),
UnsafeArg.of("previousValue", previousValue));
} else {
return LocalCacheEntry.hit(value);
}
});
}
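The compute call above fails fast if a hit is recorded for a cell that already has a local entry. A stripped-down illustration of that invariant (hypothetical names, and a plain IllegalStateException rather than the logsafe variant):

import java.util.HashMap;
import java.util.Map;

// Illustration only: a snapshot hit may only be recorded for a cell with no prior local entry.
final class HitRecordingSketch {
    private final Map<String, String> localUpdates = new HashMap<>();

    void recordHit(String cell, String value) {
        localUpdates.compute(cell, (unused, previous) -> {
            if (previous != null) {
                throw new IllegalStateException("Hit recorded for a cell already read or written: " + previous);
            }
            return "HIT(" + value + ")";
        });
    }
}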

private Map<Cell, CacheValue> getLocallyCachedReads(TableReference table, Set<Cell> cells) {
return KeyedStream.of(cells)
.map(cell -> localUpdates.get(CellReference.of(table, cell)))
.filter(Objects::nonNull)
.filter(value -> !value.status().equals(Status.WRITE))
.map(LocalCacheEntry::value)
.map(TransactionCacheValueStoreImpl::getReadValue)
.collectToMap();
}

private void cacheRemoteWriteInternal(CellReference cellReference, CacheValue value) {
if (snapshot.isWatched(cellReference.tableRef()) && snapshot.isUnlocked(cellReference)) {
localUpdates.put(cellReference, LocalCacheEntry.write(value));
private Set<Cell> getLocallyWrittenCells(TableReference table, Set<Cell> cells) {
return KeyedStream.of(cells)
.map(cell -> localUpdates.get(CellReference.of(table, cell)))
.filter(Objects::nonNull)
.filter(value -> value.status().equals(Status.WRITE))
.keys()
.collect(Collectors.toSet());
}

private void recordRemoteWriteInternal(CellReference cellReference) {
if (snapshot.isUnlocked(cellReference)) {
localUpdates.put(cellReference, LocalCacheEntry.write());
}
}

private void cacheRemoteReadInternal(CellReference cell, CacheValue value) {
if (snapshot.isUnlocked(cell)) {
localUpdates.put(cell, LocalCacheEntry.read(value));
localUpdates.putIfAbsent(cell, LocalCacheEntry.read(value));
}
}
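The asymmetry between the two methods above is deliberate: recordRemoteWriteInternal uses put, so a write always claims the entry, while cacheRemoteReadInternal uses putIfAbsent, so a later read can never replace it. A self-contained illustration of that interaction on a plain map:

import java.util.HashMap;
import java.util.Map;

// Illustration only: put vs putIfAbsent is what gives local writes precedence over later reads.
final class WriteWinsSketch {
    public static void main(String[] args) {
        Map<String, String> localUpdates = new HashMap<>();

        localUpdates.put("cell", "WRITE");            // as in recordRemoteWriteInternal
        localUpdates.putIfAbsent("cell", "READ(v1)"); // as in cacheRemoteReadInternal

        // The WRITE entry survives: a remote read of a cell this transaction already wrote
        // must not resurrect a potentially stale value.
        System.out.println(localUpdates.get("cell")); // prints WRITE
    }
}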

@@ -160,11 +190,16 @@ private Map<Cell, CacheValue> getSnapshotValues(TableReference table, Set<Cell>
.collectToMap();
}

private static CacheValue getReadValue(Optional<CacheValue> cacheValue) {
return cacheValue.orElseThrow(() -> new SafeIllegalStateException("Reads must have a cache value present"));
}

@Value.Immutable
public interface LocalCacheEntry {
interface LocalCacheEntry {

Status status();

CacheValue value();
Optional<CacheValue> value();

static LocalCacheEntry read(CacheValue value) {
return ImmutableLocalCacheEntry.builder()
@@ -173,11 +208,8 @@ static LocalCacheEntry read(CacheValue value) {
.build();
}

static LocalCacheEntry write(CacheValue value) {
return ImmutableLocalCacheEntry.builder()
.status(Status.WRITE)
.value(value)
.build();
static LocalCacheEntry write() {
return ImmutableLocalCacheEntry.builder().status(Status.WRITE).build();
}

static LocalCacheEntry hit(CacheValue value) {
@@ -63,15 +63,13 @@ static TransactionScopedCache create(ValueCacheSnapshot snapshot, CacheMetrics m
@Override
public synchronized void write(TableReference tableReference, Map<Cell, byte[]> values) {
ensureNotFinalised();
KeyedStream.stream(values)
.map(CacheValue::of)
.forEach((cell, value) -> valueStore.cacheRemoteWrite(tableReference, cell, value));
values.keySet().forEach(cell -> valueStore.recordRemoteWrite(tableReference, cell));
}

@Override
public synchronized void delete(TableReference tableReference, Set<Cell> cells) {
ensureNotFinalised();
cells.forEach(cell -> valueStore.cacheRemoteWrite(tableReference, cell, CacheValue.empty()));
cells.forEach(cell -> valueStore.recordRemoteWrite(tableReference, cell));
}

@Override
@@ -1619,15 +1619,15 @@ protected boolean shouldDeleteAndRollback() {

@Override
public final void delete(TableReference tableRef, Set<Cell> cells) {
putInternal(tableRef, Cells.constantValueMap(cells, PtBytes.EMPTY_BYTE_ARRAY));
getCache().delete(tableRef, cells);
putInternal(tableRef, Cells.constantValueMap(cells, PtBytes.EMPTY_BYTE_ARRAY));
}

@Override
public void put(TableReference tableRef, Map<Cell, byte[]> values) {
ensureNoEmptyValues(values);
putInternal(tableRef, values);
getCache().write(tableRef, values);
putInternal(tableRef, values);
}

public void putInternal(TableReference tableRef, Map<Cell, byte[]> values) {
@@ -186,28 +186,35 @@ public void readOnlyTransactionCacheFiltersOutNewlyLockedValues() {
processStartTransactionsUpdate(LOCK_WATCH_SNAPSHOT, TIMESTAMP_1);

TransactionScopedCache scopedCache1 = valueCache.getTransactionScopedCache(TIMESTAMP_1);
scopedCache1.write(TABLE, ImmutableMap.of(CELL_2, VALUE_2.value().get()));
assertThatRemotelyReadCells(scopedCache1, TABLE, CELL_1, CELL_2).containsExactlyInAnyOrder(CELL_1);
verify(metrics, times(1)).registerHits(1);
verify(metrics, times(1)).registerMisses(1);
// The cache is initially empty, and thus both reads go to the remote store
assertThatRemotelyReadCells(scopedCache1, TABLE, CELL_1, CELL_2).containsExactlyInAnyOrder(CELL_1, CELL_2);
verify(metrics, times(1)).registerHits(0);
verify(metrics, times(1)).registerMisses(2);

// This update has a lock taken out for CELL_1: this means that all reads for it must be remote.
// This update has a lock taken out for CELL_1, and so all reads must be remote for it
eventCache.processStartTransactionsUpdate(ImmutableSet.of(TIMESTAMP_2), LOCK_WATCH_LOCK_SUCCESS);
valueCache.processStartTransactions(ImmutableSet.of(TIMESTAMP_2));
processEventCacheCommit(TIMESTAMP_1, 1L);
valueCache.updateCacheWithCommitTimestampsInformation(ImmutableSet.of(TIMESTAMP_1));

// The difference between the read only cache and the new scoped cache, despite being at the same sequence,
// is that the read-only cache contains all the locally cached values, including writes, whereas the fresh
// cache only contains those published values from the first cache - and since one was a write, and the other
// had a lock taken out during the transaction, none of the values were actually pushed centrally.
// The read-only cache cannot cache CELL_1 due to it being locked, but can cache CELL_2 as it was read locally.
// The new scoped cache cannot cache CELL_1 either, but also cannot initially read CELL_2 from the cache as it
// has not yet been flushed to the central one.
TransactionScopedCache readOnlyCache = valueCache.getReadOnlyTransactionScopedCacheForCommit(TIMESTAMP_1);
assertThatRemotelyReadCells(readOnlyCache, TABLE, CELL_1, CELL_2).containsExactlyInAnyOrder(CELL_1);
verify(metrics, times(2)).registerHits(1);
verify(metrics, times(2)).registerMisses(1);
verify(metrics, times(1)).registerHits(1);
verify(metrics, times(1)).registerMisses(1);

// While this commit does update the central cache, the second cache has already been created before this point
// and thus does not receive the update
valueCache.onSuccessfulCommit(TIMESTAMP_1);

TransactionScopedCache scopedCache2 = valueCache.getTransactionScopedCache(TIMESTAMP_2);
assertThatRemotelyReadCells(scopedCache2, TABLE, CELL_1, CELL_2).containsExactlyInAnyOrder(CELL_1, CELL_2);

// This confirms that CELL_1 remains uncacheable for this cache due to there having been a lock taken out when
// this cache was created
assertThatRemotelyReadCells(scopedCache2, TABLE, CELL_1, CELL_2).containsExactlyInAnyOrder(CELL_1);
}
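The test above relies on the rule that a cell whose lock was taken out during the transaction is never cacheable, so reads for it always go to the remote store. A tiny sketch of that gate (illustrative names only; in the real code this is the snapshot's isUnlocked check):

import java.util.HashSet;
import java.util.Set;

// Illustration only: cells with locks taken out are excluded from caching entirely.
final class LockGateSketch {
    private final Set<String> lockedCells = new HashSet<>();

    void lockTakenOut(String cell) {
        lockedCells.add(cell);
    }

    boolean isCacheable(String cell) {
        return !lockedCells.contains(cell); // mirrors snapshot.isUnlocked(cellReference)
    }
}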

@Test
@@ -251,7 +258,7 @@ public void lockUpdatesPreventCachingAndUnlockUpdatesAllowItAgain() {
assertThat(scopedCache3.getValueDigest().loadedValues())
.containsExactlyInAnyOrderEntriesOf(ImmutableMap.of(CellReference.of(TABLE, CELL_1), VALUE_1));
assertThat(scopedCache3.getHitDigest().hitCells())
.containsExactly(CellReference.of(TABLE, CELL_2), CellReference.of(TABLE, CELL_3));
.containsExactlyInAnyOrder(CellReference.of(TABLE, CELL_2), CellReference.of(TABLE, CELL_3));
}

@Test
@@ -57,13 +57,39 @@ public void localReadsAreStoredAndRead() {
}

@Test
public void localWritesAreStoredAndReadInsteadOfSnapshotReads() {
public void localWritesOverrideSnapshotReads() {
TransactionCacheValueStore valueStore = cacheWithSingleValue();
assertCacheContainsValue(valueStore, VALUE_1);

valueStore.cacheRemoteWrite(TABLE, CELL, VALUE_2);
valueStore.recordRemoteWrite(TABLE, CELL);

assertCacheContainsValue(valueStore, VALUE_2);
assertCacheIsEmpty(valueStore);
assertDigestContainsEntries(valueStore, ImmutableMap.of());
}

@Test
public void localWritesOverrideLocalReads() {
TransactionCacheValueStore valueStore = emptyCache();
assertCacheIsEmpty(valueStore);

valueStore.cacheRemoteReads(TABLE, ImmutableMap.of(CELL, VALUE_1.value().get()));
assertCacheContainsValue(valueStore, VALUE_1);

valueStore.recordRemoteWrite(TABLE, CELL);
assertCacheIsEmpty(valueStore);
assertDigestContainsEntries(valueStore, ImmutableMap.of());
}

@Test
public void localWritesAreNotOverwrittenByReads() {
TransactionCacheValueStore valueStore = cacheWithSingleValue();
assertCacheContainsValue(valueStore, VALUE_1);

valueStore.recordRemoteWrite(TABLE, CELL);
assertCacheIsEmpty(valueStore);

valueStore.cacheRemoteReads(TABLE, ImmutableMap.of(CELL, VALUE_1.value().get()));
assertCacheIsEmpty(valueStore);
assertDigestContainsEntries(valueStore, ImmutableMap.of());
}

@@ -83,7 +109,7 @@ public void valuesNotCachedForUnwatchedTables() {
TransactionCacheValueStore valueStore = new TransactionCacheValueStoreImpl(
ValueCacheSnapshotImpl.of(HashMap.empty(), HashSet.empty(), ImmutableSet.of()));

valueStore.cacheRemoteWrite(TABLE, CELL, VALUE_1);
valueStore.recordRemoteWrite(TABLE, CELL);
assertCacheIsEmpty(valueStore);

valueStore.cacheEmptyReads(TABLE, ImmutableSet.of(CELL));
@@ -94,15 +120,15 @@
}

@Test
public void createWithFilteredUpdateTransfersWrites() {
public void createWithFilteredUpdateTransfersWrittenCells() {
TransactionCacheValueStore valueStore = cacheWithSingleValue();

valueStore.cacheRemoteWrite(TABLE, CELL, VALUE_1);
assertCacheContainsValue(valueStore, VALUE_1);
valueStore.recordRemoteWrite(TABLE, CELL);
assertCacheIsEmpty(valueStore);

TransactionCacheValueStore filteredValueStore =
valueStore.createWithFilteredSnapshot(CommitUpdate.invalidateSome(ImmutableSet.of()));
assertCacheContainsValue(filteredValueStore, VALUE_1);
assertCacheIsEmpty(filteredValueStore);
}

@Test
@@ -100,17 +100,16 @@ public void getReadsCachedValuesBeforeReadingFromDb() {
cache.write(TABLE, ImmutableMap.of(CELL_3, VALUE_3.value().get()));

assertThat(getRemotelyReadCells(cache, TABLE, CELL_1, CELL_2, CELL_3, CELL_4, CELL_5))
.containsExactlyInAnyOrder(CELL_4, CELL_5);
.containsExactlyInAnyOrder(CELL_3, CELL_4, CELL_5);
}

@Test
public void deletesCacheResultLocally() {
public void deletesDoNotCacheResultLocally() {
TransactionScopedCache cache = TransactionScopedCacheImpl.create(snapshotWithSingleValue(), metrics);

cache.delete(TABLE, ImmutableSet.of(CELL_1));
assertThat(getRemotelyReadCells(cache, TABLE, CELL_1, CELL_2)).containsExactly(CELL_2);
assertThat(cache.get(TABLE, ImmutableSet.of(CELL_1), TransactionScopedCacheImplTest::remoteRead))
.isEmpty();
assertThat(getRemotelyReadCells(cache, TABLE, CELL_1, CELL_2)).containsExactly(CELL_1, CELL_2);
assertThat(getRemotelyReadCells(cache, TABLE, CELL_1)).containsExactly(CELL_1);
}

@Test
7 changes: 7 additions & 0 deletions changelog/@unreleased/pr-5871.v2.yml
@@ -0,0 +1,7 @@
type: fix
fix:
description: The lock watch cache no longer attempts to cache writes - instead,
these will read from the maps tracked within the transaction (with similar performance
but more consistent behaviour around concurrent writes).
links:
- https://github.com/palantir/atlasdb/pull/5871
