Skip to content

Commit

Permalink
Deprecate HTTP/gRPC offloadWithThreadAffinity (#1564)
Browse files Browse the repository at this point in the history
Motivation:
The method used for ensuring thread affinity used by the HTTP/gRPC
builders will be removed in a near future version of ServiceTalk. The
offered alternative is to use a single threaded executor which provides
the same affinity guarantee.
Modifications:
Methods are marked deprecated. Some existing tests which reference the
thread based signal offloader are converted to the default signal
offleader.
Result:
Deprecation warning provided.
  • Loading branch information
bondolo authored May 17, 2021
1 parent 8d92bb5 commit a31605d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION;
import static io.servicetalk.concurrent.internal.SignalOffloaders.threadBasedOffloaderFactory;
import static io.servicetalk.concurrent.internal.SignalOffloaders.defaultOffloaderFactory;

@RunWith(Parameterized.class)
public class ExecutorThrowsTest {
Expand Down Expand Up @@ -165,7 +165,7 @@ private Executor newAlwaysFailingExecutor() {
Executor original = from(task -> {
throw DELIBERATE_EXCEPTION;
});
return new OffloaderAwareExecutor(original, threadBasedOffloaderFactory());
return new OffloaderAwareExecutor(original, defaultOffloaderFactory());
}

private void verifyError() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.api.completable.AbstractPublishAndSubscribeOnTest.verifyCapturedThreads;
import static io.servicetalk.concurrent.internal.SignalOffloaders.threadBasedOffloaderFactory;
import static io.servicetalk.concurrent.internal.SignalOffloaders.defaultOffloaderFactory;
import static java.lang.Long.MAX_VALUE;
import static java.lang.Thread.currentThread;

Expand All @@ -49,7 +49,7 @@ public abstract class AbstractPublishAndSubscribeOnTest {
public final Timeout timeout = new ServiceTalkTestTimeout();
@Rule
public final ExecutorRule originalSourceExecutorRule = ExecutorRule.withExecutor(
() -> new OffloaderAwareExecutor(newCachedThreadExecutor(), threadBasedOffloaderFactory()));
() -> new OffloaderAwareExecutor(newCachedThreadExecutor(), defaultOffloaderFactory()));

protected AtomicReferenceArray<Thread> setupAndSubscribe(
BiFunction<Publisher<String>, Executor, Publisher<String>> offloadingFunction, Executor executor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,43 @@
*/
package io.servicetalk.concurrent.api.publisher;

import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorRule;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.internal.OffloaderAwareExecutor;

import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicReferenceArray;

import static io.servicetalk.concurrent.api.Executors.newCachedThreadExecutor;
import static io.servicetalk.concurrent.internal.SignalOffloaders.threadBasedOffloaderFactory;
import static io.servicetalk.concurrent.internal.SignalOffloaders.defaultOffloaderFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

public class PublishAndSubscribeOnTest extends AbstractPublishAndSubscribeOnTest {

@Rule
public final ExecutorRule executorRule = ExecutorRule.withExecutor(() ->
new OffloaderAwareExecutor(newCachedThreadExecutor(), threadBasedOffloaderFactory()));
public final ExecutorRule<Executor> executorRule = ExecutorRule.withExecutor(() ->
new OffloaderAwareExecutor(newCachedThreadExecutor(), defaultOffloaderFactory()));

@Test
public void testPublishOnNoOverride() throws InterruptedException {
AtomicReferenceArray<Thread> capturedThreads =
setupAndSubscribe(Publisher::publishOn, executorRule.executor());

assertThat("Unexpected threads for subscription and subscriber for original source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD), is(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
sameThreadFactory(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
assertThat("Unexpected threads for subscription and subscriber for offloaded source.",
capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD),
not(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD))));
assertThat("Unexpected threads for original and offloaded source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD), not(capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD)));
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD))));
}

@Test
Expand All @@ -57,12 +61,13 @@ public void testPublishOnOverride() throws InterruptedException {

assertThat("Unexpected threads for subscription and subscriber for original source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
not(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD))));
assertThat("Unexpected threads for subscription and subscriber for offloaded source.",
capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD),
not(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD))));
assertThat("Unexpected threads for original and offloaded source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD), is(capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD)));
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD)));
}

@Test
Expand All @@ -71,13 +76,14 @@ public void testSubscribeOnNoOverride() throws InterruptedException {
setupAndSubscribe(Publisher::subscribeOn, executorRule.executor());

assertThat("Unexpected threads for subscription and subscriber for original source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD), is(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
sameThreadFactory(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
assertThat("Unexpected threads for subscription and subscriber for offloaded source.",
capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD),
not(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD))));
assertThat("Unexpected threads for original and offloaded source.",
capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD),
not(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD))));
}

@Test
Expand All @@ -87,13 +93,13 @@ public void testSubscribeOnOverride() throws InterruptedException {

assertThat("Unexpected threads for subscription and subscriber for original source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
not(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD))));
assertThat("Unexpected threads for subscription and subscriber for offloaded source.",
capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD),
not(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD))));
assertThat("Unexpected threads for original and offloaded source.",
capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD),
is(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
}

@Test
Expand All @@ -103,13 +109,13 @@ public void testNoOverride() throws InterruptedException {

assertThat("Unexpected threads for subscription and subscriber for original source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
is(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
sameThreadFactory(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
assertThat("Unexpected threads for subscription and subscriber for offloaded source.",
capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD),
is(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
assertThat("Unexpected threads for original and offloaded source.",
capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD),
not(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
not(sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD))));
}

@Test
Expand All @@ -119,12 +125,37 @@ public void testOverride() throws InterruptedException {

assertThat("Unexpected threads for subscription and subscriber for original source.",
capturedThreads.get(ORIGINAL_SUBSCRIBER_THREAD),
is(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
sameThreadFactory(capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD)));
assertThat("Unexpected threads for subscription and subscriber for offloaded source.",
capturedThreads.get(OFFLOADED_SUBSCRIBER_THREAD),
is(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
assertThat("Unexpected threads for original and offloaded source.",
capturedThreads.get(ORIGINAL_SUBSCRIPTION_THREAD),
is(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
sameThreadFactory(capturedThreads.get(OFFLOADED_SUBSCRIPTION_THREAD)));
}

TypeSafeMatcher<Thread> sameThreadFactory(Thread matchThread) {
return new TypeSafeMatcher<Thread>() {
final String matchPrefix = getNamePrefix(matchThread.getName());

@Override
public void describeTo(final Description description) {
description.appendText("non-matching name prefix");
}

@Override
protected boolean matchesSafely(final Thread item) {
String prefix = getNamePrefix(item.getName());

return matchPrefix.equals(prefix);
}
};
}

private static String getNamePrefix(String name) {
int lastDash = name.lastIndexOf('-');
return -1 == lastDash ?
name :
name.substring(0, lastDash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ public Builder executor(Executor executor) {
* Enable thread affinity while offloading. When enabled, offloading implementation will favor using a
* single thread per subscribe of a source.
*
* @deprecated Use a single threaded executor with {@link #executor(Executor)} to ensure affinity.
*
* @return {@code this}.
*/
@Deprecated
public Builder offloadWithThreadAffinity() {
httpBuilder.offloadWithThreadAffinity();
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,11 @@ public Builder executor(Executor executor) {
* Enable thread affinity while offloading. When enabled, offloading implementation will favor using a
* single thread per subscribe of a source.
*
* @deprecated Use a single threaded executor with {@link #executor(Executor)} to ensure affinity.
*
* @return {@code this}.
*/
@Deprecated
public Builder offloadWithThreadAffinity() {
threadAffinity = true;
return this;
Expand Down

0 comments on commit a31605d

Please sign in to comment.