From 6f32d5a7d75ff46de6efc557c1ea95777ea813ca Mon Sep 17 00:00:00 2001 From: olaola Date: Wed, 20 Sep 2017 17:12:19 +0200 Subject: [PATCH] Passing Bazel metadata in gRPC headers. TESTED=unit tests RELNOTES: none PiperOrigin-RevId: 169395919 --- .../BuildEventServiceModule.java | 8 +- .../google/devtools/build/lib/remote/BUILD | 1 - .../build/lib/remote/ByteStreamUploader.java | 11 +- .../build/lib/remote/GrpcRemoteCache.java | 3 + .../build/lib/remote/GrpcRemoteExecutor.java | 2 + .../remote/RemoteActionContextProvider.java | 32 +++-- .../build/lib/remote/RemoteModule.java | 7 +- .../build/lib/remote/RemoteSpawnCache.java | 118 +++++++++------- .../build/lib/remote/RemoteSpawnRunner.java | 131 ++++++++++-------- .../lib/remote/TracingMetadataUtils.java | 109 +++++++++++++++ .../build/lib/runtime/CommandEnvironment.java | 49 ++++--- .../lib/remote/ByteStreamUploaderTest.java | 116 +++++++++++++++- .../build/lib/remote/GrpcRemoteCacheTest.java | 5 + .../remote/GrpcRemoteExecutionClientTest.java | 62 +++++++-- .../lib/remote/RemoteSpawnCacheTest.java | 4 +- .../lib/remote/RemoteSpawnRunnerTest.java | 121 +++++++++++++--- .../com/google/devtools/build/remote/BUILD | 5 - .../build/remote/ExecutionServer.java | 29 ++-- .../devtools/build/remote/RemoteWorker.java | 14 +- 19 files changed, 620 insertions(+), 207 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java index 4c05cd548ab276..088c7315c459c3 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java @@ -44,7 +44,6 @@ import com.google.devtools.common.options.OptionsProvider; import java.io.IOException; import java.util.Set; 
-import java.util.UUID; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -86,7 +85,7 @@ public void beforeCommand(CommandEnvironment commandEnvironment) commandEnvironment.getRuntime().getClock(), commandEnvironment.getRuntime().getPathToUriConverter(), commandEnvironment.getReporter(), - commandEnvironment.getClientEnv().get("BAZEL_INTERNAL_BUILD_REQUEST_ID"), + commandEnvironment.getBuildRequestId().toString(), commandEnvironment.getCommandId().toString(), commandEnvironment.getCommandName()); if (streamer != null) { @@ -126,8 +125,6 @@ public void afterCommand() { /** * Returns {@code null} if no stream could be created. - * - * @param buildRequestId if {@code null} or {@code ""} a random UUID is used instead. */ @Nullable @VisibleForTesting @@ -213,9 +210,6 @@ private BuildEventTransport tryCreateBesTransport( logger.fine(format("Will create BuildEventServiceTransport streaming to '%s'", besOptions.besBackend)); - buildRequestId = isNullOrEmpty(buildRequestId) - ? 
UUID.randomUUID().toString() - : buildRequestId; commandLineReporter.handle( Event.info( format( diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 3d8eeb367a6a98..62ff0c9ebdbc37 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -27,7 +27,6 @@ java_library( "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/exec/apple", "//src/main/java/com/google/devtools/build/lib/exec/local", - "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//third_party:apache_httpclient", diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 7f60e1407ffd31..13670c43842dc7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -37,6 +37,7 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; +import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; @@ -257,9 +258,11 @@ private void retryAsyncUpload( try { ListenableScheduledFuture schedulingResult = retryService.schedule( - () -> - startAsyncUploadWithRetry( - chunker, backoffTimes, overallUploadResult), + Context.current() + .wrap( + () -> + startAsyncUploadWithRetry( + chunker, backoffTimes, overallUploadResult)), nextDelayMillis, MILLISECONDS); // In case the scheduled execution errors, we need to notify the overallUploadResult. 
@@ -418,7 +421,7 @@ private String newResourceName(Digest digest) { return resourceName; } }; - call.start(callListener, new Metadata()); + call.start(callListener, TracingMetadataUtils.headersFromCurrentContext()); call.request(1); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index b49a103b5c6b60..21ecf631b263da 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -89,18 +89,21 @@ public GrpcRemoteCache(Channel channel, CallCredentials credentials, RemoteOptio private ContentAddressableStorageBlockingStub casBlockingStub() { return ContentAddressableStorageGrpc.newBlockingStub(channel) + .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(credentials) .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); } private ByteStreamBlockingStub bsBlockingStub() { return ByteStreamGrpc.newBlockingStub(channel) + .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(credentials) .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); } private ActionCacheBlockingStub acBlockingStub() { return ActionCacheGrpc.newBlockingStub(channel) + .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(credentials) .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 3dabf0a677ea25..a1a5427766112d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -58,12 +58,14 @@ public GrpcRemoteExecutor(Channel channel, @Nullable 
CallCredentials callCredent private ExecutionBlockingStub execBlockingStub() { return ExecutionGrpc.newBlockingStub(channel) + .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(callCredentials) .withDeadlineAfter(callTimeoutSecs, TimeUnit.SECONDS); } private WatcherBlockingStub watcherBlockingStub() { return WatcherGrpc.newBlockingStub(channel) + .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(callCredentials); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java index 8bb5a27e4340b1..b1173fb77cbb39 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java @@ -49,20 +49,32 @@ public Iterable getActionContexts() { ExecutionOptions executionOptions = checkNotNull(env.getOptions().getOptions(ExecutionOptions.class)); RemoteOptions remoteOptions = checkNotNull(env.getOptions().getOptions(RemoteOptions.class)); + String buildRequestId = env.getBuildRequestId().toString(); + String commandId = env.getCommandId().toString(); if (remoteOptions.experimentalRemoteSpawnCache) { - RemoteSpawnCache spawnCache = new RemoteSpawnCache(env.getExecRoot(), remoteOptions, cache, - executionOptions.verboseFailures, env.getReporter()); + RemoteSpawnCache spawnCache = + new RemoteSpawnCache( + env.getExecRoot(), + remoteOptions, + cache, + buildRequestId, + commandId, + executionOptions.verboseFailures, + env.getReporter()); return ImmutableList.of(spawnCache); } else { - RemoteSpawnRunner spawnRunner = new RemoteSpawnRunner( - env.getExecRoot(), - remoteOptions, - createFallbackRunner(env), - executionOptions.verboseFailures, - env.getReporter(), - cache, - executor); + RemoteSpawnRunner spawnRunner = + new RemoteSpawnRunner( + 
env.getExecRoot(), + remoteOptions, + createFallbackRunner(env), + executionOptions.verboseFailures, + env.getReporter(), + buildRequestId, + commandId, + cache, + executor); return ImmutableList.of(new RemoteSpawnStrategy(spawnRunner)); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 6b8eb87b981224..f9949c34b8a199 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -35,9 +35,12 @@ import io.grpc.CallCredentials; import io.grpc.Channel; import java.io.IOException; +import java.util.logging.Logger; /** RemoteModule provides distributed cache and remote execution for Bazel. */ public final class RemoteModule extends BlazeModule { + private static final Logger logger = Logger.getLogger(RemoteModule.class.getName()); + @VisibleForTesting static final class CasPathConverter implements PathConverter { // Not final; unfortunately, the Bazel startup process requires us to create this object before @@ -89,7 +92,9 @@ public void serverInit(OptionsProvider startupOptions, ServerBuilder builder) @Override public void beforeCommand(CommandEnvironment env) { env.getEventBus().register(this); - + String buildRequestId = env.getBuildRequestId().toString(); + String commandId = env.getCommandId().toString(); + logger.info("Command: buildRequestId = " + buildRequestId + ", commandId = " + commandId); RemoteOptions remoteOptions = env.getOptions().getOptions(RemoteOptions.class); AuthAndTLSOptions authAndTlsOptions = env.getOptions().getOptions(AuthAndTLSOptions.class); converter.options = remoteOptions; diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index 774a5802a7368c..74a8afc8afb132 100644 --- 
a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java @@ -32,6 +32,7 @@ import com.google.devtools.remoteexecution.v1test.ActionResult; import com.google.devtools.remoteexecution.v1test.Command; import com.google.devtools.remoteexecution.v1test.Platform; +import io.grpc.Context; import java.io.IOException; import java.util.Collection; import java.util.NoSuchElementException; @@ -54,6 +55,8 @@ final class RemoteSpawnCache implements SpawnCache { private final Platform platform; private final RemoteActionCache remoteCache; + private final String buildRequestId; + private final String commandId; private final boolean verboseFailures; @Nullable private final Reporter cmdlineReporter; @@ -61,14 +64,22 @@ final class RemoteSpawnCache implements SpawnCache { // Used to ensure that a warning is reported only once. private final AtomicBoolean warningReported = new AtomicBoolean(); - RemoteSpawnCache(Path execRoot, RemoteOptions options, RemoteActionCache remoteCache, - boolean verboseFailures, @Nullable Reporter cmdlineReporter) { + RemoteSpawnCache( + Path execRoot, + RemoteOptions options, + RemoteActionCache remoteCache, + String buildRequestId, + String commandId, + boolean verboseFailures, + @Nullable Reporter cmdlineReporter) { this.execRoot = execRoot; this.options = options; this.platform = options.parseRemotePlatformOverride(); this.remoteCache = remoteCache; this.verboseFailures = verboseFailures; this.cmdlineReporter = cmdlineReporter; + this.buildRequestId = buildRequestId; + this.commandId = commandId; } @Override @@ -91,61 +102,70 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionPolicy policy) // Look up action cache, and reuse the action output if it is found. final ActionKey actionKey = Digests.computeActionKey(action); - ActionResult result = - this.options.remoteAcceptCached ? 
remoteCache.getCachedActionResult(actionKey) : null; - if (result != null) { - // We don't cache failed actions, so we know the outputs exist. - // For now, download all outputs locally; in the future, we can reuse the digests to - // just update the TreeNodeRepository and continue the build. - try { - remoteCache.download(result, execRoot, policy.getFileOutErr()); - SpawnResult spawnResult = new SpawnResult.Builder() - .setStatus(Status.SUCCESS) - .setExitCode(result.getExitCode()) - .build(); - return SpawnCache.success(spawnResult); - } catch (CacheNotFoundException e) { - // There's a cache miss. Fall back to local execution. - } - } - if (options.remoteUploadLocalResults) { - return new CacheHandle() { - @Override - public boolean hasResult() { - return false; + Context withMetadata = + TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionKey); + // Metadata will be available in context.current() until we detach. + // This is done via a thread-local variable. + Context previous = withMetadata.attach(); + try { + ActionResult result = + this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null; + if (result != null) { + // We don't cache failed actions, so we know the outputs exist. + // For now, download all outputs locally; in the future, we can reuse the digests to + // just update the TreeNodeRepository and continue the build. + try { + remoteCache.download(result, execRoot, policy.getFileOutErr()); + SpawnResult spawnResult = + new SpawnResult.Builder() + .setStatus(Status.SUCCESS) + .setExitCode(result.getExitCode()) + .build(); + return SpawnCache.success(spawnResult); + } catch (CacheNotFoundException e) { + // There's a cache miss. Fall back to local execution. 
} + } + if (options.remoteUploadLocalResults) { + return new CacheHandle() { + @Override + public boolean hasResult() { + return false; + } - @Override - public SpawnResult getResult() { - throw new NoSuchElementException(); - } + @Override + public SpawnResult getResult() { + throw new NoSuchElementException(); + } - @Override - public boolean willStore() { - return true; - } + @Override + public boolean willStore() { + return true; + } - @Override - public void store(SpawnResult result, Collection files) - throws InterruptedException, IOException { - try { + @Override + public void store(SpawnResult result, Collection files) + throws InterruptedException, IOException { boolean uploadAction = Status.SUCCESS.equals(result.status()) && result.exitCode() == 0; - remoteCache.upload(actionKey, execRoot, files, policy.getFileOutErr(), uploadAction); - } catch (IOException e) { - if (verboseFailures) { - report(Event.debug("Upload to remote cache failed: " + e.getMessage())); - } else { - reportOnce(Event.warn("Some artifacts failed be uploaded to the remote cache.")); + try { + remoteCache.upload(actionKey, execRoot, files, policy.getFileOutErr(), uploadAction); + } catch (IOException e) { + if (verboseFailures) { + report(Event.debug("Upload to remote cache failed: " + e.getMessage())); + } else { + reportOnce(Event.warn("Some artifacts failed be uploaded to the remote cache.")); + } } } - } - @Override - public void close() { - } - }; - } else { - return SpawnCache.NO_RESULT_NO_STORE; + @Override + public void close() {} + }; + } else { + return SpawnCache.NO_RESULT_NO_STORE; + } + } finally { + withMetadata.detach(previous); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index b4d28cd9950b52..f4babd76a13d44 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ 
b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -45,6 +45,7 @@ import com.google.devtools.remoteexecution.v1test.ExecuteRequest; import com.google.devtools.remoteexecution.v1test.ExecuteResponse; import com.google.devtools.remoteexecution.v1test.Platform; +import io.grpc.Context; import io.grpc.Status.Code; import java.io.IOException; import java.io.OutputStream; @@ -74,6 +75,8 @@ class RemoteSpawnRunner implements SpawnRunner { @Nullable private final Reporter cmdlineReporter; @Nullable private final RemoteActionCache remoteCache; @Nullable private final GrpcRemoteExecutor remoteExecutor; + private final String buildRequestId; + private final String commandId; // Used to ensure that a warning is reported only once. private final AtomicBoolean warningReported = new AtomicBoolean(); @@ -84,6 +87,8 @@ class RemoteSpawnRunner implements SpawnRunner { SpawnRunner fallbackRunner, boolean verboseFailures, @Nullable Reporter cmdlineReporter, + String buildRequestId, + String commandId, @Nullable RemoteActionCache remoteCache, @Nullable GrpcRemoteExecutor remoteExecutor) { this.execRoot = execRoot; @@ -94,6 +99,8 @@ class RemoteSpawnRunner implements SpawnRunner { this.remoteExecutor = remoteExecutor; this.verboseFailures = verboseFailures; this.cmdlineReporter = cmdlineReporter; + this.buildRequestId = buildRequestId; + this.commandId = commandId; } @Override @@ -121,64 +128,68 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy) // Look up action cache, and reuse the action output if it is found. ActionKey actionKey = Digests.computeActionKey(action); - boolean acceptCachedResult = options.remoteAcceptCached && Spawns.mayBeCached(spawn); - boolean uploadLocalResults = options.remoteUploadLocalResults; - + Context withMetadata = + TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionKey); + Context previous = withMetadata.attach(); try { - // Try to lookup the action in the action cache. 
- ActionResult cachedResult = - acceptCachedResult - ? remoteCache.getCachedActionResult(actionKey) - : null; - if (cachedResult != null) { - if (cachedResult.getExitCode() != 0) { - // The remote cache must never serve a failed action. - throw new EnvironmentalExecException("The remote cache is in an invalid state as it" - + " served a failed action. Hash of the action: " + actionKey.getDigest()); - } - try { - return downloadRemoteResults(cachedResult, policy.getFileOutErr()); - } catch (CacheNotFoundException e) { - // No cache hit, so we fall through to local or remote execution. - // We set acceptCachedResult to false in order to force the action re-execution. - acceptCachedResult = false; + boolean acceptCachedResult = options.remoteAcceptCached && Spawns.mayBeCached(spawn); + boolean uploadLocalResults = options.remoteUploadLocalResults; + + try { + // Try to lookup the action in the action cache. + ActionResult cachedResult = + acceptCachedResult ? remoteCache.getCachedActionResult(actionKey) : null; + if (cachedResult != null) { + if (cachedResult.getExitCode() != 0) { + // The remote cache must never serve a failed action. + throw new EnvironmentalExecException( + "The remote cache is in an invalid state as it" + + " served a failed action. Hash of the action: " + + actionKey.getDigest()); + } + try { + return downloadRemoteResults(cachedResult, policy.getFileOutErr()); + } catch (CacheNotFoundException e) { + // No cache hit, so we fall through to local or remote execution. + // We set acceptCachedResult to false in order to force the action re-execution. + acceptCachedResult = false; + } } + } catch (IOException e) { + return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e); } - } catch (IOException e) { - return execLocallyOrFail( - spawn, policy, inputMap, actionKey, uploadLocalResults, e); - } - if (remoteExecutor == null) { - // Remote execution is disabled and so execute the spawn on the local machine. 
- return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey); - } + if (remoteExecutor == null) { + // Remote execution is disabled and so execute the spawn on the local machine. + return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey); + } - try { - // Upload the command and all the inputs into the remote cache. - remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command); - } catch (IOException e) { - return execLocallyOrFail( - spawn, policy, inputMap, actionKey, uploadLocalResults, e); - } + try { + // Upload the command and all the inputs into the remote cache. + remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command); + } catch (IOException e) { + return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e); + } - final ActionResult result; - try { - result = executeRemotely(action, inputMap.size(), acceptCachedResult); - } catch (IOException e) { - return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e); - } + final ActionResult result; + try { + result = executeRemotely(action, inputMap.size(), acceptCachedResult); + } catch (IOException e) { + return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e); + } - boolean executionFailed = result.getExitCode() != 0; - if (options.remoteLocalFallback && executionFailed) { - return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey); - } + boolean executionFailed = result.getExitCode() != 0; + if (options.remoteLocalFallback && executionFailed) { + return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey); + } - try { - return downloadRemoteResults(result, policy.getFileOutErr()); - } catch (IOException e) { - return execLocallyOrFail( - spawn, policy, inputMap, actionKey, uploadLocalResults, e); + try { + return downloadRemoteResults(result, policy.getFileOutErr()); + } catch 
(IOException e) { + return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e); + } + } finally { + withMetadata.detach(previous); } } @@ -191,8 +202,8 @@ private SpawnResult downloadRemoteResults(ActionResult result, FileOutErr outErr .build(); } - private ActionResult executeRemotely(Action action, int numInputFiles, - boolean acceptCachedResult) throws IOException, InterruptedException { + private ActionResult executeRemotely(Action action, int numInputFiles, boolean acceptCachedResult) + throws IOException, InterruptedException { // TODO(olaola): set BuildInfo and input total bytes as well. ExecuteRequest.Builder request = ExecuteRequest.newBuilder() @@ -204,9 +215,13 @@ private ActionResult executeRemotely(Action action, int numInputFiles, return reply.getResult(); } - private SpawnResult execLocallyOrFail(Spawn spawn, SpawnExecutionPolicy policy, - SortedMap inputMap, ActionKey actionKey, - boolean uploadLocalResults, IOException cause) + private SpawnResult execLocallyOrFail( + Spawn spawn, + SpawnExecutionPolicy policy, + SortedMap inputMap, + ActionKey actionKey, + boolean uploadLocalResults, + IOException cause) throws ExecException, InterruptedException, IOException { if (options.remoteLocalFallback) { return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey); @@ -313,7 +328,8 @@ private SpawnResult execLocally( SortedMap inputMap, boolean uploadToCache, @Nullable RemoteActionCache remoteCache, - @Nullable ActionKey actionKey) throws ExecException, IOException, InterruptedException { + @Nullable ActionKey actionKey) + throws ExecException, IOException, InterruptedException { if (uploadToCache && Spawns.mayBeCached(spawn) && remoteCache != null && actionKey != null) { return execLocallyAndUpload(spawn, policy, inputMap, remoteCache, actionKey); } @@ -326,7 +342,8 @@ SpawnResult execLocallyAndUpload( SpawnExecutionPolicy policy, SortedMap inputMap, RemoteActionCache remoteCache, - ActionKey actionKey) 
throws ExecException, IOException, InterruptedException { + ActionKey actionKey) + throws ExecException, IOException, InterruptedException { Map ctimesBefore = getInputCtimes(inputMap); SpawnResult result = fallbackRunner.exec(spawn, policy); Map ctimesAfter = getInputCtimes(inputMap); diff --git a/src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java new file mode 100644 index 00000000000000..18da62205c380f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java @@ -0,0 +1,109 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.google.devtools.build.lib.remote; + +import com.google.common.annotations.VisibleForTesting; +import com.google.devtools.build.lib.analysis.BlazeVersionInfo; +import com.google.devtools.build.lib.remote.Digests.ActionKey; +import com.google.devtools.remoteexecution.v1test.RequestMetadata; +import com.google.devtools.remoteexecution.v1test.ToolDetails; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.MetadataUtils; + +/** Utility functions to handle Metadata for remote Grpc calls. */ +public class TracingMetadataUtils { + + private TracingMetadataUtils() {} + + private static final Context.Key CONTEXT_KEY = + Context.key("remote-grpc-metadata"); + + @VisibleForTesting + public static final Metadata.Key METADATA_KEY = + ProtoUtils.keyForProto(RequestMetadata.getDefaultInstance()); + + /** + * Returns a new gRPC context derived from the current context, with + * {@link RequestMetadata} accessible by the {@link #fromCurrentContext()} method. + * + *

<p>The {@link RequestMetadata} is constructed using the provided arguments + * and the current tool version. + */ + public static Context contextWithMetadata( + String buildRequestId, String commandId, ActionKey actionKey) { + RequestMetadata metadata = + RequestMetadata.newBuilder() + .setCorrelatedInvocationsId(buildRequestId) + .setToolInvocationId(commandId) + .setActionId(actionKey.getDigest().getHash()) + .setToolDetails( + ToolDetails.newBuilder() + .setToolName("bazel") + .setToolVersion(BlazeVersionInfo.instance().getVersion())) + .build(); + return Context.current().withValue(CONTEXT_KEY, metadata); + } + + /** + * Fetches a {@link RequestMetadata} defined on the current context. + * + * @throws IllegalStateException when the metadata is not defined in the current context. + */ + public static RequestMetadata fromCurrentContext() { + RequestMetadata metadata = CONTEXT_KEY.get(); + if (metadata == null) { + throw new IllegalStateException("RequestMetadata not set in current context."); + } + return metadata; + } + + /** + * Creates a {@link Metadata} containing the {@link RequestMetadata} defined on the current + * context. + * + * @throws IllegalStateException when the metadata is not defined in the current context. + */ + public static Metadata headersFromCurrentContext() { + Metadata headers = new Metadata(); + headers.put(METADATA_KEY, fromCurrentContext()); + return headers; + } + + public static ClientInterceptor attachMetadataFromContextInterceptor() { + return MetadataUtils.newAttachHeadersInterceptor(headersFromCurrentContext()); + } + + /** GRPC interceptor to add logging metadata to the GRPC context.
*/ + public static class ServerHeadersInterceptor implements ServerInterceptor { + @Override + public Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + RequestMetadata meta = headers.get(METADATA_KEY); + if (meta == null) { + throw new IllegalStateException("RequestMetadata not received from the client."); + } + Context ctx = Context.current().withValue(CONTEXT_KEY, meta); + return Contexts.interceptCall(ctx, call, headers, next); + } + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java b/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java index 86fa7be744ff1a..655b7d6156eb4a 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java @@ -69,6 +69,7 @@ public final class CommandEnvironment { private final BlazeDirectories directories; private UUID commandId; // Unique identifier for the command being run + private UUID buildRequestId; // Unique identifier for the build being run private final Reporter reporter; private final EventBus eventBus; private final BlazeModule.ModuleEnvironment blazeModuleEnvironment; @@ -124,6 +125,7 @@ public void exit(AbruptExitException exception) { this.workspace = workspace; this.directories = workspace.getDirectories(); this.commandId = null; // Will be set once we get the client environment + this.buildRequestId = null; // Will be set once we get the client environment this.reporter = new Reporter(eventBus); this.eventBus = eventBus; this.commandThread = commandThread; @@ -244,6 +246,21 @@ private Map filterClientEnv(Set vars) { return Collections.unmodifiableMap(result); } + private UUID getFromEnvOrGenerate(String varName) { + // Try to set the clientId from the client environment. 
+ String uuidString = clientEnv.getOrDefault(varName, ""); + if (!uuidString.isEmpty()) { + try { + return UUID.fromString(uuidString); + } catch (IllegalArgumentException e) { + // String was malformed, so we will resort to generating a random UUID + } + } + // We have been provided with the client environment, but it didn't contain + // the variable; hence generate our own id. + return UUID.randomUUID(); + } + private void updateClientEnv(List> clientEnvList) { Preconditions.checkState(clientEnv.isEmpty()); @@ -251,21 +268,11 @@ private void updateClientEnv(List> clientEnvList) { for (Map.Entry entry : env) { clientEnv.put(entry.getKey(), entry.getValue()); } - // Try to set the clientId from the client environment. if (commandId == null) { - String uuidString = clientEnv.get("BAZEL_INTERNAL_INVOCATION_ID"); - if (uuidString != null) { - try { - commandId = UUID.fromString(uuidString); - } catch (IllegalArgumentException e) { - // String was malformed, so we will resort to generating a random UUID - } - } + commandId = getFromEnvOrGenerate("BAZEL_INTERNAL_INVOCATION_ID"); } - if (commandId == null) { - // We have been provided with the client environment, but it didn't contain - // the invocation id; hence generate our own. - commandId = UUID.randomUUID(); + if (buildRequestId == null) { + buildRequestId = getFromEnvOrGenerate("BAZEL_INTERNAL_BUILD_REQUEST_ID"); } setCommandIdInCrashData(); } @@ -301,14 +308,14 @@ public PackageRootResolver getPackageRootResolver() { * the build info. */ public UUID getCommandId() { - if (commandId == null) { - // The commandId should not be requested before the beforeCommand is executed, as the - // commandId might be set through the client environment. However, to simplify testing, - // we set the id value before we throw the exception. 
- commandId = UUID.randomUUID(); - throw new IllegalArgumentException("Build Id requested before client environment provided"); - } - return commandId; + return Preconditions.checkNotNull(commandId); + } + + /** + * Returns the UUID that Blaze uses to identify everything logged from the current build request. + */ + public UUID getBuildRequestId() { + return Preconditions.checkNotNull(buildRequestId); } public SkyframeExecutor getSkyframeExecutor() { diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 732afac1fa7a4f..e809b74b7e1314 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -14,22 +14,30 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.fail; import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.analysis.BlazeVersionInfo; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.RequestMetadata; import com.google.protobuf.ByteString; +import io.grpc.BindableService; import io.grpc.Channel; +import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerCall; import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; +import 
io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; import io.grpc.Status; import io.grpc.Status.Code; @@ -76,28 +84,36 @@ public class ByteStreamUploaderTest { private Server server; private Channel channel; + private Context withEmptyMetadata; @Mock private Retrier.Backoff mockBackoff; @Before - public void init() throws Exception { + public final void setUp() throws Exception { MockitoAnnotations.initMocks(this); String serverName = "Server for " + this.getClass(); server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry) .build().start(); channel = InProcessChannelBuilder.forName(serverName).build(); + withEmptyMetadata = + TracingMetadataUtils.contextWithMetadata( + "none", "none", Digests.unsafeActionKeyFromDigest(Digest.getDefaultInstance())); + // Needs to be repeated in every test that uses the timeout setting, since the tests run + // on different threads than the setUp. + withEmptyMetadata.attach(); } @After - public void shutdown() { + public void tearDown() throws Exception { server.shutdownNow(); retryService.shutdownNow(); } @Test(timeout = 10000) public void singleBlobUploadShouldWork() throws Exception { + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -166,6 +182,7 @@ public void onCompleted() { @Test(timeout = 20000) public void multipleBlobsUploadShouldWork() throws Exception { + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0), (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -251,10 +268,100 @@ public void onCompleted() { blockUntilInternalStateConsistent(uploader); } + @Test(timeout = 20000) + public void contextShouldBePreservedUponRetries() throws Exception { + withEmptyMetadata.attach(); + // We upload blobs 
with different context, and retry 3 times for each upload. + // We verify that the correct metadata is passed to the server with every blob. + Retrier retrier = new Retrier(() -> new FixedBackoff(3, 0), (Status s) -> true); + ByteStreamUploader uploader = + new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); + + List toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"); + List builders = new ArrayList<>(toUpload.size()); + Map uploadsFailed = new HashMap<>(); + for (String s : toUpload) { + Chunker chunker = new Chunker(s.getBytes(UTF_8), /* chunkSize=*/ 3); + builders.add(chunker); + uploadsFailed.put(chunker.digest().getHash(), 0); + } + + BindableService bsService = + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver response) { + return new StreamObserver() { + + private String digestHash; + + @Override + public void onNext(WriteRequest writeRequest) { + String resourceName = writeRequest.getResourceName(); + if (!resourceName.isEmpty()) { + String[] components = resourceName.split("/"); + assertThat(components).hasLength(6); + digestHash = components[4]; + } + assertThat(digestHash).isNotNull(); + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id"); + assertThat(meta.getToolInvocationId()).isEqualTo("command-id"); + assertThat(meta.getActionId()).isEqualTo(digestHash); + assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel"); + assertThat(meta.getToolDetails().getToolVersion()) + .isEqualTo(BlazeVersionInfo.instance().getVersion()); + synchronized (this) { + Integer numFailures = uploadsFailed.get(digestHash); + if (numFailures < 3) { + uploadsFailed.put(digestHash, numFailures + 1); + response.onError(Status.INTERNAL.asException()); + return; + } + } + } + + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } + + @Override + public 
void onCompleted() { + response.onNext(WriteResponse.newBuilder().setCommittedSize(10).build()); + response.onCompleted(); + } + }; + } + }; + serviceRegistry.addService( + ServerInterceptors.intercept( + bsService, new TracingMetadataUtils.ServerHeadersInterceptor())); + + List> uploads = new ArrayList<>(); + + for (Chunker chunker : builders) { + Context ctx = + TracingMetadataUtils.contextWithMetadata( + "build-req-id", "command-id", Digests.unsafeActionKeyFromDigest(chunker.digest())); + ctx.call( + () -> { + uploads.add(uploader.uploadBlobAsync(chunker)); + return null; + }); + } + + for (ListenableFuture upload : uploads) { + upload.get(); + } + + blockUntilInternalStateConsistent(uploader); + } + @Test(timeout = 10000) public void sameBlobShouldNotBeUploadedTwice() throws Exception { // Test that uploading the same file concurrently triggers only one file upload. + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -313,6 +420,7 @@ public void onCompleted() { @Test(timeout = 10000) public void errorsShouldBeReported() throws IOException, InterruptedException { + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -339,6 +447,7 @@ public StreamObserver write(StreamObserver response @Test(timeout = 10000) public void shutdownShouldCancelOngoingUploads() throws Exception { + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -390,6 +499,7 @@ public void onCancel() { @Test(timeout = 10000) public void failureInRetryExecutorShouldBeHandled() throws Exception { + 
withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -420,6 +530,7 @@ public StreamObserver write(StreamObserver response @Test(timeout = 10000) public void resourceNameWithoutInstanceName() throws Exception { + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true); ByteStreamUploader uploader = new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService); @@ -456,6 +567,7 @@ public void onCompleted() { @Test(timeout = 10000) public void nonRetryableStatusShouldNotBeRetried() throws Exception { + withEmptyMetadata.attach(); Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0), /* No Status is retriable. */ (Status s) -> false); ByteStreamUploader uploader = diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java index fe31aa729db316..5cc03be8240ea3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -54,6 +54,7 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; +import io.grpc.Context; import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.Status; @@ -103,6 +104,10 @@ public final void setUp() throws Exception { FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory()); FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); outErr = new FileOutErr(stdout, stderr); + Context withEmptyMetadata = + TracingMetadataUtils.contextWithMetadata( + "none", "none", Digests.unsafeActionKeyFromDigest(Digest.getDefaultInstance())); + withEmptyMetadata.attach(); } @After diff --git 
a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index c5d5b66f62261a..de6d50f908907f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -34,6 +34,7 @@ import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.ResourceSet; import com.google.devtools.build.lib.actions.SimpleSpawn; +import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.authandtls.GrpcUtils; import com.google.devtools.build.lib.exec.SpawnExecException; @@ -61,6 +62,7 @@ import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; +import com.google.devtools.remoteexecution.v1test.RequestMetadata; import com.google.longrunning.Operation; import com.google.protobuf.Any; import com.google.protobuf.ByteString; @@ -69,9 +71,15 @@ import com.google.watcher.v1.ChangeBatch; import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; +import io.grpc.BindableService; import io.grpc.CallCredentials; import io.grpc.Channel; +import io.grpc.Metadata; import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -229,8 +237,17 @@ public PathFragment getExecPath() { GrpcUtils.newCallCredentials(Options.getDefaults(AuthAndTLSOptions.class)); GrpcRemoteCache remoteCache = new 
GrpcRemoteCache(channel, creds, options, retrier); - client = new RemoteSpawnRunner(execRoot, options, null, true, /*cmdlineReporter=*/null, - remoteCache, executor); + client = + new RemoteSpawnRunner( + execRoot, + options, + null, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + remoteCache, + executor); inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz"); } @@ -364,22 +381,42 @@ public void onError(Throwable t) { }; } + /** Capture the request headers from a client. Useful for testing metadata propagation. */ + private static class RequestHeadersValidator implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + RequestMetadata meta = headers.get(TracingMetadataUtils.METADATA_KEY); + assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id"); + assertThat(meta.getToolInvocationId()).isEqualTo("command-id"); + assertThat(meta.getActionId()).isNotEmpty(); + assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel"); + assertThat(meta.getToolDetails().getToolVersion()) + .isEqualTo(BlazeVersionInfo.instance().getVersion()); + return next.startCall(call, headers); + } + } + @Test public void remotelyExecute() throws Exception { - serviceRegistry.addService( + BindableService actionCache = new ActionCacheImplBase() { @Override public void getActionResult( GetActionResultRequest request, StreamObserver responseObserver) { responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); } - }); + }; + serviceRegistry.addService( + ServerInterceptors.intercept(actionCache, new RequestHeadersValidator())); final ActionResult actionResult = ActionResult.newBuilder() .setStdoutRaw(ByteString.copyFromUtf8("stdout")) .setStderrRaw(ByteString.copyFromUtf8("stderr")) .build(); - serviceRegistry.addService( + BindableService execService = new ExecutionImplBase() { @Override public void 
execute(ExecuteRequest request, StreamObserver responseObserver) { @@ -395,7 +432,9 @@ public void execute(ExecuteRequest request, StreamObserver responseOb .build()); responseObserver.onCompleted(); } - }); + }; + serviceRegistry.addService( + ServerInterceptors.intercept(execService, new RequestHeadersValidator())); final Command command = Command.newBuilder() .addAllArguments(ImmutableList.of("/bin/echo", "Hi!")) @@ -406,7 +445,7 @@ public void execute(ExecuteRequest request, StreamObserver responseOb .build()) .build(); final Digest cmdDigest = Digests.computeDigest(command); - serviceRegistry.addService( + BindableService cas = new ContentAddressableStorageImplBase() { @Override public void findMissingBlobs( @@ -424,13 +463,15 @@ public void findMissingBlobs( responseObserver.onNext(b.build()); responseObserver.onCompleted(); } - }); + }; + serviceRegistry.addService(ServerInterceptors.intercept(cas, new RequestHeadersValidator())); ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); when(mockByteStreamImpl.write(Mockito.>anyObject())) .thenAnswer(blobWriteAnswer(command.toByteArray())) .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); - serviceRegistry.addService(mockByteStreamImpl); + serviceRegistry.addService( + ServerInterceptors.intercept(mockByteStreamImpl, new RequestHeadersValidator())); SpawnResult result = client.exec(simpleSpawn, simplePolicy); assertThat(result.setupSuccess()).isTrue(); @@ -563,7 +604,8 @@ public void getActionResult( }) .when(mockWatcherImpl) .watch(Mockito.anyObject(), Mockito.>anyObject()); - serviceRegistry.addService(mockWatcherImpl); + serviceRegistry.addService( + ServerInterceptors.intercept(mockWatcherImpl, new RequestHeadersValidator())); final Command command = Command.newBuilder() .addAllArguments(ImmutableList.of("/bin/echo", "Hi!")) diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java 
b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java index 160c20fa6d00cb..b1e34f51feccf2 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java @@ -166,7 +166,9 @@ public final void setUp() throws Exception { Reporter reporter = new Reporter(new EventBus()); eventHandler = new StoredEventHandler(); reporter.addHandler(eventHandler); - cache = new RemoteSpawnCache(execRoot, options, remoteCache, false, reporter); + cache = + new RemoteSpawnCache( + execRoot, options, remoteCache, "build-req-id", "command-id", false, reporter); fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz"); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java index 940c9d3afdeac8..bf9dee024c312c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java @@ -121,8 +121,16 @@ public void nonCachableSpawnsShouldNotBeCached_remote() throws Exception { options.remoteUploadLocalResults = true; RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, executor); + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + executor); ExecuteResponse succeeded = ExecuteResponse.newBuilder().setResult( ActionResult.newBuilder().setExitCode(0).build()).build(); @@ -169,8 +177,16 @@ public void nonCachableSpawnsShouldNotBeCached_local() throws Exception { options.remoteUploadLocalResults = true; RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, null); + new RemoteSpawnRunner( + execRoot, + 
options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + null); // Throw an IOException to trigger the local fallback. when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(IOException.class); @@ -211,8 +227,17 @@ public void failedActionShouldOnlyUploadOutputs() throws Exception { options.remoteUploadLocalResults = true; RemoteSpawnRunner runner = - spy(new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, null)); + spy( + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + null)); Spawn spawn = newSimpleSpawn(); SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn); @@ -249,8 +274,17 @@ public void dontAcceptFailedCachedAction() throws Exception { SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn); RemoteSpawnRunner runner = - spy(new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, null)); + spy( + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + null)); try { runner.exec(spawn, policy); @@ -274,7 +308,16 @@ public void printWarningIfCacheIsDown() throws Exception { reporter.addHandler(eventHandler); RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, false, reporter, cache, null); + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + false, + reporter, + "build-req-id", + "command-id", + cache, + null); Spawn spawn = newSimpleSpawn(); SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn); @@ -315,8 +358,16 @@ public void fallbackFails() throws Exception { options.remoteLocalFallback = true; RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, null); + new RemoteSpawnRunner( + execRoot, + 
options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + null); Spawn spawn = newSimpleSpawn(); SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn); @@ -343,8 +394,16 @@ public void cacheDownloadFailureTriggersRemoteExecution() throws Exception { RemoteOptions options = Options.getDefaults(RemoteOptions.class); RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, executor); + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + executor); ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build(); when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(cachedResult); @@ -375,8 +434,16 @@ public void testRemoteExecutionTimeout() throws Exception { options.remoteLocalFallback = false; RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, executor); + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + executor); ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build(); when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null); @@ -402,8 +469,16 @@ public void testExitCode_executorfailure() throws Exception { options.remoteLocalFallback = false; RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, executor); + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + executor); when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null); when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException()); @@ -429,8 +504,16 @@ public void 
testExitCode_executionfailure() throws Exception { options.remoteLocalFallback = false; RemoteSpawnRunner runner = - new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null, - cache, executor); + new RemoteSpawnRunner( + execRoot, + options, + localRunner, + true, + /*cmdlineReporter=*/ null, + "build-req-id", + "command-id", + cache, + executor); when(cache.getCachedActionResult(any(ActionKey.class))).thenThrow(new IOException()); diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD index af85891ca9cbb2..8f36c4d0a8ffe8 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD @@ -12,18 +12,13 @@ java_library( visibility = ["//src/tools/remote_worker:__subpackages__"], deps = [ "//src/main/java/com/google/devtools/build/lib:build-base", - "//src/main/java/com/google/devtools/build/lib:events", - "//src/main/java/com/google/devtools/build/lib:io", "//src/main/java/com/google/devtools/build/lib:os_util", "//src/main/java/com/google/devtools/build/lib:packages-internal", "//src/main/java/com/google/devtools/build/lib:process_util", - "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib:single-line-formatter", "//src/main/java/com/google/devtools/build/lib:unix", "//src/main/java/com/google/devtools/build/lib:util", "//src/main/java/com/google/devtools/build/lib/actions", - "//src/main/java/com/google/devtools/build/lib/authandtls", - "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/vfs", diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java 
b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java index 9ed769ceb53e04..83b003a74c622a 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java @@ -28,6 +28,7 @@ import com.google.devtools.build.lib.remote.Digests; import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; +import com.google.devtools.build.lib.remote.TracingMetadataUtils; import com.google.devtools.build.lib.shell.AbnormalTerminationException; import com.google.devtools.build.lib.shell.Command; import com.google.devtools.build.lib.shell.CommandException; @@ -41,10 +42,12 @@ import com.google.devtools.remoteexecution.v1test.ExecuteRequest; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; import com.google.devtools.remoteexecution.v1test.Platform; +import com.google.devtools.remoteexecution.v1test.RequestMetadata; import com.google.longrunning.Operation; import com.google.protobuf.util.Durations; import com.google.rpc.Code; import com.google.rpc.Status; +import io.grpc.Context; import io.grpc.StatusException; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; @@ -59,7 +62,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -123,12 +125,8 @@ public ExecutionServer( @Override public void execute(ExecuteRequest request, StreamObserver responseObserver) { final String opName = UUID.randomUUID().toString(); - ListenableFuture future = executorService.submit(new Callable() { - @Override - public ActionResult call() throws Exception { - return execute(request, opName); - } - }); + 
ListenableFuture future = + executorService.submit(Context.current().wrap(() -> execute(request, opName))); operationsCache.put(opName, future); responseObserver.onNext(Operation.newBuilder().setName(opName).build()); responseObserver.onCompleted(); @@ -137,19 +135,20 @@ public ActionResult call() throws Exception { private ActionResult execute(ExecuteRequest request, String id) throws IOException, InterruptedException, StatusException { Path tempRoot = workPath.getRelative("build-" + id); + String workDetails = ""; try { tempRoot.createDirectory(); - logger.log( - FINE, - "Work received has {0} input files and {1} output files.", - new Object[]{ - request.getTotalInputFileCount(), request.getAction().getOutputFilesCount() - }); + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + workDetails = + String.format( + "build-request-id: %s command-id: %s action-id: %s", + meta.getCorrelatedInvocationsId(), meta.getToolInvocationId(), meta.getActionId()); + logger.log(FINE, "Received work for: {0}", workDetails); ActionResult result = execute(request.getAction(), tempRoot); - logger.log(FINE, "Completed {0}.", id); + logger.log(FINE, "Completed {0}.", workDetails); return result; } catch (Exception e) { - logger.log(Level.SEVERE, "Work failed.", e); + logger.log(Level.SEVERE, "Work failed: {0} {1}.", new Object[] {workDetails, e}); throw e; } finally { if (workerOptions.debug) { diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index 8d47d5ccd2e8e1..a459cc14bf798f 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -27,6 +27,7 @@ import com.google.devtools.build.lib.remote.RemoteOptions; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; import 
com.google.devtools.build.lib.remote.SimpleBlobStoreFactory; +import com.google.devtools.build.lib.remote.TracingMetadataUtils; import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore; import com.google.devtools.build.lib.remote.blobstore.OnDiskBlobStore; import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore; @@ -49,6 +50,8 @@ import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; import io.grpc.Server; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; import io.grpc.netty.NettyServerBuilder; import java.io.FileOutputStream; import java.io.IOException; @@ -121,15 +124,16 @@ public RemoteWorker( } public Server startServer() throws IOException { + ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor(); NettyServerBuilder b = NettyServerBuilder.forPort(workerOptions.listenPort) - .addService(actionCacheServer) - .addService(bsServer) - .addService(casServer); + .addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor)) + .addService(ServerInterceptors.intercept(bsServer, headersInterceptor)) + .addService(ServerInterceptors.intercept(casServer, headersInterceptor)); if (execServer != null) { - b.addService(execServer); - b.addService(watchServer); + b.addService(ServerInterceptors.intercept(execServer, headersInterceptor)); + b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor)); } else { logger.info("Execution disabled, only serving cache requests."); }