Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit rate operator for streams #109

Closed
whyoleg opened this issue Oct 18, 2020 · 1 comment · Fixed by #118
Closed

Limit rate operator for streams #109

whyoleg opened this issue Oct 18, 2020 · 1 comment · Fixed by #118

Comments

@whyoleg
Copy link
Member

whyoleg commented Oct 18, 2020

Need to implement some limit rate operator for streams and channels, to improve flexibility of streams API.
F.e to support similar operator from reactor like Flux.limitRate or reactor-like prefetch logic: request new elements, when 75% of elements collected

@yschimke
Copy link
Member

Yep - just noticed that it's defaulting to the X + cancel, even for a take(3).

$ ./rsocket-cli --debug -r 3 wss://rsocket-demo.herokuapp.com/rsocket
11:34:30.215	NativePRNGNonBlocking is not found, fallback to SHA1PRNG
11:34:31.144	Send: 
Setup frame -> Stream Id: 0 Length: 52
Flags: 0b000000000 (M0R0L0)
Version: 1.0 Honor lease: false
Keep alive: interval=20.0s, max lifetime=90.0s
Data mime type: application/json
Metadata mime type: application/json
Data: Empty
11:34:31.171	Send: 
RequestStream frame -> Stream Id: 1 Length: 13
Flags: 0b100000000 (M1F0C0N0)
Initial request: 64
Metadata: Empty
Data: Empty
11:34:31.263	Receive: 
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 36 30 33 30 31 37 32 37 31 35 38 39          |1603017271589                      |
+--------+-------------------------------------------------+----------------+
1603017271589
11:34:31.312	Receive: 
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 36 30 33 30 31 37 32 37 31 36 33 39          |1603017271639                      |
+--------+-------------------------------------------------+----------------+
1603017271639
11:34:31.362	Receive: 
Payload frame -> Stream Id: 1 Length: 19
Flags: 0b000100000 (M0F0C0N1)
Data(length=13):
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 36 30 33 30 31 37 32 37 31 36 38 39          |1603017271689                      |
+--------+-------------------------------------------------+----------------+
11:34:31.364	Send: 
Cancel frame -> Stream Id: 1 Length: 6
Flags: 0b000000000 ()
1603017271689

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants