
Streams with flexible requestN semantic #118

Merged (2 commits) Dec 4, 2020
Conversation

whyoleg (Member) commented Nov 6, 2020

whyoleg (Member, Author) commented Nov 6, 2020

@yschimke
What do you think about this?
I've found that it's really not so easy to integrate better backpressure control using just Flow and the current RSocket interface, so I created a new ReactiveFlow interface inherited from Flow (naming is TBD). Only incoming Flows need to be controlled by requestN (on the requester side that's the requestStream and requestChannel output; on the responder side, the requestChannel input); for the other cases it's not needed and would just add complexity, so I decided to split the RSocket interface into requester and responder parts.
So, do you think this approach is OK, and what do you think about the RequestStrategy API?

And yes, it's still WIP; it may need more tests and fixes, but overall it works.
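The requester/responder split described above can be sketched roughly as follows. This is a hypothetical, heavily simplified illustration, not the actual rsocket-kotlin API: Sequence<String> stands in for Flow<Payload> so the sketch stays dependency-free, and the real interfaces are suspending. The point is the shape: a side only needs requestN control for the streams it receives, so splitting the interface keeps that complexity out of the other request types.

```kotlin
// Hypothetical sketch: Sequence<String> stands in for Flow<Payload>.
interface RSocketRequester {
    fun fireAndForget(payload: String)
    fun requestResponse(payload: String): String
    fun requestStream(payload: String): Sequence<String>             // incoming: requestN-controlled
    fun requestChannel(payloads: Sequence<String>): Sequence<String> // returned stream: requestN-controlled
}

interface RSocketResponder {
    // on this side, only the *input* of requestChannel is incoming
    fun requestChannel(payloads: Sequence<String>): Sequence<String>
}

fun main() {
    // Trivial echo implementation, just to show the interfaces in use.
    val requester = object : RSocketRequester {
        override fun fireAndForget(payload: String) {}
        override fun requestResponse(payload: String) = payload
        override fun requestStream(payload: String) = sequenceOf(payload, payload)
        override fun requestChannel(payloads: Sequence<String>) = payloads
    }
    check(requester.requestStream("hi").toList() == listOf("hi", "hi"))
}
```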

yschimke (Member) commented Nov 7, 2020

@whyoleg I think I'm the wrong reviewer for this. You should try to get some of the Pivotal folks with more experience across rsocket-java and Spring Reactor. There have been a lot of discussions in the past on Reactor's infinite/cancel defaults and limits, and to be honest I have simple requirements now.

In the FB days it was more relevant to me, since there was a polling mechanism in the background and a real cost to every additional emitted result, but infinite-and-cancel is mostly fine for what I'm doing now, e.g. rsocket-demo with a stream of tweets (a line-separated HTTP response), or my own backends, which are purely event-based anyway.

lateinit var server: Job

lateinit var payload: Payload
lateinit var payloadsFlow: Flow<Payload>

fun payloadCopy(): Payload = payload.copy()
yschimke (Member) commented Nov 7, 2020

Outside this review...

This pattern of reusing a payload probably deserves a nicer, more natural mechanism. It's a likely source of bugs under error conditions that will be missed on the happy path. How do we improve this? e.g. payload.permanent()?

whyoleg (Member, Author) replied

And how would payload.permanent() work? Would it just make the data reusable, and release it after everything completes?
@altavir has much experience working with Kotlin and IO, and is now researching adding rsocket-kotlin to his own projects. Maybe he has some ideas on better payload handling?

A Member replied

Just from my side: we allocate a ByteBuf once and then retain it every time we use it in benchmarks. It sounds like a copy, but it isn't really, so I'm not sure Kotlin IO can do the same.
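The retain-instead-of-copy idea mentioned above can be sketched in plain Kotlin. This is illustrative only, not Netty's ByteBuf or a kotlinx-io API: one underlying byte array is allocated once, and each "use" bumps a reference count instead of copying bytes. A real implementation would use an atomic counter and actual buffer reclamation.

```kotlin
// Illustrative reference-counted payload; retain() plays the role of
// ByteBuf.retain(): a cheap "copy" that shares the underlying bytes.
class RefCountedPayload(private val data: ByteArray) {
    private var refCnt = 1 // a real implementation would use an atomic counter

    fun refCnt(): Int = refCnt

    fun retain(): RefCountedPayload {
        check(refCnt > 0) { "already released" }
        refCnt++
        return this
    }

    fun release() {
        check(refCnt > 0) { "already released" }
        refCnt-- // when it reaches 0, the buffer could be reclaimed/pooled
    }

    fun data(): ByteArray {
        check(refCnt > 0) { "already released" }
        return data // shared, never copied
    }
}

fun main() {
    val payload = RefCountedPayload(byteArrayOf(1, 2, 3))
    val reused = payload.retain()     // "copy" for another send
    check(reused.data() === payload.data()) // same bytes, no allocation
    reused.release()
    payload.release()
    check(payload.refCnt() == 0)
}
```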

yschimke (Member) commented Nov 7, 2020

Or maybe work with @elizarov to figure out what the extension methods on ReactiveFlow should be. Ideally none new inside rsocket-kotlin: we would just react to the usage and contract of ReactiveFlow (edit: I thought this was part of Kotlin; a later answer corrects this), and use concepts the end users are already familiar with externally.

IIRC that was essentially the decision with rsocket-java and Spring Reactor: not to build our own terminology, but to keep using the operators from the underlying framework.

whyoleg (Member, Author) commented Nov 9, 2020

The idea behind ReactiveFlow is just to store a RequestStrategy that allows finer control over requestN frames. The current implementation has only one realization, PrefetchStrategy(requestSize, requestOn), the simplest possible: it requests requestSize more elements whenever only requestOn elements are left to collect in the Flow.
I'm not sure anything else should be added for now. From the user's perspective it may not even be needed; the default strategy (requestSize=64, requestOn=16) should be enough.

We need to add this logic with a new type (ReactiveFlow) because Flow only collects elements one by one; even take(5) collects one at a time until 5 elements have been collected.
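The prefetch counting described above can be simulated in a few lines. This is a single-threaded sketch with illustrative names, not the actual PrefetchStrategy implementation: request requestSize up front, decrement per collected element, and refill by requestSize whenever the outstanding count drops to requestOn.

```kotlin
// Illustrative counting logic of a prefetch strategy (names hypothetical).
class PrefetchCounter(private val requestSize: Int, private val requestOn: Int) {
    private var requested = requestSize // requested but not yet collected

    // Initial requestN value, sent before collection starts.
    fun firstRequest(): Int = requestSize

    // Called after each collected element; returns the next requestN to send
    // (0 means "no new request frame yet").
    fun nextRequest(): Int {
        requested -= 1
        if (requested != requestOn) return 0
        requested += requestSize
        return requestSize
    }
}

fun main() {
    val strategy = PrefetchCounter(requestSize = 64, requestOn = 16)
    var totalRequested = strategy.firstRequest()
    val requestFrames = mutableListOf<Int>()
    repeat(120) { // simulate collecting 120 elements
        val n = strategy.nextRequest()
        if (n > 0) requestFrames.add(n)
        totalRequested += n
    }
    // After the initial 64, refills of 64 happen when the outstanding count
    // drops to 16: here at elements 48 and 112.
    check(requestFrames == listOf(64, 64))
    check(totalRequested == 192)
}
```

So a collector sees at most requestSize + requestOn outstanding elements, and request frames are amortized over batches instead of one per element.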

yschimke (Member) commented Nov 9, 2020

I'm not a huge fan of defining our own ReactiveFlow

@elizarov

If all you need is to store a request strategy, then I can suggest looking into using a context for this kind of storage. This way you can have a sensible default that will add zero overhead (and zero new concepts) and rely on the existing context-passing support in both Reactor and Kotlin Flow to change this request strategy if needed.
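The context-based approach suggested above could look roughly like this. The sketch uses only the kotlin.coroutines stdlib; the class name and API are hypothetical, though the default values (requestSize=64, requestOn=16) come from the thread.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext

// Hypothetical context element carrying the request strategy. No new Flow
// subtype is needed: existing context propagation (e.g. flowOn in kotlinx
// coroutines) can override it per collector.
class RequestStrategy(val requestSize: Int, val requestOn: Int) :
    AbstractCoroutineContextElement(RequestStrategy) {
    companion object Key : CoroutineContext.Key<RequestStrategy> {
        // Sensible zero-overhead default when nothing is in the context.
        val Default = RequestStrategy(requestSize = 64, requestOn = 16)
    }
}

// Inside the library, the collecting coroutine would read it like this:
suspend fun currentRequestStrategy(): RequestStrategy =
    coroutineContext[RequestStrategy] ?: RequestStrategy.Default

fun main() {
    val ctx = EmptyCoroutineContext + RequestStrategy(requestSize = 32, requestOn = 8)
    check(ctx[RequestStrategy]!!.requestSize == 32) // lookup by companion Key
    check(RequestStrategy.Default.requestOn == 16)
}
```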

whyoleg (Member, Author) commented Nov 10, 2020

@elizarov cool idea, I'll try to do that! Thanks!

@whyoleg whyoleg force-pushed the enhancement/streams-api branch from b53b1a9 to d5d1928 Compare November 23, 2020 19:33
@whyoleg whyoleg marked this pull request as ready for review November 24, 2020 13:16
OlegDokuka (Member) commented

As discussed, let's split these changes into two PRs:

  1. requestN via flowOn
  2. RSocket interface API changes

@whyoleg whyoleg force-pushed the enhancement/streams-api branch from d5d1928 to b34de79 Compare November 25, 2020 19:58
@whyoleg whyoleg force-pushed the enhancement/streams-api branch from dfb7f7b to 3c82f44 Compare November 26, 2020 12:13
OlegDokuka (Member) left a review

LGTM with some minor comments

private val requestSize: Int,
private val requestOn: Int,
) : RequestStrategy.Element {
private val requested = atomic(requestSize)
A Member commented

Just wondering, why do we need atomic here?

whyoleg (Member, Author) replied

It's needed to allow mutation on K/N if we freeze. I need to check; maybe it's really not needed here.
On K/JVM and K/JS it's not needed, since the functions inside RequestStrategy.Element will never be called concurrently.

A Member replied

Yes, please double-check. In general it looks like all the requests will be done within the same context, so the context should do all the work to keep seriality, and no atomic is needed.

whyoleg (Member, Author) replied

Atomic removed; it wasn't needed. Thanks for pointing it out!

override suspend fun nextRequest(): Int {
if (requested.decrementAndGet() != requestOn) return 0

requested += requestSize
A Member commented

But here this atomic is used in a non-atomic way. Am I missing something?
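The point being flagged, in isolation: decrementAndGet() is atomic, but "decrement, compare, then separately add" is a compound check-then-act, so the pair is not atomic even though the field is. A hedged sketch of what a truly lock-free version would need (a compareAndSet loop); the fix actually merged was simpler, dropping the atomic entirely since the calls are serial.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Illustrative only: makes decrement-and-maybe-refill a single atomic step
// by retrying with compareAndSet. Names mirror the snippet under review.
fun nextRequestAtomic(requested: AtomicInteger, requestSize: Int, requestOn: Int): Int {
    while (true) {
        val current = requested.get()
        val decremented = current - 1
        val refill = decremented == requestOn
        val next = if (refill) decremented + requestSize else decremented
        // Publish both the decrement and the refill in one CAS; retry on contention.
        if (requested.compareAndSet(current, next)) return if (refill) requestSize else 0
    }
}

fun main() {
    val requested = AtomicInteger(17) // one element away from the refill point
    check(nextRequestAtomic(requested, requestSize = 64, requestOn = 16) == 64)
    check(requested.get() == 80) // 16 outstanding + 64 newly requested
}
```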

OlegDokuka (Member) left a review

LGTM 🔥

@whyoleg whyoleg merged commit 8784c39 into master Dec 4, 2020
@whyoleg whyoleg deleted the enhancement/streams-api branch December 4, 2020 14:51

Successfully merging this pull request may close these issues.

Limit rate operator for streams
4 participants