Skip to content

Commit

Permalink
Add an EventBus subscriber for remote analysis caching events.
Browse files Browse the repository at this point in the history
Currently, this listens to serialization events only. The test assertions are now made more exact with the new information.

A future motivation is to hook this into more principled logging solutions (e.g. BEP) for capturing remote analysis caching events (# bytes/counts read/write, cache hit rate for cache reading builds).

PiperOrigin-RevId: 673234500
Change-Id: Ie6395616f41019190aeda890a041ce8eb3878403
  • Loading branch information
jin authored and copybara-github committed Sep 11, 2024
1 parent 7a9702b commit e8e1bd3
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/skyframe/config",
"//src/main/java/com/google/devtools/build/lib/skyframe/rewinding:action_rewound_event",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/analysis:event_listener",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/analysis:frontier_serializer",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/analysis:options",
"//src/main/java/com/google/devtools/build/lib/util",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ private void serializeFrontier(
activeDirectoriesMatcher.get(),
requireNonNull(env.getFingerprintValueService()),
env.getReporter(),
env.getEventBus(),
options.serializedFrontierProfile);
if (maybeFailureDetail.isPresent()) {
throw new AbruptExitException(DetailedExitCode.of(maybeFailureDetail.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import com.google.devtools.build.lib.skyframe.serialization.FingerprintValueService;
import com.google.devtools.build.lib.skyframe.serialization.ObjectCodecRegistry;
import com.google.devtools.build.lib.skyframe.serialization.ObjectCodecs;
import com.google.devtools.build.lib.skyframe.serialization.analysis.RemoteAnalysisCachingEventListener;
import com.google.devtools.build.lib.skyframe.serialization.analysis.RemoteAnalysisCachingOptions;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.DetailedExitCode;
Expand Down Expand Up @@ -146,6 +147,7 @@ public class CommandEnvironment {
private final DelegatingDownloader delegatingDownloader;
@Nullable private final Supplier<ObjectCodecs> analysisObjectCodecsSupplier;
@Nullable private final FingerprintValueService fingerprintValueService;
private final RemoteAnalysisCachingEventListener remoteAnalysisCachingEventListener;

private boolean mergedAnalysisAndExecution;

Expand Down Expand Up @@ -179,6 +181,16 @@ public class CommandEnvironment {
// occurrences of the same flag.
private ImmutableList<OptionAndRawValue> invocationPolicyFlags = ImmutableList.of();

/**
* Gets the {@link RemoteAnalysisCachingEventListener} for this invocation.
*
* <p>A new copy of the listener is instantiated for every new {@link CommandEnvironment}, so
* statistics are not retained between invocations.
*/
public RemoteAnalysisCachingEventListener getRemoteAnalysisCachingEventListener() {
return remoteAnalysisCachingEventListener;
}

private class BlazeModuleEnvironment implements BlazeModule.ModuleEnvironment {
@Nullable
@Override
Expand Down Expand Up @@ -369,6 +381,8 @@ public void exit(AbruptExitException exception) {
this.analysisObjectCodecsSupplier = null;
this.fingerprintValueService = null;
}
this.remoteAnalysisCachingEventListener = new RemoteAnalysisCachingEventListener();
this.eventBus.register(remoteAnalysisCachingEventListener);
}

private static ObjectCodecs initAnalysisObjectCodecs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,21 @@ filegroup(
visibility = ["//src:__subpackages__"],
)

java_library(
name = "event_listener",
srcs = ["RemoteAnalysisCachingEventListener.java"],
deps = [
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/skyframe:skyframe-objects",
"//third_party:guava",
],
)

java_library(
name = "frontier_serializer",
srcs = ["FrontierSerializer.java"],
deps = [
":event_listener",
"//src/main/java/com/google/devtools/build/lib/actions:action_lookup_key",
"//src/main/java/com/google/devtools/build/lib/cmdline",
"//src/main/java/com/google/devtools/build/lib/collect",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.ActionLookupKey;
Expand All @@ -39,6 +40,7 @@
import com.google.devtools.build.lib.skyframe.serialization.ProfileCollector;
import com.google.devtools.build.lib.skyframe.serialization.SerializationException;
import com.google.devtools.build.lib.skyframe.serialization.SerializationResult;
import com.google.devtools.build.lib.skyframe.serialization.analysis.RemoteAnalysisCachingEventListener.SerializedNodeEvent;
import com.google.devtools.build.skyframe.InMemoryGraph;
import com.google.devtools.build.skyframe.InMemoryNodeEntry;
import com.google.devtools.build.skyframe.SkyKey;
Expand Down Expand Up @@ -75,6 +77,7 @@ public static Optional<FailureDetail> serializeAndUploadFrontier(
PathFragmentPrefixTrie matcher,
FingerprintValueService fingerprintValueService,
Reporter reporter,
EventBus eventBus,
String profilePath)
throws InterruptedException {
// Starts initializing ObjectCodecs in a background thread as it can take some time.
Expand Down Expand Up @@ -110,37 +113,37 @@ public static Optional<FailureDetail> serializeAndUploadFrontier(
AtomicInteger frontierValueCount = new AtomicInteger();
selection.forEach(
/* parallelismThreshold= */ 0,
(actionLookupKey, marking) -> {
(key, marking) -> {
if (!marking.equals(FRONTIER_CANDIDATE)) {
return;
}
try {
SerializationResult<ByteString> keyBytes =
codecs.serializeMemoizedAndBlocking(
fingerprintValueService, actionLookupKey, profileCollector);
codecs.serializeMemoizedAndBlocking(fingerprintValueService, key, profileCollector);
var keyWriteStatus = keyBytes.getFutureToBlockWritesOn();
if (keyWriteStatus != null) {
writeStatuses.add(keyWriteStatus);
}

InMemoryNodeEntry node = checkNotNull(graph.getIfPresent(actionLookupKey));
InMemoryNodeEntry node = checkNotNull(graph.getIfPresent(key));
SerializationResult<ByteString> valueBytes =
codecs.serializeMemoizedAndBlocking(
fingerprintValueService, node.getValue(), profileCollector);
var writeStatusFuture = valueBytes.getFutureToBlockWritesOn();
if (writeStatusFuture != null) {
writeStatuses.add(writeStatusFuture);
}

frontierValueCount.getAndIncrement();
eventBus.post(new SerializedNodeEvent(key));
} catch (SerializationException e) {
writeStatuses.add(immediateFailedFuture(e));
}
});

reporter.handle(
Event.info(
String.format(
"Serialized %s frontier entries in %s\n", frontierValueCount, stopwatch)));
String.format("Serialized %s frontier entries in %s", frontierValueCount, stopwatch)));

try {
var unusedNull =
Expand All @@ -155,7 +158,7 @@ public static Optional<FailureDetail> serializeAndUploadFrontier(
return Optional.of(createFailureDetail(message, Code.SERIALIZED_FRONTIER_PROFILE_FAILED));
}
reporter.handle(
Event.info(String.format("Waiting for write futures took an additional %s\n", stopwatch)));
Event.info(String.format("Waiting for write futures took an additional %s", stopwatch)));

if (profilePath.isEmpty()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.skyframe.serialization.analysis;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.skyframe.SkyFunctionName;
import com.google.devtools.build.skyframe.SkyKey;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** An {@link com.google.common.eventbus.EventBus} listener for remote analysis caching events. */
@ThreadSafety.ThreadSafe
public class RemoteAnalysisCachingEventListener {

/**
* An event for when a Skyframe node has been serialized, but its associated write futures (i.e.
* RPC latency) may not be done yet.
*/
public record SerializedNodeEvent(SkyKey key) {
public SerializedNodeEvent {
checkNotNull(key);
}
}

private final Set<SkyKey> serializedKeys = ConcurrentHashMap.newKeySet();

@Subscribe
@AllowConcurrentEvents
@SuppressWarnings("unused")
public void onSerializationComplete(SerializedNodeEvent event) {
serializedKeys.add(event.key());
}

/** Returns the counts of {@link SkyFunctionName} from serialized nodes of this invocation. */
public Multiset<SkyFunctionName> getSkyfunctionCounts() {
Multiset<SkyFunctionName> counts = HashMultiset.create();
serializedKeys.forEach(key -> counts.add(key.functionName()));
return counts;
}

/** Returns the count of serialized nodes of this invocation. */
public int getSerializedKeysCount() {
return serializedKeys.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/pkgcache",
"//src/main/java/com/google/devtools/build/lib/skyframe:aspect_key_creator",
"//src/main/java/com/google/devtools/build/lib/skyframe:configured_target_key",
"//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization/analysis:frontier_serializer",
"//src/main/java/com/google/devtools/build/skyframe",
"//src/main/java/com/google/devtools/build/skyframe:skyframe-objects",
Expand All @@ -34,7 +35,9 @@ java_test(
deps = [
":FrontierSerializerTestBase",
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions",
"//src/main/java/com/google/devtools/build/lib/skyframe/serialization:serialization_module",
"//third_party:junit4",
"//third_party:truth",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.skyframe.serialization.analysis;

import static com.google.common.truth.Truth.assertThat;

import com.google.devtools.build.lib.runtime.BlazeRuntime;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
import com.google.devtools.build.lib.skyframe.serialization.SerializationModule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand All @@ -25,4 +29,16 @@ public final class FrontierSerializerTest extends FrontierSerializerTestBase {
protected BlazeRuntime.Builder getRuntimeBuilder() throws Exception {
return super.getRuntimeBuilder().addBlazeModule(new SerializationModule());
}

@Test
public void buildCommand_uploadsFrontierBytesWithUploadMode() throws Exception {
setupScenarioWithAspects("--experimental_remote_analysis_cache_mode=upload");

var listener = getCommandEnvironment().getRemoteAnalysisCachingEventListener();
assertThat(listener.getSerializedKeysCount()).isAtLeast(9); // for Bazel
assertThat(listener.getSkyfunctionCounts().count(SkyFunctions.CONFIGURED_TARGET))
.isAtLeast(9); // for Bazel

assertContainsEvent("Waiting for write futures took an additional");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.devtools.build.lib.pkgcache.LoadingFailedException;
import com.google.devtools.build.lib.skyframe.AspectKeyCreator.AspectBaseKey;
import com.google.devtools.build.lib.skyframe.ConfiguredTargetKey;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
import com.google.devtools.build.lib.skyframe.serialization.analysis.FrontierSerializer.SelectionMarking;
import com.google.devtools.build.skyframe.InMemoryGraph;
import com.google.devtools.build.skyframe.SkyKey;
Expand Down Expand Up @@ -209,14 +210,6 @@ private void collectAnalysisUtc(
}
}

@Test
public void buildCommand_uploadsFrontierBytesWithUploadMode() throws Exception {
setupScenarioWithAspects("--experimental_remote_analysis_cache_mode=upload");

assertContainsEvent("active or frontier keys in");
assertContainsEvent("Waiting for write futures took an additional");
}

@Test
public void buildCommandWithSkymeld_uploadsFrontierBytesWithUploadMode() throws Exception {
write(
Expand Down Expand Up @@ -245,7 +238,10 @@ public void buildCommandWithSkymeld_uploadsFrontierBytesWithUploadMode() throws
// Validate that Skymeld did run.
assertThat(getCommandEnvironment().withMergedAnalysisAndExecutionSourceOfTruth()).isTrue();

assertContainsEvent("active or frontier keys in");
var listener = getCommandEnvironment().getRemoteAnalysisCachingEventListener();
assertThat(listener.getSerializedKeysCount()).isAtLeast(1);
assertThat(listener.getSkyfunctionCounts().count(SkyFunctions.CONFIGURED_TARGET)).isAtLeast(1);

assertContainsEvent("Waiting for write futures took an additional");
}

Expand Down Expand Up @@ -310,7 +306,7 @@ public void buildCommand_serializedFrontierProfileContainsExpectedClasses() thro
.doesNotContain("com.google.devtools.build.lib.skyframe.NonRuleConfiguredTargetValue");
}

private void setupScenarioWithAspects(String... options) throws Exception {
protected final void setupScenarioWithAspects(String... options) throws Exception {
write(
"foo/provider.bzl",
"""
Expand Down

0 comments on commit e8e1bd3

Please sign in to comment.