Streams with flexible requestN semantic #118
Conversation
whyoleg commented Nov 6, 2020 (edited)
- Reworked streams API using a coroutine context element, RequestStrategy, for better control over requestN frames
- Fixes: Limit rate operator for streams (#109)
@yschimke and yes, it's still WIP; more tests and maybe fixes are needed, but overall it works.
@whyoleg I think I'm the wrong reviewer for this. You should try to get some of the Pivotal folk with more experience across rsocket-java and spring reactor. There have been a lot of discussions in the past on reactor's infinite and cancel defaults and limits, and to be honest I have simple requirements now. In FB days it was more relevant to me, since there was a polling mechanism in the background and a real cost to every additional emitted result, but infinite and cancel is mostly fine for what I'm doing now, e.g. rsocket-demo with a stream of tweets (line-separated http response) or with my own backends being purely event based anyway.
lateinit var server: Job

lateinit var payload: Payload
lateinit var payloadsFlow: Flow<Payload>

fun payloadCopy(): Payload = payload.copy()
Outside this review: this sort of pattern, reuse of a payload, probably deserves a nicer, more natural mechanism. It is likely a source of bugs under error conditions that will be missed on the happy path. How do we improve this? e.g. payload.permanent()?
And how would payload.permanent() work? Would it just make it possible to reuse the data, and release it after all uses?
@altavir has much experience working with Kotlin IO, and is now researching adding rsocket-kotlin to his own projects. Maybe he has some ideas on better payload handling?
Just from my side: we allocate a ByteBuf once, and then retain it every time we use it in benchmarks. It sounds like a copy, but it isn't really, so I'm not sure Kotlin IO can do the same.
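To illustrate the retain-vs-copy distinction being discussed, here is a minimal, self-contained sketch. `RefCountedPayload` is purely hypothetical (it is not an rsocket-kotlin or Netty API): `retain()` shares the same bytes and bumps a counter, while `copy()` allocates an independent duplicate, which is why "sounds like a copy but not really" applies to the retain pattern.

```kotlin
// Hypothetical sketch, not a real API: models the refcounting pattern
// (allocate once, retain per use) vs. a true byte copy.
class RefCountedPayload(private val data: ByteArray) {
    var refCount: Int = 1
        private set

    // Cheap: share the underlying bytes, track one more owner.
    fun retain(): RefCountedPayload {
        check(refCount > 0) { "payload already released" }
        refCount++
        return this
    }

    // Expensive: allocate and copy the bytes into an independent payload.
    fun copy(): RefCountedPayload = RefCountedPayload(data.copyOf())

    // Release one owner; the bytes stay reusable until the count hits zero.
    fun release() {
        check(refCount > 0) { "payload already released" }
        refCount--
    }
}

fun main() {
    val payload = RefCountedPayload(byteArrayOf(1, 2, 3))
    val shared = payload.retain()     // no allocation, same bytes
    val independent = payload.copy()  // fresh bytes, its own refCount
    println(payload.refCount)         // prints 2
    println(independent.refCount)     // prints 1
    shared.release()
    payload.release()
}
```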
Or maybe work this out with @elizarov: what should the extension methods to ReactiveFlow be? I think ideally none new inside rsocket-kotlin; we would just react to the usage and contract. IIRC, that was essentially the decision with rsocket-java and spring reactor: not to build our own terminology, but to keep using the operators from the underlying framework.
The idea behind … We need to add such logic, with one new type (…)
I'm not a huge fan of defining our own ReactiveFlow.
If all you need is to store a request strategy, then I can suggest looking into using a context for this kind of storage. This way you can have a sensible default that will add zero overhead (and zero new concepts) and rely on the existing context-passing support in both Reactor and Kotlin Flow to change this request strategy if needed.
@elizarov cool idea, I will try to do that! thx!
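The context-based approach suggested here can be sketched with nothing but the Kotlin standard library. This is an illustrative shape only, not rsocket-kotlin's actual API: `RequestStrategy`, `PrefetchStrategy`, and the default batch size of 64 are all assumptions. The strategy is a `CoroutineContext.Element`, so installing it is just adding a context element, and a sensible default kicks in when none is present.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical sketch: a request strategy carried through the coroutine context.
interface RequestStrategy : CoroutineContext.Element {
    override val key: CoroutineContext.Key<*> get() = Key

    // How many items to request from the remote side next.
    suspend fun nextRequest(): Int

    companion object Key : CoroutineContext.Key<RequestStrategy>
}

// Illustrative default: request a fixed batch each time.
class PrefetchStrategy(private val requestSize: Int) : RequestStrategy {
    override suspend fun nextRequest(): Int = requestSize
}

// Collector-side code reads the strategy from its own context,
// falling back to a default (64 is an arbitrary illustrative number).
suspend fun currentRequestStrategy(): RequestStrategy =
    coroutineContext[RequestStrategy] ?: PrefetchStrategy(64)

// Stdlib-only launcher: runs a suspend block under the given context.
fun runWith(context: CoroutineContext, block: suspend () -> Unit) {
    block.startCoroutine(Continuation(context) { it.getOrThrow() })
}

fun main() {
    val seen = mutableListOf<Int>()
    runWith(EmptyCoroutineContext) { seen += currentRequestStrategy().nextRequest() }
    runWith(PrefetchStrategy(8)) { seen += currentRequestStrategy().nextRequest() }
    println(seen) // prints [64, 8]
}
```

In real code the element would be installed via the usual context mechanisms (e.g. `flowOn` or `withContext` from kotlinx.coroutines), which is exactly the "existing context-passing support" the comment refers to.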
As discussed, let's split these changes into two PRs:
LGTM with some minor comments
private val requestSize: Int,
private val requestOn: Int,
) : RequestStrategy.Element {
    private val requested = atomic(requestSize)
Just wondering: why do we need atomic here?
It's needed to allow mutation on K/N if we freeze. Need to check, maybe it's really not needed here.
On K/JVM and K/JS it's not needed, as functions inside RequestStrategy.Element
will never be called concurrently.
Yes, please double-check. In general, it looks like all the requests will be done within the same context, hence the context should do all the stuff to keep seriality, and so no atomic is needed.
Atomic removed, not needed, thx for pointing it out!
override suspend fun nextRequest(): Int {
    if (requested.decrementAndGet() != requestOn) return 0

    requested += requestSize
But here this atomic is used in a non-atomic way. Am I missing something?
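Since the thread agrees that `nextRequest()` is never called concurrently, the whole strategy can use a plain `Int`. The following is a sketch of that serial variant, assuming the class shape from the diff above; the final `return requestSize` (re-requesting a full batch once the threshold is hit) is an assumption based on the surrounding logic, not the exact PR code.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Serial (non-atomic) sketch of the prefetch strategy: all calls happen
// in the same context, so a plain Int field is enough.
class PrefetchRequestStrategy(
    private val requestSize: Int,
    private val requestOn: Int,
) {
    private var requested = requestSize

    suspend fun nextRequest(): Int {
        // One item was consumed; only re-request when we drop to the threshold.
        if (--requested != requestOn) return 0
        // Assumed behavior: top the window back up by a full batch.
        requested += requestSize
        return requestSize
    }
}

fun main() {
    val strategy = PrefetchRequestStrategy(requestSize = 4, requestOn = 2)
    val requests = mutableListOf<Int>()
    // nextRequest() never actually suspends here, so this stdlib-only
    // launcher runs the block to completion synchronously.
    (suspend { repeat(4) { requests += strategy.nextRequest() } })
        .startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    println(requests) // prints [0, 4, 0, 0]
}
```

With `requestSize = 4` and `requestOn = 2`, the counter goes 4 → 3 → 2 (threshold hit, request 4 more, counter back to 6) → 5 → 4, so only the second call emits a request.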
LGTM 🔥