-
Notifications
You must be signed in to change notification settings - Fork 566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New implementation of PublisherInputStream that improves performance and fixes race conditions #1690
Conversation
…InputStream as it really implements a subscriber not a publisher. This new implementation fixes a couple of important problems: (1) It implements the ability to read into byte arrays, not just one byte a time for better performance and (2) It changes the implementation to avoid a race condition when accessing the data chunks held in CompletableFuture's. The race condition resulted in certain tests to hang if a thread raced and updated the value of the old 'processed' variable. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
How about to rename it to something completely different as it is not a publisher nor subscriber. public class FlowInputStream extends InputStream{
...
private class DataChunkSubscriber implements Flow.Subscriber<DataChunk>{
public void on...
public void on...
public void on...
public void on...
...
}
} To avoid users of the api to accidentally violate rule §1.10 like this: SubscriberInputStream subscriber = new SubscriberInputStream(publisher);
publisher.subscribe(subscriber); |
I like the idea of the private class. I'll explore that. |
…ent confusion with other public APIs. Also renamed and re-formatted internal document describing implementation. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
- `Subscription.request` is invoked only after one chunk has been consumed | ||
- the number of chunks requested is always 1 | ||
- Publisher fully conforms to the Flow.Publisher in the reactive-streams specification with respect to: | ||
- total order of `onNext`/`onComplete`/`onError` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onSubscribe
/onNext
/onComplete
/onError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is focusing on correct synchronization (hand-over-hand) and reading in quantities larger than 1 byte. Looks good.
New implementation of
PublisherInputStream
, now renamed toDataChunkInputStream
. This new implementation fixes a couple of important problems:CompletableFuture
's. The race condition resulted in certain tests to hang if a thread raced and updated the value of the oldprocessed
variable.There is also a new internal document
docs-internal/datachunkinputstream.md
that describes the new implementation. Credit to Oleksandr Otenko.Signed-off-by: Santiago Pericasgeertsen santiago.pericasgeertsen@oracle.com