From 8d656cfa96da3fc739dab85f96322f4dee83e614 Mon Sep 17 00:00:00 2001 From: Chi Wang Date: Fri, 14 Aug 2020 02:47:07 -0700 Subject: [PATCH] Add channel pool for remote execution to overcome gRPC connections limitation. This PR add a `ReferenceCountedChannelPool` which will create `poolSize` number of channels and round-robin across them for gRPC requests. The `poolSize` is calculated as `jobs / 100`. Fixes #11801. Closes #11937. PiperOrigin-RevId: 326619592 --- .../google/devtools/build/lib/remote/BUILD | 8 +- .../lib/remote/ReferenceCountedChannel.java | 34 +++-- .../remote/ReferenceCountedChannelPool.java | 124 ++++++++++++++++++ .../lib/remote/RemoteCacheClientFactory.java | 16 +++ .../build/lib/remote/RemoteModule.java | 25 +++- 5 files changed, 190 insertions(+), 17 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 9cf2c6064c9469..9435e85d59d3c2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -26,6 +26,7 @@ java_library( exclude = [ "ExecutionStatusException.java", "ReferenceCountedChannel.java", + "ReferenceCountedChannelPool.java", "RemoteRetrier.java", "RemoteRetrierUtils.java", "Retrier.java", @@ -40,6 +41,7 @@ java_library( ":ExecutionStatusException", ":ReferenceCountedChannel", ":Retrier", + "//src/main/java/com/google/devtools/build/lib:build-request-options", "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", @@ -121,8 +123,12 @@ java_library( java_library( name = "ReferenceCountedChannel", - srcs = ["ReferenceCountedChannel.java"], + srcs = [ + "ReferenceCountedChannel.java", + "ReferenceCountedChannelPool.java", + ], deps = [ + "//third_party:guava", "//third_party:netty", "//third_party/grpc:grpc-jar", ], diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 1d948cc5e28037..6a9d09b69a7c8f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -31,20 +31,28 @@ public class ReferenceCountedChannel extends ManagedChannel implements ReferenceCounted { private final ManagedChannel channel; - private final AbstractReferenceCounted referenceCounted = new AbstractReferenceCounted() { - @Override - protected void deallocate() { - channel.shutdown(); - } - - @Override - public ReferenceCounted touch(Object o) { - return this; - } - }; + private final AbstractReferenceCounted referenceCounted; public ReferenceCountedChannel(ManagedChannel channel) { + this( + channel, + new AbstractReferenceCounted() { + @Override + protected void deallocate() { + channel.shutdown(); + } + + @Override + public ReferenceCounted touch(Object o) { + return this; + } + }); + } + + protected ReferenceCountedChannel( + ManagedChannel channel, AbstractReferenceCounted referenceCounted) { this.channel = channel; + this.referenceCounted = referenceCounted; } @Override @@ -70,8 +78,8 @@ public ManagedChannel shutdownNow() { } @Override - public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException { - return channel.awaitTermination(l, timeUnit); + public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException { + return channel.awaitTermination(timeout, timeUnit); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java new file mode 100644 index 00000000000000..909d7518b910ca --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java @@ -0,0 +1,124 @@ +// Copyright 2020 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.common.collect.ImmutableList; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A wrapper around a {@link io.grpc.ManagedChannel} exposing a reference count and performing a + * round-robin load balance across a list of channels. When instantiated the reference count is 1. + * {@link ManagedChannel#shutdown()} will be called on the wrapped channel when the reference count + * reaches 0. + * + *

See {@link ReferenceCounted} for more information about reference counting. + */ +public class ReferenceCountedChannelPool extends ReferenceCountedChannel { + + private final AtomicInteger indexTicker = new AtomicInteger(); + private final ImmutableList channels; + + public ReferenceCountedChannelPool(ImmutableList channels) { + super( + channels.get(0), + new AbstractReferenceCounted() { + @Override + protected void deallocate() { + for (ManagedChannel channel : channels) { + channel.shutdown(); + } + } + + @Override + public ReferenceCounted touch(Object o) { + return null; + } + }); + this.channels = channels; + } + + @Override + public boolean isShutdown() { + for (ManagedChannel channel : channels) { + if (!channel.isShutdown()) { + return false; + } + } + return true; + } + + @Override + public boolean isTerminated() { + for (ManagedChannel channel : channels) { + if (!channel.isTerminated()) { + return false; + } + } + return true; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException { + long endTimeNanos = System.nanoTime() + timeUnit.toNanos(timeout); + for (ManagedChannel channel : channels) { + long awaitTimeNanos = endTimeNanos - System.nanoTime(); + if (awaitTimeNanos <= 0) { + break; + } + channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS); + } + return isTerminated(); + } + + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return getNextChannel().newCall(methodDescriptor, callOptions); + } + + @Override + public String authority() { + // Assume all channels have the same authority. + return channels.get(0).authority(); + } + + /** + * Performs a simple round robin on the list of {@link ManagedChannel}s in the {@code channels} + * list. + * + * @see Suggestion from + * gRPC team. + * @return A {@link ManagedChannel} that can be used for a single RPC call. + */ + private ManagedChannel getNextChannel() { + return getChannel(indexTicker.getAndIncrement()); + } + + private ManagedChannel getChannel(int affinity) { + int index = affinity % channels.size(); + index = Math.abs(index); + // If index is the most negative int, abs(index) is still negative. + if (index < 0) { + index = 0; + } + return channels.get(index); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index 0c72227bdabc49..9989016cd3c075 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java @@ -29,9 +29,11 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannel; import io.netty.channel.unix.DomainSocketAddress; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; @@ -66,6 +68,20 @@ public static ReferenceCountedChannel createGrpcChannel( GoogleAuthUtils.newChannel(target, proxyUri, authOptions, interceptors)); } + public static ReferenceCountedChannel createGrpcChannelPool( + int poolSize, + String target, + String proxyUri, + AuthAndTLSOptions authOptions, + @Nullable List interceptors) + throws IOException { + List channels = new ArrayList<>(); + for (int i = 0; i < poolSize; i++) { + channels.add(GoogleAuthUtils.newChannel(target, proxyUri, authOptions, interceptors)); + } + return new ReferenceCountedChannelPool(ImmutableList.copyOf(channels)); + } + public static RemoteCacheClient create( RemoteOptions options, @Nullable Credentials creds, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index bff595bb0f8e5a..1973aa45263daf 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -42,6 +42,7 @@ import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.LocalFilesArtifactUploader; import com.google.devtools.build.lib.buildtool.BuildRequest; +import com.google.devtools.build.lib.buildtool.BuildRequestOptions; import com.google.devtools.build.lib.collect.nestedset.NestedSet; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.Reporter; @@ -255,6 +256,21 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { ReferenceCountedChannel execChannel = null; ReferenceCountedChannel cacheChannel = null; ReferenceCountedChannel downloaderChannel = null; + + int poolSize = 1; + BuildRequestOptions buildRequestOptions = + env.getOptions().getOptions(BuildRequestOptions.class); + if (buildRequestOptions != null) { + // The following calculation is based on the suggestion from comment + // https://github.com/bazelbuild/bazel/issues/11801#issuecomment-672973245 + // + // The number of concurrent requests for one connection to a gRPC server is limited by + // MAX_CONCURRENT_STREAMS which is normally being 100+. We assume 50 concurrent requests for + // each connection should be fairly well. The number of connections opened by one channel is + // based on the resolved IPs of that server. We assume servers normally have 2 IPs. So the + // number of required channels is calculated as: ceil(jobs / 100). + poolSize = (int) Math.ceil((double) buildRequestOptions.jobs / 100.0); + } if (enableRemoteExecution) { ImmutableList.Builder interceptors = ImmutableList.builder(); interceptors.add(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)); @@ -264,7 +280,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { interceptors.add(new NetworkTime.Interceptor()); try { execChannel = - RemoteCacheClientFactory.createGrpcChannel( + RemoteCacheClientFactory.createGrpcChannelPool( + poolSize, remoteOptions.remoteExecutor, remoteOptions.remoteProxy, authAndTlsOptions, @@ -290,7 +307,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { interceptors.add(new NetworkTime.Interceptor()); try { cacheChannel = - RemoteCacheClientFactory.createGrpcChannel( + RemoteCacheClientFactory.createGrpcChannelPool( + poolSize, remoteOptions.remoteCache, remoteOptions.remoteProxy, authAndTlsOptions, @@ -313,7 +331,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } try { downloaderChannel = - RemoteCacheClientFactory.createGrpcChannel( + RemoteCacheClientFactory.createGrpcChannelPool( + poolSize, remoteOptions.remoteDownloader, remoteOptions.remoteProxy, authAndTlsOptions,