Skip to content

Commit

Permalink
concurrent-api: make context capture more generic (#3183)
Browse files Browse the repository at this point in the history
Motivation:

Our context capture and restore process is centered upon the ContextMap type. The AsyncContext type is only one specific type of context that applications may want to capture and restore across async boundaries. Other examples include the OTEL context, grpc context, and really any random ThreadLocal you want to preserve across async boundaries. They also don't necessarily fit the same pattern as a AsycContext.

Modifications:

Further abstract the context type to a CapturedContext. The key method is `Scope attachContext()` which has the job of attaching whatever context it knows about. The key benefit of this abstraction is that it is very composable. We can add extensions that are simple proxy wrappers that capture the context and restore their own, while delegating to an underlying version that is ultimately rooted in what we know as the AsyncContext.

Result:

More composable context capture and restore.
  • Loading branch information
bryce-anderson authored Feb 7, 2025
1 parent b625ab7 commit f0851de
Show file tree
Hide file tree
Showing 95 changed files with 1,010 additions and 699 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -38,13 +36,13 @@ abstract class AbstractAsynchronousCompletableOperator extends AbstractNoHandleS

@Override
final void handleSubscribe(Subscriber subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
// side of the asynchronous boundary.
final Subscriber operatorSubscriber =
contextProvider.wrapCompletableSubscriberAndCancellable(subscriber, contextMap);
contextProvider.wrapCompletableSubscriberAndCancellable(subscriber, capturedContext);
final Subscriber upstreamSubscriber = apply(operatorSubscriber);
original.delegateSubscribe(upstreamSubscriber, contextMap, contextProvider);
original.delegateSubscribe(upstreamSubscriber, capturedContext, contextProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -41,13 +39,13 @@ abstract class AbstractAsynchronousPublisherOperator<T, R> extends AbstractNoHan

@Override
final void handleSubscribe(Subscriber<? super R> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
// side of the asynchronous boundary.
final Subscriber<? super R> operatorSubscriber =
contextProvider.wrapPublisherSubscriberAndSubscription(subscriber, contextMap);
contextProvider.wrapPublisherSubscriberAndSubscription(subscriber, capturedContext);
final Subscriber<? super T> upstreamSubscriber = apply(operatorSubscriber);
original.delegateSubscribe(upstreamSubscriber, contextMap, contextProvider);
original.delegateSubscribe(upstreamSubscriber, capturedContext, contextProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -41,13 +39,13 @@ abstract class AbstractAsynchronousSingleOperator<T, R> extends AbstractNoHandle

@Override
final void handleSubscribe(Subscriber<? super R> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
// side of the asynchronous boundary.
final Subscriber<? super R> operatorSubscriber =
contextProvider.wrapSingleSubscriberAndCancellable(subscriber, contextMap);
contextProvider.wrapSingleSubscriberAndCancellable(subscriber, capturedContext);
final Subscriber<? super T> upstreamSubscriber = apply(operatorSubscriber);
original.delegateSubscribe(upstreamSubscriber, contextMap, contextProvider);
original.delegateSubscribe(upstreamSubscriber, capturedContext, contextProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.context.api.ContextMap;

import javax.annotation.Nullable;

abstract class AbstractCompletableAndSingleConcatenated<T> extends AbstractNoHandleSubscribeSingle<T> {

@Override
protected void handleSubscribe(final Subscriber<? super T> subscriber,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
final Subscriber<? super T> wrappedSubscriber = contextProvider.wrapSingleSubscriber(subscriber, contextMap);
delegateSubscribeToOriginal(wrappedSubscriber, contextMap, contextProvider);
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
final Subscriber<? super T> wrappedSubscriber =
contextProvider.wrapSingleSubscriber(subscriber, capturedContext);
delegateSubscribeToOriginal(wrappedSubscriber, capturedContext, contextProvider);
}

abstract void delegateSubscribeToOriginal(Subscriber<? super T> offloadSubscriber,
ContextMap contextMap, AsyncContextProvider contextProvider);
CapturedContext capturedContext, AsyncContextProvider contextProvider);

abstract static class AbstractConcatWithSubscriber<T> implements Subscriber<T>, CompletableSource.Subscriber {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

abstract class AbstractMergeCompletableOperator<T extends CompletableMergeSubscriber>
Expand All @@ -30,15 +28,15 @@ abstract class AbstractMergeCompletableOperator<T extends CompletableMergeSubscr
}

@Override
final void handleSubscribe(Subscriber subscriber, ContextMap contextMap,
final void handleSubscribe(Subscriber subscriber, CapturedContext capturedContext,
AsyncContextProvider contextProvider) {
// The AsyncContext needs to be preserved when ever we interact with the original Subscriber, so we wrap it here
// with the original contextMap. Otherwise some other context may leak into this subscriber chain from the other
// side of the asynchronous boundary.
final Subscriber operatorSubscriber =
contextProvider.wrapCompletableSubscriberAndCancellable(subscriber, contextMap);
contextProvider.wrapCompletableSubscriberAndCancellable(subscriber, capturedContext);
T mergeSubscriber = apply(operatorSubscriber);
original.delegateSubscribe(mergeSubscriber, contextMap, contextProvider);
original.delegateSubscribe(mergeSubscriber, capturedContext, contextProvider);
doMerge(mergeSubscriber);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.context.api.ContextMap;

/**
* A {@link Completable} created from a {@link Publisher}.
Expand All @@ -40,12 +39,12 @@ abstract class AbstractPubToCompletable<T> extends AbstractNoHandleSubscribeComp

@Override
final void handleSubscribe(final Subscriber subscriber,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
// We are now subscribing to the original Publisher chain for the first time, wrap Subscription to preserve the
// context.
PublisherSource.Subscriber<? super T> wrappedSubscriber =
contextProvider.wrapSubscription(newSubscriber(subscriber), contextMap);
source.delegateSubscribe(wrappedSubscriber, contextMap, contextProvider);
contextProvider.wrapSubscription(newSubscriber(subscriber), capturedContext);
source.delegateSubscribe(wrappedSubscriber, capturedContext, contextProvider);
}

abstract static class AbstractPubToCompletableSubscriber<T> extends DelayedCancellable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.context.api.ContextMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,12 +37,12 @@ abstract class AbstractPubToSingle<T> extends AbstractNoHandleSubscribeSingle<T>

@Override
final void handleSubscribe(final Subscriber<? super T> subscriber,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
// We are now subscribing to the original Publisher chain for the first time, wrap Subscription to preserve the
// context.
PublisherSource.Subscriber<? super T> wrappedSubscription =
contextProvider.wrapSubscription(newSubscriber(subscriber), contextMap);
source.delegateSubscribe(wrappedSubscription, contextMap, contextProvider);
contextProvider.wrapSubscription(newSubscriber(subscriber), capturedContext);
source.delegateSubscribe(wrappedSubscription, capturedContext, contextProvider);
}

abstract PublisherSource.Subscriber<T> newSubscriber(Subscriber<? super T> original);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.context.api.ContextMap;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -49,21 +48,21 @@ abstract class AbstractPublisherGroupBy<Key, T> extends AbstractNoHandleSubscrib
abstract static class AbstractGroupBySubscriber<Key, T> implements Subscriber<T> {
private boolean rootCancelled;
private final int queueLimit;
private final ContextMap contextMap;
private final CapturedContext capturedContext;
private final AsyncContextProvider contextProvider;
private final Map<Key, GroupMulticastSubscriber<Key, T>> groups;
private final GroupMulticastSubscriber<String, GroupedPublisher<Key, T>> target;
@Nullable
private Subscription subscription;

AbstractGroupBySubscriber(final Subscriber<? super GroupedPublisher<Key, T>> target, final int queueLimit,
final int initialCapacityForGroups, final ContextMap contextMap,
final int initialCapacityForGroups, final CapturedContext capturedContext,
final AsyncContextProvider contextProvider) {
this.queueLimit = queueLimit;
this.contextMap = contextMap;
this.capturedContext = capturedContext;
this.contextProvider = contextProvider;
this.target = new GroupMulticastSubscriber<>(this, "root");
this.target.subscriber(target, false, contextMap, contextProvider);
this.target.subscriber(target, false, capturedContext, contextProvider);
groups = new ConcurrentHashMap<>(initialCapacityForGroups);
}

Expand Down Expand Up @@ -98,7 +97,7 @@ final void onNext(Key key, @Nullable T t) {
} else {
groupSub = new GroupMulticastSubscriber<>(this, key);
GroupedPublisher<Key, T> groupedPublisher = new DefaultGroupedPublisher<>(key, groupSub,
contextMap, contextProvider);
capturedContext, contextProvider);
final GroupMulticastSubscriber<Key, T> oldVal = groups.put(key, groupSub);
assert oldVal == null; // concurrent onNext not allowed, collision not expected.
groupSub.onNext(t); // deliver to group first to avoid re-entry creating ordering issues.
Expand Down Expand Up @@ -165,16 +164,16 @@ public String toString() {
}

void subscriber(final Subscriber<? super T> subscriber, final boolean triggerOnSubscribe,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
// The root Subscriber's downstream subscriber is set internally, so no need for atomic operation to filter
// duplicates.
if (!triggerOnSubscribe) {
assert this.subscriber == null && ctxSubscriber == null;
this.subscriber = subscriber;
ctxSubscriber = contextProvider.wrapPublisherSubscriber(subscriber, contextMap);
ctxSubscriber = contextProvider.wrapPublisherSubscriber(subscriber, capturedContext);
} else if (subscriberStateUpdater.compareAndSet(this, 0, 1)) {
this.subscriber = subscriber;
ctxSubscriber = contextProvider.wrapPublisherSubscriber(subscriber, contextMap);
ctxSubscriber = contextProvider.wrapPublisherSubscriber(subscriber, capturedContext);
triggerOnSubscribe();
} else {
// this.subscriber may be null (we set the subscriber variable after subscriberStateUpdater),
Expand Down Expand Up @@ -215,14 +214,14 @@ int outstandingDemandLimit() {
private static final class DefaultGroupedPublisher<Key, T> extends GroupedPublisher<Key, T>
implements PublisherSource<T> {
private final GroupMulticastSubscriber<Key, T> groupSink;
private final ContextMap contextMap;
private final CapturedContext capturedContext;
private final AsyncContextProvider contextProvider;

DefaultGroupedPublisher(final Key key, final GroupMulticastSubscriber<Key, T> groupSink,
final ContextMap contextMap, final AsyncContextProvider contextProvider) {
final CapturedContext capturedContext, final AsyncContextProvider contextProvider) {
super(key);
this.groupSink = groupSink;
this.contextMap = contextMap;
this.capturedContext = capturedContext;
this.contextProvider = contextProvider;
}

Expand All @@ -233,7 +232,7 @@ public void subscribe(final Subscriber<? super T> subscriber) {

@Override
protected void handleSubscribe(Subscriber<? super T> sub) {
groupSink.subscriber(sub, true, contextMap, contextProvider);
groupSink.subscriber(sub, true, capturedContext, contextProvider);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

/**
* Base class for all {@link Completable}s that are created with already realized result and does not generate result
* asynchronously.
Expand All @@ -25,10 +23,10 @@ abstract class AbstractSynchronousCompletable extends AbstractNoHandleSubscribeC

@Override
final void handleSubscribe(Subscriber subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
// We need to wrap the Subscriber to save/restore the AsyncContext on each operation or else the AsyncContext
// may leak from another thread.
doSubscribe(contextProvider.wrapCompletableSubscriber(subscriber, contextMap));
doSubscribe(contextProvider.wrapCompletableSubscriber(subscriber, capturedContext));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -41,7 +39,7 @@ abstract class AbstractSynchronousCompletableOperator extends AbstractNoHandleSu

@Override
final void handleSubscribe(Subscriber subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
original.delegateSubscribe(apply(subscriber), contextMap, contextProvider);
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
original.delegateSubscribe(apply(subscriber), capturedContext, contextProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

/**
* Base class for all {@link Publisher}s that are created with already realized values and do not generate values
* asynchronously.
Expand All @@ -27,10 +25,10 @@ abstract class AbstractSynchronousPublisher<T> extends AbstractNoHandleSubscribe

@Override
final void handleSubscribe(Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
// We need to wrap the Subscriber to save/restore the AsyncContext on each operation or else the AsyncContext
// may leak from another thread.
doSubscribe(contextProvider.wrapPublisherSubscriber(subscriber, contextMap));
doSubscribe(contextProvider.wrapPublisherSubscriber(subscriber, capturedContext));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -43,7 +41,7 @@ abstract class AbstractSynchronousPublisherOperator<T, R> extends AbstractNoHand

@Override
final void handleSubscribe(Subscriber<? super R> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
original.delegateSubscribe(apply(subscriber), contextMap, contextProvider);
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
original.delegateSubscribe(apply(subscriber), capturedContext, contextProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

/**
* Base class for all {@link Single}s that are created with already realized result and does not generate result
* asynchronously.
Expand All @@ -27,10 +25,10 @@ abstract class AbstractSynchronousSingle<T> extends AbstractNoHandleSubscribeSin

@Override
final void handleSubscribe(Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
// We need to wrap the Subscriber to save/restore the AsyncContext on each operation or else the AsyncContext
// may leak from another thread.
doSubscribe(contextProvider.wrapSingleSubscriber(subscriber, contextMap));
doSubscribe(contextProvider.wrapSingleSubscriber(subscriber, capturedContext));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.context.api.ContextMap;

import static java.util.Objects.requireNonNull;

/**
Expand All @@ -43,7 +41,7 @@ abstract class AbstractSynchronousSingleOperator<T, R> extends AbstractNoHandleS

@Override
final void handleSubscribe(Subscriber<? super R> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider) {
original.delegateSubscribe(apply(subscriber), contextMap, contextProvider);
CapturedContext capturedContext, AsyncContextProvider contextProvider) {
original.delegateSubscribe(apply(subscriber), capturedContext, contextProvider);
}
}
Loading

0 comments on commit f0851de

Please sign in to comment.