Skip to content

Commit

Permalink
Remote: Fix performance regression in "upload missing inputs".
Browse files Browse the repository at this point in the history
  • Loading branch information
coeuvre committed Jul 15, 2022
1 parent 8e03d82 commit cee1e0e
Showing 1 changed file with 103 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
Expand All @@ -26,8 +24,11 @@
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -38,14 +39,15 @@
import com.google.protobuf.Message;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import java.util.stream.Collectors;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand Down Expand Up @@ -85,13 +87,14 @@ public void ensureInputsPresent(
return;
}

MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
Single<List<NewDigest>> newDigests = findNewDigests(allDigests, force);

Single<List<NewDigest>> missingNewDigests =
newDigests.flatMap(digests -> findMissingNewDigests(context, digests));

Flowable<TransferResult> uploads =
Flowable.fromIterable(allDigests)
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));
missingNewDigests.flatMapPublisher(
digests -> uploadNewDigests(context, merkleTree, additionalInputs, digests));

try {
mergeBulkTransfer(uploads).blockingAwait();
Expand All @@ -105,36 +108,6 @@ public void ensureInputsPresent(
}
}

/**
 * Builds the (possibly skipped) upload of one {@code digest} and adapts it to a {@link
 * TransferResult}.
 *
 * <p>The work is submitted through {@code casUploadCache.execute}. When the deferred task runs, it
 * registers the digest with {@code missingDigestFinder} and uploads the blob only if the finder's
 * result contains that digest. The finder's {@code count} is passed as the {@code onIgnored}
 * callback.
 */
private Single<TransferResult> uploadBlobIfMissing(
    RemoteActionExecutionContext context,
    MerkleTree merkleTree,
    Map<Digest, Message> additionalInputs,
    boolean force,
    MissingDigestFinder missingDigestFinder,
    Digest digest) {
  // Only reach here if the digest is missing and is not being uploaded.
  Completable uploadWhenReportedMissing =
      Completable.defer(
          () ->
              missingDigestFinder
                  .registerAndCount(digest)
                  .flatMapCompletable(
                      reportedMissing ->
                          reportedMissing.contains(digest)
                              ? toCompletable(
                                  () -> uploadBlob(context, digest, merkleTree, additionalInputs),
                                  directExecutor())
                              : Completable.complete()));

  Completable cachedUpload =
      casUploadCache.execute(
          digest,
          uploadWhenReportedMissing,
          /* onIgnored= */ missingDigestFinder::count,
          force);
  return toTransferResult(cachedUpload);
}

private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
Expand Down Expand Up @@ -165,92 +138,98 @@ private ListenableFuture<Void> uploadBlob(
digest)));
}

/**
* A missing digest finder that initiates the request when the internal counter reaches an
* expected count.
*/
class MissingDigestFinder {
private final int expectedCount;

private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
private final Single<ImmutableSet<Digest>> resultSingle;

@GuardedBy("this")
private final Set<Digest> digests;

@GuardedBy("this")
private int currentCount = 0;

MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
this.expectedCount = expectedCount;
this.digestsSubject = AsyncSubject.create();
this.digests = new HashSet<>();
/**
 * Mutable holder describing one digest whose deferred upload task was started by {@code
 * findNewDigest}. All fields are assigned there right after construction.
 */
static class NewDigest {
  // The digest this entry refers to; used for the findMissingDigests query and for uploadBlob.
  Digest digest;
  // The deferred task created in findNewDigest returns a Completable backed by this subject, so
  // that task stays pending until the subject terminates. It is completed directly when the digest
  // is not reported missing, and otherwise subscribed to the actual upload in uploadNewDigest.
  AsyncSubject<Void> continueSubject;
  // Completable view of the subject that observes the Completable returned by
  // casUploadCache.execute; uploadNewDigest converts it into a TransferResult.
  Completable completion;
}

AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
this.resultSingle =
Single.fromObservable(
digestsSubject
.flatMapSingle(
digests -> {
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
// Make sure we don't have re-subscription caused by refCount() below.
checkState(!wasCalled, "FindMissingDigests is called more than once");
return toSingle(
() -> findMissingDigests(context, digests), directExecutor());
})
// Use replay here because we could have a race condition that downstream hasn't
// been added to the subscription list (to receive the upstream result) while
// upstream is completed.
.replay(1)
.refCount());
}
/**
 * Runs {@code findNewDigest} for every entry of {@code allDigests} and gathers the emitted {@link
 * NewDigest}s into a list. Digests for which {@code findNewDigest} completes empty are left out.
 *
 * <p>The whole collection step is wrapped in a "collect digest" profiler span that is closed when
 * the returned Single terminates or is disposed.
 */
private Single<List<NewDigest>> findNewDigests(Iterable<Digest> allDigests, boolean force) {
  return Single.using(
      () -> Profiler.instance().profile("collect digest"),
      profileSpan -> {
        Flowable<NewDigest> discovered =
            Flowable.fromIterable(allDigests)
                .flatMapMaybe(candidate -> findNewDigest(candidate, force));
        return discovered.collect(Collectors.toList());
      },
      SilentCloseable::close);
}

/**
 * Adds {@code digest} to the set of digests to query and bumps the counter, both at the moment the
 * returned Single is subscribed.
 *
 * <p>The returned Single must not be subscribed more than once; a second subscription fails the
 * {@code checkState} below.
 *
 * @return Single that emits the result of the {@code FindMissingDigest} request.
 */
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
  AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
  // count() may be the call that triggers findMissingDigests. If the digest were added and counted
  // before handing out the Single, the result could become available before the consumer has
  // subscribed. A late subscription would then arrive after upstream completed and, because of
  // refCount(), re-run findMissingDigests.
  //
  // Doing the bookkeeping inside doOnSubscribe guarantees the consumer is already subscribed when
  // count() runs, which avoids that re-execution.
  return resultSingle.doOnSubscribe(
      disposable -> {
        checkState(
            alreadySubscribed.compareAndSet(false, true), "Single is subscribed more than once");
        synchronized (this) {
          digests.add(digest);
        }
        count();
      });
}
/**
 * Submits {@code digest} to {@code casUploadCache.execute} and reports, as a Maybe, whether the
 * deferred task passed to the cache was actually run.
 *
 * <p>If the deferred task runs, the Maybe succeeds with a {@link NewDigest} for this digest. If the
 * cache instead invokes the callback passed as its third argument, the Maybe completes empty.
 *
 * @param digest the digest to submit to the upload cache
 * @param force forwarded unchanged to {@code casUploadCache.execute}
 */
@SuppressWarnings("CheckReturnValue")
private Maybe<NewDigest> findNewDigest(Digest digest, boolean force) {
  return Maybe.create(
      emitter -> {
        // Receives the terminal signal of the Completable returned by the cache;
        // NewDigest.completion is a Completable view of this subject.
        AsyncSubject<Void> afterSubject = AsyncSubject.create();

        Completable upload =
            casUploadCache.execute(
                digest,
                Completable.defer(
                    () -> {
                      NewDigest newDigest = new NewDigest();
                      newDigest.digest = digest;
                      newDigest.continueSubject = AsyncSubject.create();
                      newDigest.completion = Completable.fromObservable(afterSubject);
                      // Hand the entry downstream, then keep this task pending until someone
                      // terminates continueSubject.
                      emitter.onSuccess(newDigest);
                      return Completable.fromObservable(newDigest.continueSubject);
                    }),
                emitter::onComplete,
                force);

        // Subscribe to the cache's Completable and forward its terminal signal into
        // afterSubject. The value returned by subscribeWith is deliberately unused, hence the
        // CheckReturnValue suppression on this method.
        Observable.<Void>fromCompletable(upload).subscribeWith(afterSubject);
      });
}

/** Increase the counter. */
void count() {
ImmutableSet<Digest> digestsResult = null;
/**
 * Issues one {@code findMissingDigests} query for all {@code newDigests} and returns the entries
 * whose digest is reported missing.
 *
 * <p>Entries whose digest is not reported missing have their {@code continueSubject} completed
 * right away, which lets the deferred task created in {@code findNewDigest} finish without an
 * upload.
 *
 * <p>If the query itself fails, the error is forwarded to the {@code continueSubject} of every
 * entry before it is propagated downstream. Without this, no subject would ever terminate and the
 * Completable returned by each deferred task in {@code findNewDigest} would stay pending forever.
 *
 * <p>The query is wrapped in a "findMissingDigests" profiler span.
 *
 * @param context execution context passed to {@code findMissingDigests}
 * @param newDigests entries produced by {@code findNewDigests}
 * @return Single emitting the subset of {@code newDigests} that still has to be uploaded
 */
private Single<List<NewDigest>> findMissingNewDigests(
    RemoteActionExecutionContext context, Iterable<NewDigest> newDigests) {
  return Single.using(
      () -> Profiler.instance().profile("findMissingDigests"),
      ignored ->
          toSingle(
                  () ->
                      findMissingDigests(
                          context,
                          Iterables.transform(newDigests, newDigest -> newDigest.digest)),
                  directExecutor())
              // Placed before map(): at this point no continueSubject has been terminated by this
              // method yet, so onError cannot hit an already-terminated subject.
              .doOnError(
                  error -> {
                    for (NewDigest newDigest : newDigests) {
                      newDigest.continueSubject.onError(error);
                    }
                  })
              .map(
                  missingDigests -> {
                    List<NewDigest> result = new ArrayList<>();
                    for (NewDigest newDigest : newDigests) {
                      if (missingDigests.contains(newDigest.digest)) {
                        result.add(newDigest);
                      } else {
                        // Not reported missing: release the pending task without uploading.
                        newDigest.continueSubject.onComplete();
                      }
                    }
                    return result;
                  }),
      SilentCloseable::close);
}

synchronized (this) {
if (currentCount < expectedCount) {
currentCount++;
if (currentCount == expectedCount) {
digestsResult = ImmutableSet.copyOf(digests);
}
}
}
/**
 * Starts the upload of every entry in {@code newDigests} via {@code uploadNewDigest} and streams
 * the resulting {@link TransferResult}s.
 *
 * <p>The stream is wrapped in an "upload" profiler span that is closed when the returned Flowable
 * terminates or is cancelled.
 */
private Flowable<TransferResult> uploadNewDigests(
    RemoteActionExecutionContext context,
    MerkleTree merkleTree,
    Map<Digest, Message> additionalInputs,
    Iterable<NewDigest> newDigests) {
  return Flowable.using(
      () -> Profiler.instance().profile("upload"),
      profileSpan -> {
        Flowable<NewDigest> pending = Flowable.fromIterable(newDigests);
        return pending.flatMapSingle(
            pendingDigest ->
                uploadNewDigest(context, merkleTree, additionalInputs, pendingDigest));
      },
      SilentCloseable::close);
}

if (digestsResult != null) {
digestsSubject.onNext(digestsResult);
digestsSubject.onComplete();
}
}
/**
 * Kicks off the blob upload for {@code digest} and returns its overall outcome.
 *
 * <p>The upload's terminal signal is fed into {@code digest.continueSubject}. The returned result
 * is derived from {@code digest.completion} rather than from the upload Completable directly.
 */
@SuppressWarnings("CheckReturnValue")
private Single<TransferResult> uploadNewDigest(
    RemoteActionExecutionContext context,
    MerkleTree merkleTree,
    Map<Digest, Message> additionalInputs,
    NewDigest digest) {
  Completable blobUpload =
      toCompletable(
          () -> uploadBlob(context, digest.digest, merkleTree, additionalInputs),
          directExecutor());
  // The subject returned by subscribeWith is the one already stored on the entry, so it is
  // intentionally not captured here.
  Observable.<Void>fromCompletable(blobUpload).subscribeWith(digest.continueSubject);
  return toTransferResult(digest.completion);
}
}

0 comments on commit cee1e0e

Please sign in to comment.