Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect request-n management for Publisher.multicast() #1083

Merged
merged 1 commit into from
Jun 17, 2020

Conversation

NiteshKant
Copy link
Collaborator

Motivation

Publisher<Integer> multi = Publisher.range(1, 10)
                .beforeRequest(n -> System.out.println("Requested from main source: " + n))
                .multicastToExactly(2, 1);
        multi.forEach(t -> System.out.println("From first => " + t));
        multi.forEach(t -> System.out.println("From second => " + t));

Produces the output:

Requested from main source: 1
From first => 1
From second => 1

This shows that multicastToExactly() is not requesting more than queueSize() items unless the subscriber requests more.

Modification

  • Remove the conditional request-n call in MulticastUtils.updateRequestN() based on drainCount. Since drained item count is already included in emitted, this additional constraint is not required.
  • SubscriberUtils.calculateSourceRequested() did not handle the case when emitted > sourceRequested resulting in higher request-n numbers. For cases like groupBy/multicast when items maybe buffered before they are requested by the subscriber, we need to handle emitted > sourceRequested.

Result

multicastToExactly() correctly requests all items from the original source.

__Motivation__

```java
Publisher<Integer> multi = Publisher.range(1, 10)
                .beforeRequest(n -> System.out.println("Requested from main source: " + n))
                .multicastToExactly(2, 1);
        multi.forEach(t -> System.out.println("From first => " + t));
        multi.forEach(t -> System.out.println("From second => " + t));
```

Produces the output:
```
Requested from main source: 1
From first => 1
From second => 1
```

This shows that `multicastToExactly()` is not requesting more than `queueSize()` items unless the subscriber requests more.

__Modification__

- Remove the conditional request-n call in `MulticastUtils.updateRequestN()` based on `drainCount`. Since drained item count is already included in `emitted`, this additional constraint is not required.
- `SubscriberUtils.calculateSourceRequested()` did not handle the case when `emitted > sourceRequested` resulting in higher request-n numbers. For cases like groupBy/multicast when items maybe buffered before they are requested by the subscriber, we need to handle `emitted > sourceRequested`.

__Result__

`multicastToExactly()` correctly requests all items from the original source.
@NiteshKant NiteshKant requested a review from Scottmitch June 9, 2020 23:43
Copy link
Member

@Scottmitch Scottmitch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

@NiteshKant NiteshKant merged commit 0e26a5f into apple:master Jun 17, 2020
@NiteshKant NiteshKant deleted the multi-req branch June 17, 2020 00:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants