-
Notifications
You must be signed in to change notification settings - Fork 426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplified and refactored Netty-ReactiveStreams integration #3636
Conversation
private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends PromisingSubscriber[Array[Byte], HttpContent] { | ||
private[netty] class SimpleSubscriber(contentLength: Option[Long]) extends PromisingSubscriber[Array[Byte], HttpContent] { | ||
// These don't need to be volatile as Reactive Streams guarantees that onSubscribe/onNext/onError/onComplete are | ||
// called serially (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1-publisher-code - rule 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Isn't the serial guarantee a different thing than memory barrier concerns? We have a guarantee that there will be no parallel access to these values, but they can still happen on different threads, thus a change performed on thread may not be always immediately visible to another thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIU, it includes memory visibility safety (the "happens-before relationship" which is a term also used by the JMM in the context of synchronized
blocks and volatile
semantics):
The definition of "serially" says:
In the context of a Signal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.
The way I see it, every "serial" guarantee must include proper happens-before relationship or it doesn't really guarantee anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understood this as a guarantee that calls to onNext, onComplete, etc. never overlap, so we are safe to perform check internal state and perform updates without worrying that these values will be changed in a parallel onNext call. I think you may be right regarding these guarantees extending to visibility, at least internally. For example, checking some other open source subscribers: https://github.com/playframework/netty-reactive-streams/blob/89aff754154a118ea134d5ca64cce3dc44c78fc6/netty-reactive-streams/src/main/java/org/playframework/netty/HandlerSubscriber.java#L84
Most mutable members are normal variables. The Subscription
and ChannelHandlerContext
are volatile, possibly because they are provided from the outside?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understood this as a guarantee that calls to onNext, onComplete, etc. never overlap, so we are safe to perform check internal state and perform updates without worrying that these values will be changed in a parallel onNext call.
Without proper happens-before relationship this "non-overlapping" alone doesn't really give you any safety at all because even though subsequent OK, I take it back - there actually is some value in it, e.g. like you said, even though you have to use volatiles, you can assume that noone else will modify these variables in the middle of your invocation.onNext
calls happen one after another in real time, they can still see some not-fully-thread-published broken state of a previous invocation, requiring the same synchronization as if they were concurrent.
Most mutable members are normal variables. The Subscription and ChannelHandlerContext are volatile, possibly because they are provided from the outside?
I think subscription
must be volatile
because it is accessed from ChannelDuplexHandler
handler methods. There is no happens-before relationship between onSubscribe
(where subscription
is set) and these methods.
There's a similar situation with ctx
, which is set by a Netty handler method but accessed in Subscriber
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, the HandlerSubscriber
is a Netty channel handler, so its onNext
and other reactive methods are supposed to be called only from Netty handler context, which guarantees the same thread, thus visibility. In case of our SimpleSubscriber
it's similar - it is the Netty's underlying publisher that calls these methods, and totalLength
and buffers
are accessed only there, so I guess it's OK to remove volatile
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I guess this is the way Netty implements reactivestreams spec's requirements.
@@ -46,7 +53,7 @@ class InputStreamPublisher[F[_]](range: InputStreamRange, chunkSize: Int)(implic | |||
case _ => chunkSize | |||
} | |||
|
|||
val _ = monad | |||
runAsync(monad |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: I think we still should add a comment that in case of Id
this isn't really async and we are aware that this case violates reactive streams.
object RunAsync { | ||
type Id[A] = A | ||
|
||
final val Id: RunAsync[Id] = new RunAsync[Id] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out of curiosity: anything better in final val Id
over object Id extends RunAsync[Id]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, no strong opinions.
It arguably generates somewhat simpler bytecode, i.e.
- no lazy initialization associated with
object
- no separate class associated with
object
(there's an anonymous class but it will probably be compiled to a lambda so it won't have a classfile)
It also works more naturally with type inference:
- when it's a
val
, type ofId
will be inferred asRunAsync[Id]
(unless explicitly requested to be typed asId.type
) - when it's an
object
, it will be inferred asId.type
, with API potentially extended overRunAsync[Id]
Neither the laziness nor introducing a subtype was my intention, so a val
is much closer to expressing exactly what I want, which is just having a plain implementation of RunAsync
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, thanks :)
No description provided.