Skip to content

Commit

Permalink
Move gRPC deadline details to new internal module (#1501)
Browse files Browse the repository at this point in the history
Motivation:
Some of the details of the gRPC deadline implementation need to be
shared between the api and netty modules. The internal module can also
be used for other implementation code that does not need to reside in
the api module.
Modifications:
Creates gRPC internal module and moves some existing deadline related
details to a new `DeadlineUtils` class. Includes new tests for timeout
header parsing and generation.
Result:
Improved source organization for gRPC source.
  • Loading branch information
bondolo authored Apr 21, 2021
1 parent c2c3113 commit a07bf29
Show file tree
Hide file tree
Showing 18 changed files with 445 additions and 160 deletions.
1 change: 1 addition & 0 deletions servicetalk-grpc-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-router-utils-internal")
implementation project(":servicetalk-utils-internal")
implementation project(":servicetalk-grpc-internal")
implementation project(":servicetalk-oio-api-internal")
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.findbugs:jsr305:$jsr305Version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.BlockingIterables.singletonBlockingIterable;
import static io.servicetalk.grpc.api.GrpcClientBuilder.GRPC_DEADLINE_KEY;
import static io.servicetalk.grpc.api.GrpcUtils.initRequest;
import static io.servicetalk.grpc.api.GrpcUtils.readGrpcMessageEncoding;
import static io.servicetalk.grpc.api.GrpcUtils.toGrpcException;
import static io.servicetalk.grpc.api.GrpcUtils.uncheckedCast;
import static io.servicetalk.grpc.api.GrpcUtils.validateResponseAndGetPayload;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_DEADLINE_KEY;
import static java.util.Objects.requireNonNull;

final class DefaultGrpcClientCallFactory implements GrpcClientCallFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.encoding.api.Identity.identity;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_MAX_TIMEOUT;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isInfinite;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.AsyncContextMap;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpExecutionStrategy;
Expand Down Expand Up @@ -53,17 +52,6 @@
*/
public abstract class GrpcClientBuilder<U, R>
implements SingleAddressGrpcClientBuilder<U, R, ServiceDiscovererEvent<R>> {
/**
* gRPC timeout is stored in context as a deadline so that when propagated to a new request the remaining time to be
* included in the request can be calculated.
*/
static final AsyncContextMap.Key<Long> PKG_GRPC_DEADLINE_KEY = AsyncContextMap.Key.newKey("grpc-deadline");

/**
* gRPC timeout is stored in context as a deadline so that when propagated to a new request the remaining time to be
* included in the request can be calculated.
*/
protected static final AsyncContextMap.Key<Long> GRPC_DEADLINE_KEY = PKG_GRPC_DEADLINE_KEY;

private boolean appendedCatchAllFilter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,11 @@
import java.time.Duration;
import javax.annotation.Nullable;

import static io.servicetalk.grpc.api.GrpcUtils.EIGHT_NINES;

/**
* Metadata for a <a href="https://www.grpc.io">gRPC</a> client call.
*/
public interface GrpcClientMetadata extends GrpcMetadata {

/**
* Maximum timeout which can be specified for a <a href="https://www.grpc.io">gRPC</a>
* <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">request</a>. Note that this
* maximum is effectively infinite as the duration is more than 11,000 years.
*/
Duration GRPC_MAX_TIMEOUT = Duration.ofHours(EIGHT_NINES);

/**
* {@link GrpcExecutionStrategy} to use for the associated
* <a href="https://www.grpc.io">gRPC</a> method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.servicetalk.grpc.api;

import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.concurrent.api.AsyncContextMap;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpProtocolConfig;
Expand Down Expand Up @@ -53,11 +52,6 @@
* A builder for building a <a href="https://www.grpc.io">gRPC</a> server.
*/
public abstract class GrpcServerBuilder {
/**
* gRPC timeout is stored in context as a deadline so that when propagated to a new client request the remaining
* time to be included in the request can be calculated.
*/
protected static final AsyncContextMap.Key<Long> GRPC_DEADLINE_KEY = GrpcClientBuilder.PKG_GRPC_DEADLINE_KEY;

private boolean appendedCatchAllFilter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import javax.annotation.Nullable;

Expand All @@ -57,20 +55,20 @@
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.encoding.api.Identity.identity;
import static io.servicetalk.encoding.api.internal.HeaderUtils.encodingFor;
import static io.servicetalk.grpc.api.GrpcClientMetadata.GRPC_MAX_TIMEOUT;
import static io.servicetalk.grpc.api.GrpcStatusCode.CANCELLED;
import static io.servicetalk.grpc.api.GrpcStatusCode.DEADLINE_EXCEEDED;
import static io.servicetalk.grpc.api.GrpcStatusCode.INTERNAL;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED;
import static io.servicetalk.grpc.api.GrpcStatusCode.UNKNOWN;
import static io.servicetalk.grpc.internal.DeadlineUtils.GRPC_TIMEOUT_HEADER_KEY;
import static io.servicetalk.grpc.internal.DeadlineUtils.makeTimeoutHeader;
import static io.servicetalk.http.api.HeaderUtils.hasContentType;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_TYPE;
import static io.servicetalk.http.api.HttpHeaderNames.SERVER;
import static io.servicetalk.http.api.HttpHeaderNames.TE;
import static io.servicetalk.http.api.HttpHeaderNames.USER_AGENT;
import static io.servicetalk.http.api.HttpHeaderValues.TRAILERS;
import static io.servicetalk.http.api.HttpRequestMethod.POST;
import static io.servicetalk.utils.internal.DurationUtils.isInfinite;
import static java.lang.String.valueOf;

final class GrpcUtils {
Expand All @@ -85,18 +83,11 @@ final class GrpcUtils {
private static final CharSequence GRPC_USER_AGENT = newAsciiString("grpc-service-talk/");
private static final CharSequence GRPC_MESSAGE_ENCODING_KEY = newAsciiString("grpc-encoding");
private static final CharSequence GRPC_ACCEPT_ENCODING_KEY = newAsciiString("grpc-accept-encoding");
private static final CharSequence GRPC_TIMEOUT_KEY = newAsciiString("grpc-timeout");
private static final GrpcStatus STATUS_OK = GrpcStatus.fromCodeValue(GrpcStatusCode.OK.value());
private static final ConcurrentMap<List<ContentCodec>, CharSequence> ENCODINGS_HEADER_CACHE =
new ConcurrentHashMap<>();
private static final CharSequence CONTENT_ENCODING_SEPARATOR = ", ";

/**
* <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">gRPC spec</a> requires timeout
* value to be 8 or fewer ASCII integer digits.
*/
static final long EIGHT_NINES = 99_999_999L;

private static final TrailersTransformer<Object, Buffer> ENSURE_GRPC_STATUS_RECEIVED =
new StatelessTrailersTransformer<Buffer>() {
@Override
Expand Down Expand Up @@ -129,23 +120,6 @@ protected HttpHeaders payloadFailed(final Throwable cause, final HttpHeaders tra
}
};


/**
* Conversions between time units for gRPC timeout
*/
private static final LongUnaryOperator[] CONVERTERS = {
TimeUnit.NANOSECONDS::toMicros,
TimeUnit.MICROSECONDS::toMillis,
TimeUnit.MILLISECONDS::toSeconds,
TimeUnit.SECONDS::toMinutes,
TimeUnit.MINUTES::toHours,
};

/**
* Allowed time units for gRPC timeout
*/
private static final char[] TIMEOUT_UNIT_CHARS = "numSMH".toCharArray();

private GrpcUtils() {
// No instances.
}
Expand All @@ -157,7 +131,7 @@ static void initRequest(final HttpRequestMetaData request,
final HttpHeaders headers = request.headers();
final CharSequence timeoutValue = makeTimeoutHeader(timeout);
if (null != timeoutValue) {
headers.set(GRPC_TIMEOUT_KEY, timeoutValue);
headers.set(GRPC_TIMEOUT_HEADER_KEY, timeoutValue);
}
headers.set(USER_AGENT, GRPC_USER_AGENT);
headers.set(TE, TRAILERS);
Expand All @@ -168,28 +142,6 @@ static void initRequest(final HttpRequestMetaData request,
}
}

/**
* Make a timeout header value from the specified duration.
*
* @param timeout the timeout {@link Duration}
* @return The timeout header text value or null for infinite timeouts
*/
static @Nullable CharSequence makeTimeoutHeader(@Nullable Duration timeout) {
if (isInfinite(timeout, GRPC_MAX_TIMEOUT)) {
return null;
}

// convert assuming that negative timeout is the result of an already missed deadline.
// cannot overflow as we have already checked against safe maximum
long timeoutValue = timeout.isNegative() ? 0 : timeout.toNanos();
int units = 0;
while (timeoutValue > EIGHT_NINES) {
timeoutValue = CONVERTERS[units].applyAsLong(timeoutValue);
units++; // cannot go past end of units array as we have already range checked
}
return newAsciiString(Long.toString(timeoutValue) + TIMEOUT_UNIT_CHARS[units]);
}

static <T> StreamingHttpResponse newResponse(final StreamingHttpResponseFactory responseFactory,
@Nullable final GrpcServiceContext context,
final Publisher<T> payload,
Expand Down
20 changes: 20 additions & 0 deletions servicetalk-grpc-internal/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
////
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
////
= gRPC Internal
This module contains internal details of the ServiceTalk gRPC implementation.

See the link:https://docs.servicetalk.io/[ServiceTalk docs] for more information.
32 changes: 32 additions & 0 deletions servicetalk-grpc-internal/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: "io.servicetalk.servicetalk-gradle-plugin-internal-library"

dependencies {
api project(":servicetalk-http-api")

implementation project(":servicetalk-annotations")
implementation project(":servicetalk-utils-internal")
implementation "com.google.code.findbugs:jsr305:$jsr305Version"

testImplementation project(":servicetalk-test-resources")
testImplementation "org.junit.jupiter:junit-jupiter-api:$junit5Version"
testImplementation "org.junit.jupiter:junit-jupiter-params:$junit5Version"
testImplementation "org.hamcrest:hamcrest-library:$hamcrestVersion"

testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junit5Version"
}
22 changes: 22 additions & 0 deletions servicetalk-grpc-internal/gradle/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright © 2021 Apple Inc. and the ServiceTalk project authors
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE suppressions PUBLIC
"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
"https://checkstyle.org/dtds/suppressions_1_2.dtd">

<suppressions>
</suppressions>
18 changes: 18 additions & 0 deletions servicetalk-grpc-internal/gradle/spotbugs/main-exclusions.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright © 2021 Apple Inc. and the ServiceTalk project authors
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<FindBugsFilter>
</FindBugsFilter>
Loading

0 comments on commit a07bf29

Please sign in to comment.