Skip to content

Commit

Permalink
Passing Bazel metadata in gRPC headers.
Browse files Browse the repository at this point in the history
TESTED=unit tests
RELNOTES: none
PiperOrigin-RevId: 169395919
  • Loading branch information
olaola authored and laszlocsomor committed Sep 21, 2017
1 parent e6b6d7c commit 6f32d5a
Show file tree
Hide file tree
Showing 19 changed files with 620 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.google.devtools.common.options.OptionsProvider;
import java.io.IOException;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -86,7 +85,7 @@ public void beforeCommand(CommandEnvironment commandEnvironment)
commandEnvironment.getRuntime().getClock(),
commandEnvironment.getRuntime().getPathToUriConverter(),
commandEnvironment.getReporter(),
commandEnvironment.getClientEnv().get("BAZEL_INTERNAL_BUILD_REQUEST_ID"),
commandEnvironment.getBuildRequestId().toString(),
commandEnvironment.getCommandId().toString(),
commandEnvironment.getCommandName());
if (streamer != null) {
Expand Down Expand Up @@ -126,8 +125,6 @@ public void afterCommand() {

/**
* Returns {@code null} if no stream could be created.
*
* @param buildRequestId if {@code null} or {@code ""} a random UUID is used instead.
*/
@Nullable
@VisibleForTesting
Expand Down Expand Up @@ -213,9 +210,6 @@ private BuildEventTransport tryCreateBesTransport(
logger.fine(format("Will create BuildEventServiceTransport streaming to '%s'",
besOptions.besBackend));

buildRequestId = isNullOrEmpty(buildRequestId)
? UUID.randomUUID().toString()
: buildRequestId;
commandLineReporter.handle(
Event.info(
format(
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/exec/apple",
"//src/main/java/com/google/devtools/build/lib/exec/local",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//third_party:apache_httpclient",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -257,9 +258,11 @@ private void retryAsyncUpload(
try {
ListenableScheduledFuture<?> schedulingResult =
retryService.schedule(
() ->
startAsyncUploadWithRetry(
chunker, backoffTimes, overallUploadResult),
Context.current()
.wrap(
() ->
startAsyncUploadWithRetry(
chunker, backoffTimes, overallUploadResult)),
nextDelayMillis,
MILLISECONDS);
// In case the scheduled execution errors, we need to notify the overallUploadResult.
Expand Down Expand Up @@ -418,7 +421,7 @@ private String newResourceName(Digest digest) {
return resourceName;
}
};
call.start(callListener, new Metadata());
call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
call.request(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,21 @@ public GrpcRemoteCache(Channel channel, CallCredentials credentials, RemoteOptio

private ContentAddressableStorageBlockingStub casBlockingStub() {
return ContentAddressableStorageGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}

private ByteStreamBlockingStub bsBlockingStub() {
return ByteStreamGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}

private ActionCacheBlockingStub acBlockingStub() {
return ActionCacheGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ public GrpcRemoteExecutor(Channel channel, @Nullable CallCredentials callCredent

private ExecutionBlockingStub execBlockingStub() {
return ExecutionGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(callCredentials)
.withDeadlineAfter(callTimeoutSecs, TimeUnit.SECONDS);
}

private WatcherBlockingStub watcherBlockingStub() {
return WatcherGrpc.newBlockingStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(callCredentials);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,32 @@ public Iterable<? extends ActionContext> getActionContexts() {
ExecutionOptions executionOptions =
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class));
RemoteOptions remoteOptions = checkNotNull(env.getOptions().getOptions(RemoteOptions.class));
String buildRequestId = env.getBuildRequestId().toString();
String commandId = env.getCommandId().toString();

if (remoteOptions.experimentalRemoteSpawnCache) {
RemoteSpawnCache spawnCache = new RemoteSpawnCache(env.getExecRoot(), remoteOptions, cache,
executionOptions.verboseFailures, env.getReporter());
RemoteSpawnCache spawnCache =
new RemoteSpawnCache(
env.getExecRoot(),
remoteOptions,
cache,
buildRequestId,
commandId,
executionOptions.verboseFailures,
env.getReporter());
return ImmutableList.of(spawnCache);
} else {
RemoteSpawnRunner spawnRunner = new RemoteSpawnRunner(
env.getExecRoot(),
remoteOptions,
createFallbackRunner(env),
executionOptions.verboseFailures,
env.getReporter(),
cache,
executor);
RemoteSpawnRunner spawnRunner =
new RemoteSpawnRunner(
env.getExecRoot(),
remoteOptions,
createFallbackRunner(env),
executionOptions.verboseFailures,
env.getReporter(),
buildRequestId,
commandId,
cache,
executor);
return ImmutableList.of(new RemoteSpawnStrategy(spawnRunner));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import io.grpc.CallCredentials;
import io.grpc.Channel;
import java.io.IOException;
import java.util.logging.Logger;

/** RemoteModule provides distributed cache and remote execution for Bazel. */
public final class RemoteModule extends BlazeModule {
private static final Logger logger = Logger.getLogger(RemoteModule.class.getName());

@VisibleForTesting
static final class CasPathConverter implements PathConverter {
// Not final; unfortunately, the Bazel startup process requires us to create this object before
Expand Down Expand Up @@ -89,7 +92,9 @@ public void serverInit(OptionsProvider startupOptions, ServerBuilder builder)
@Override
public void beforeCommand(CommandEnvironment env) {
env.getEventBus().register(this);

String buildRequestId = env.getBuildRequestId().toString();
String commandId = env.getCommandId().toString();
logger.info("Command: buildRequestId = " + buildRequestId + ", commandId = " + commandId);
RemoteOptions remoteOptions = env.getOptions().getOptions(RemoteOptions.class);
AuthAndTLSOptions authAndTlsOptions = env.getOptions().getOptions(AuthAndTLSOptions.class);
converter.options = remoteOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command;
import com.google.devtools.remoteexecution.v1test.Platform;
import io.grpc.Context;
import java.io.IOException;
import java.util.Collection;
import java.util.NoSuchElementException;
Expand All @@ -54,21 +55,31 @@ final class RemoteSpawnCache implements SpawnCache {
private final Platform platform;

private final RemoteActionCache remoteCache;
private final String buildRequestId;
private final String commandId;
private final boolean verboseFailures;

@Nullable private final Reporter cmdlineReporter;

// Used to ensure that a warning is reported only once.
private final AtomicBoolean warningReported = new AtomicBoolean();

RemoteSpawnCache(Path execRoot, RemoteOptions options, RemoteActionCache remoteCache,
boolean verboseFailures, @Nullable Reporter cmdlineReporter) {
RemoteSpawnCache(
Path execRoot,
RemoteOptions options,
RemoteActionCache remoteCache,
String buildRequestId,
String commandId,
boolean verboseFailures,
@Nullable Reporter cmdlineReporter) {
this.execRoot = execRoot;
this.options = options;
this.platform = options.parseRemotePlatformOverride();
this.remoteCache = remoteCache;
this.verboseFailures = verboseFailures;
this.cmdlineReporter = cmdlineReporter;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
}

@Override
Expand All @@ -91,61 +102,70 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionPolicy policy)

// Look up action cache, and reuse the action output if it is found.
final ActionKey actionKey = Digests.computeActionKey(action);
ActionResult result =
this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
remoteCache.download(result, execRoot, policy.getFileOutErr());
SpawnResult spawnResult = new SpawnResult.Builder()
.setStatus(Status.SUCCESS)
.setExitCode(result.getExitCode())
.build();
return SpawnCache.success(spawnResult);
} catch (CacheNotFoundException e) {
// There's a cache miss. Fall back to local execution.
}
}
if (options.remoteUploadLocalResults) {
return new CacheHandle() {
@Override
public boolean hasResult() {
return false;
Context withMetadata =
TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionKey);
// Metadata will be available in context.current() until we detach.
// This is done via a thread-local variable.
Context previous = withMetadata.attach();
try {
ActionResult result =
this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
remoteCache.download(result, execRoot, policy.getFileOutErr());
SpawnResult spawnResult =
new SpawnResult.Builder()
.setStatus(Status.SUCCESS)
.setExitCode(result.getExitCode())
.build();
return SpawnCache.success(spawnResult);
} catch (CacheNotFoundException e) {
// There's a cache miss. Fall back to local execution.
}
}
if (options.remoteUploadLocalResults) {
return new CacheHandle() {
@Override
public boolean hasResult() {
return false;
}

@Override
public SpawnResult getResult() {
throw new NoSuchElementException();
}
@Override
public SpawnResult getResult() {
throw new NoSuchElementException();
}

@Override
public boolean willStore() {
return true;
}
@Override
public boolean willStore() {
return true;
}

@Override
public void store(SpawnResult result, Collection<Path> files)
throws InterruptedException, IOException {
try {
@Override
public void store(SpawnResult result, Collection<Path> files)
throws InterruptedException, IOException {
boolean uploadAction = Status.SUCCESS.equals(result.status()) && result.exitCode() == 0;
remoteCache.upload(actionKey, execRoot, files, policy.getFileOutErr(), uploadAction);
} catch (IOException e) {
if (verboseFailures) {
report(Event.debug("Upload to remote cache failed: " + e.getMessage()));
} else {
reportOnce(Event.warn("Some artifacts failed be uploaded to the remote cache."));
try {
remoteCache.upload(actionKey, execRoot, files, policy.getFileOutErr(), uploadAction);
} catch (IOException e) {
if (verboseFailures) {
report(Event.debug("Upload to remote cache failed: " + e.getMessage()));
} else {
reportOnce(Event.warn("Some artifacts failed be uploaded to the remote cache."));
}
}
}
}

@Override
public void close() {
}
};
} else {
return SpawnCache.NO_RESULT_NO_STORE;
@Override
public void close() {}
};
} else {
return SpawnCache.NO_RESULT_NO_STORE;
}
} finally {
withMetadata.detach(previous);
}
}

Expand Down
Loading

0 comments on commit 6f32d5a

Please sign in to comment.