-
Notifications
You must be signed in to change notification settings - Fork 607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize the **Java** Flow
interop
#3130
Conversation
baa16a0
to
ae7684a
Compare
core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Outdated
Show resolved
Hide resolved
ae7684a
to
f52d5d2
Compare
f52d5d2
to
a3b414d
Compare
core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Outdated
Show resolved
Hide resolved
1ce5636
to
57294b0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions:
- Was just moving the files enough for cross-compiling?
- For some reason Scala 3 didn't like my covariant
State
so I had to make it invariant which was a real PITA. - I guess the
ClassTag
change is a non-go... but then, what should we do?
|
Flow
interopFlow
interop
core/shared/src/main/scala/fs2/interop/flow/StreamSubscriber.scala
Outdated
Show resolved
Hide resolved
core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Outdated
Show resolved
Hide resolved
core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Outdated
Show resolved
Hide resolved
core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Outdated
Show resolved
Hide resolved
override def request(n: Long): Unit = | ||
if (canceled.get() ne null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this canceled
check really doing anything useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, yes it is, this comment would almost be more helpful here.
fs2/core/jvm/src/main/scala/fs2/interop/flow/StreamSubscription.scala
Lines 124 to 129 in 1f6bc98
// According to the spec, it's acceptable for a concurrent cancel to not | |
// be processed immediately, but if you have synchronous | |
// `cancel(); request()`, | |
// then the request must be a NOOP. | |
// See https://github.com/zainab-ali/fs2-reactive-streams/issues/29 | |
// and https://github.com/zainab-ali/fs2-reactive-streams/issues/46 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, that was the original place of the comment, I just polished it a little.
I can move it if you prefer, the thing is that it also affects cancel
, and well both methods are close together.
core/jvm/src/main/scala/fs2/interop/flow/StreamSubscriber.scala
Outdated
Show resolved
Hide resolved
Those comments justify that having an Array inside the State AtomicReference is safe; since they will never be concurrently modified.
232fab8
to
3eae324
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diff was horrible, but when viewed standalone the new implementations are quite elegant. Thanks for bearing with me 😇
.repeatEval(dequeue1) | ||
.unNoneTerminate | ||
.unchunks | ||
.asInstanceOf[Stream[F, A]] // This cast is safe, since all elements come from onNext(a: A). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tossed and turned on this for several nights but in the end I think it's a win. Besides making it easy to use the Array
-based buffer, it also allowed us to remove the type parameter from the state machine. Basically, in this unusual case I think threading the type parameter through the implementation was less ergonomic than "pre-erasing" it.
Follow up of #3102
StreamSubscription
StreamSubscriber
Cross compile(will be done in a separate PR WIP: Cross-compileinterop.flow
#3160)