review feedback
bondolo committed Feb 11, 2022
1 parent 1f9d0ce commit ab54e30
Showing 7 changed files with 201 additions and 207 deletions.
14 changes: 8 additions & 6 deletions docs/modules/ROOT/pages/blocking-safe-by-default.adoc
@@ -55,21 +55,23 @@ often either naive hence sub-optimal or fairly complex. ServiceTalk internally d
it does not offload more than what is required. In other words, it reduces offloading when it can determine that no user
code can interact with the event loop on a certain path.

[#execution-strategy]
=== Execution Strategy

The primary purpose of an link:{source-root}/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ExecutionStrategy.java[Execution Strategy]
is to define which interaction paths of a particular transport or protocol layer require offloading. For
example, at the xref:{page-version}@servicetalk-http-api::blocking-safe-by-default.adoc[HTTP] transport layer, four
offload paths are available:

. Sending data to the transport.
. Receiving data from the transport.
. Handling transport events.
. Closing the transport.

A given application, filter or component may indicate that it requires offloading for none, some or all of these
interactions. Protocols other than HTTP have their own APIs that would otherwise execute user code on an `EventLoop`
thread, and they define their own `ExecutionStrategy` to control which of those interaction paths are subject to
offloading.
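
For illustration, the following is a minimal sketch of opting an entire HTTP server out of offloading. The exact names
(`HttpServers.forPort(...)`, `executionStrategy(...)`, `HttpExecutionStrategies.offloadNever()`) are assumptions about
the current builder API; consult `HttpExecutionStrategies` and `HttpServerBuilder` for the authoritative methods.

[source, java]
----
// Minimal sketch (API names assumed): a server whose user code never blocks
// can request that none of the offload paths above be offloaded.
HttpServers.forPort(8080)
        .executionStrategy(HttpExecutionStrategies.offloadNever())
        .listenBlockingAndAwait((ctx, request, responseFactory) ->
                responseFactory.ok()) // handler runs directly on the I/O thread
        .awaitShutdown();
----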

[#influencing-offloading-decisions]
=== Influencing offloading decisions
@@ -2,3 +2,4 @@
* xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc[Blocking safe by default]
** xref:{page-version}@servicetalk-concurrent-api::blocking-implementation.adoc[Implementation details]
* xref:{page-version}@servicetalk-concurrent-api::async-context.adoc[Async Context]
* xref:{page-version}@servicetalk-concurrent-api::pitfalls.adoc[Concurrency Pitfalls]
@@ -8,9 +8,9 @@ endif::[]

= Blocking safe by default (Implementation Details)

As described xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc[here], ServiceTalk, by
default, allows users to write blocking code when interacting with ServiceTalk. This document describes the details of
the implementation and is intended for readers who want to understand the internals of how this is achieved.

NOTE: It is not required to read this document if you just want to use ServiceTalk.

@@ -26,15 +26,16 @@ threads less error-prone, and also allows for certain optimizations around threa

An asynchronous source has two important decisions to make about thread usage:

1. Which thread or executor will be used to do the actual task related to a source, e.g. for an HTTP client, the task
is to send an HTTP request and read the HTTP response.
2. Which thread or executor will be used to interact with the `Subscriber` corresponding to its `Subscription`s.

Sometimes a single specific thread will be used, but in most cases a thread from an `Executor` pool will be used.

Part 1. above is not governed by the
link:https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification[ReactiveStreams specification]
and hence sources are free to use any thread. ServiceTalk typically will use Netty's `EventLoop` to perform the actual
task.
Part 2. covers all the interactions defined by the ReactiveStreams specification, i.e. all methods in `Publisher`,
`Subscriber` and `Subscription`.
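
As a concrete illustration of Part 2., the minimal sketch below observes the thread on which each `Subscriber` signal
is delivered; which thread that is depends on the offloading decisions described in the rest of this document.

[source, java]
----
// Print the thread delivering each ReactiveStreams signal (Part 2. above).
Publisher.from("a", "b", "c")
        .whenOnNext(item ->
                System.out.println("onNext(" + item + ") on " + Thread.currentThread()))
        .whenOnComplete(() ->
                System.out.println("onComplete on " + Thread.currentThread()))
        .toFuture() // subscribes and collects all items
        .get();
----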

@@ -45,20 +45,19 @@ ServiceTalk concurrency APIs defines which thread will be used by any asynchrono
ServiceTalk uses Netty for network I/O and the Netty implementation uses a fixed number of I/O threads for executing
network operations. To maximize throughput and minimize latency it is important to ensure that at least one I/O thread
is always ready to respond immediately to network activity. One approach to ensure I/O thread availability is to
carefully limit the scope of work done by I/O threads and, whenever practical, delegate any other necessary tasks that
are not related to I/O to some other thread. Moving tasks from I/O threads to other threads is called “offloading” and
is a core technique used by ServiceTalk.

ServiceTalk will, by default, execute most application code on threads other than the Netty I/O threads. There are some
situations where offloading is not applied: APIs that are synchronous and assumed unlikely to block, for which omitting
offloading characteristics simplifies the ServiceTalk APIs. For example, the `Predicate` provided by an application to
the `appendServiceFilter(Predicate<StreamingHttpRequest>, StreamingHttpServiceFilterFactory)` method of
link:{source-root}/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpServerBuilder.java[HttpServerBuilder]
must not block and may be executed on an I/O thread.
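
A minimal sketch of such a predicate follows; the route path and the pass-through filter are illustrative only. The
test is a cheap, synchronous check that is safe to evaluate on an I/O thread.

[source, java]
----
// The predicate may run on an I/O thread: keep it cheap and never block.
HttpServers.forPort(8080)
        .appendServiceFilter(
                request -> request.path().startsWith("/admin"), // non-blocking check
                service -> new StreamingHttpServiceFilter(service)); // illustrative pass-through filter
----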

For most invocations of application code, if the application developer knows that their code cannot block and always
executes quickly in near constant time, they can request that ServiceTalk not offload their code. This will improve
application performance by reducing latency and overhead. Requests to not offload will be honored by ServiceTalk if all
the other components in the same execution path have also opted out of offloading. As a last resort, tasks may also
be queued to be performed as threads are available.

ServiceTalk is designed to be fully asynchronous except where the API provides explicit blocking behaviour as a
@@ -77,47 +77,84 @@ threads.

== Offloading and asynchronous sources

ServiceTalk uses the `link:{source-root}/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Executor.java[Executor]`
abstraction to specify the source of threads used for the delivery of signals from an asynchronous source. The default
signal offloading, if any, used by an asynchronous source is determined by the source. For example, the HTTP sources,
in addition to allowing for specification of an offloading executor, provide direct control of the offloading via
`xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc#execution-strategy[ExecutionStrategy]`
and may also be influenced by the
xref:{page-version}@servicetalk-concurrent-api::blocking-safe-by-default.adoc#influencing-offloading-decisions[computed execution strategy].

Applications with asynchronous, blocking or computationally expensive tasks can also offload those tasks to a specific
`Executor`. The `subscribeOn(Executor)` and `publishOn(Executor)` operators will offload execution from the default
signal delivery thread to a thread from the provided `Executor`. The diagram below illustrates the interaction between
an asynchronous source, its `Subscriber`, its operators, and the `Executor`.

image::offloading.svg[Offloading]

During `subscribe()` the execution will offload at the `subscribeOn()` operator and transition from the application
thread to an `Executor` thread. The application thread will be able to continue while the subscribe operation
asynchronously continues on an `Executor` thread.

When a result is available at the source it will begin publication using the receiving event loop thread but will
offload at the `publishOn()` operator and transition execution from the event loop thread to an `Executor` thread. Once
the publish signal is offloaded the event loop thread will be available again for executing other I/O tasks while the
response is asynchronously processed on the `Executor` thread.
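
The examples below reference `publishExecutor` and `subscribeExecutor`. A minimal sketch of how such ServiceTalk
`Executor` instances might be created (the variable names are simply those used below; the factory methods are from
`io.servicetalk.concurrent.api.Executors`):

[source, java]
----
// Illustrative creation of the executors used in the following examples.
Executor publishExecutor = Executors.newCachedThreadExecutor();
Executor subscribeExecutor = Executors.newFixedSizeExecutor(2);

// Close the executors once they are no longer needed, e.g.:
// publishExecutor.closeAsync().toFuture().get();
----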

[source, java]
----
Collection<Integer> result = Publisher.range(1, 10) <2> <4>
.map(element -> element) // non-offloaded NO-OP
.publishOn(publishExecutor)
.map(element -> element) // offloaded NO-OP
.subscribeOn(subscribeExecutor)
.toFuture() <3>
.get(); <1>
----
<1> `toFuture().get()` will do a `subscribe(Subscriber)`. This flows up the operator chain, `subscribeOn` (offloads onto `subscribeExecutor` thread) -> `map` -> `publishOn` -> `map` -> `Range`.
<2> `Range` will call `Subscriber.onSubscribe(Subscription)` on the `Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `publishOn` (offloads to `publishExecutor` thread) -> `map` -> `subscribeOn` -> `toFuture`.
<3> `toFuture()` will call `Subscription.request(Long.MAX_VALUE)`. This flows up the operator chain, `subscribeOn` (offloads onto `subscribeExecutor` thread) -> `map` -> `publishOn` -> `map` -> `Range`.
<4> `Range` will do `onNext(element)` for 1-10 items synchronously (on a thread from `subscribeExecutor`). Each `onNext` flows back down the operator chain, `Range` -> `map` -> `publishOn` (offloads to thread from `publishExecutor`) -> `map` -> `subscribeOn` -> `toFuture`.

This example can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP
`map` implementations to reveal the active thread during their execution. To show the active thread at the other
points described in the callouts, the expanded example also adds `whenOnSubscribe`, `whenRequest` and `liftSync`
operations in the operator chain.

[source, java]
----
Publisher.range(1, 10)
.whenOnSubscribe(subscription -> {
System.out.println("\nonSubscribe starts on " + Thread.currentThread());
})
.map(element -> {
System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
return element;
})
.publishOn(publishExecutor)
.map(element -> {
System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
return element;
})
.whenOnSubscribe(subscription -> {
System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
})
.whenRequest(request -> {
System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
})
.liftSync(subscriber -> {
System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
return subscriber;
})
.subscribeOn(subscribeExecutor)
.liftSync(subscriber -> {
System.out.println("\nSubscribe begins on " + Thread.currentThread());
return subscriber;
})
.whenRequest(request -> {
System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
})
.toFuture()
.get();
----