Replay works by requesting as many items from upstream as the subscriber with the largest total request has asked for. So if there is one subscriber with request(2) and another with request(5), replay will request(5) from the upstream. To be precise, requests are accumulated per subscriber and the operator requests the difference between the last known max and the latest known max. This way, if there is an additional request(10) from the first subscriber above, its total becomes 12 and replay will request only 7 additional elements (12 minus the previous max of 5).
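A minimal sketch of this max-differencing accounting in plain Java may make the arithmetic easier to follow. It is a model only, not the operator's actual code: the class name MaxRequestTracker, the use of string keys for subscribers, and the coarse synchronized locking are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of replay's request accounting; not RxJava's actual code. */
final class MaxRequestTracker {
    // Accumulated total requested by each subscriber (keyed by an illustrative name).
    private final Map<String, Long> totals = new HashMap<>();
    // Largest accumulated total seen so far; this much has been requested upstream.
    private long maxRequested;

    /** Records request(n) from a subscriber and returns how much to request upstream. */
    synchronized long request(String subscriber, long n) {
        long total = totals.merge(subscriber, n, Long::sum);
        if (total <= maxRequested) {
            return 0L; // already covered by what was requested earlier
        }
        long diff = total - maxRequested;
        maxRequested = total;
        return diff;
    }

    public static void main(String[] args) {
        MaxRequestTracker tracker = new MaxRequestTracker();
        System.out.println(tracker.request("first", 2));  // 2: max goes 0 -> 2
        System.out.println(tracker.request("second", 5)); // 3: max goes 2 -> 5
        System.out.println(tracker.request("first", 4));  // 1: first's total is 6, max goes 5 -> 6
    }
}
```

Only the growth of the maximum is forwarded, so the upstream never sees more demand than the most demanding subscriber has expressed in total.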
This works for the unbounded replay() because all subscribers start from the very beginning. However, if the buffer is bounded, a late subscriber starts from the oldest item still retained, so this differencing doesn't work anymore. The following unit test fails with both the 1.x and 2.x implementations.
What happens here is that even though ts2 subscribes after 2 elements were requested from the source, it receives only the very last element, and replay() doesn't request 1 more from upstream.
The idea for fixing this is that the total requested amount of a late subscriber would start from a "current index", i.e., the number of items received by replay() so far.
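The difference between the two accounting schemes can be sketched in plain Java. This is a model under stated assumptions, not the actual fix: the class and method names are hypothetical, the scenario (buffer size 1, ts1 calling request(2), ts2 calling request(2) after both items arrived) is assumed, and the starting offset is taken here as the number of received items that are no longer buffered, on the reasoning that items replayed from the buffer still count against the late subscriber's own requests.

```java
/** Hypothetical model of the proposed accounting; not RxJava's actual code. */
final class OffsetRequestTracker {
    /** Per-subscriber state: the absolute index up to which it has requested items. */
    static final class Sub {
        long totalRequested;
    }

    private final int bufferSize;
    private long received;     // items received from upstream so far
    private long maxRequested; // largest total requested by any subscriber

    OffsetRequestTracker(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Simulates one item arriving from upstream. */
    synchronized void onNext() {
        received++;
    }

    /** Current scheme: every subscriber's total starts at zero. */
    synchronized Sub subscribeFromZero() {
        return new Sub();
    }

    /** Proposed scheme (assumed reading): start at the index of the oldest buffered item. */
    synchronized Sub subscribeFromCurrentIndex() {
        Sub s = new Sub();
        long buffered = Math.min(received, bufferSize);
        s.totalRequested = received - buffered;
        return s;
    }

    /** Records request(n) and returns how much to request upstream. */
    synchronized long request(Sub s, long n) {
        s.totalRequested += n;
        if (s.totalRequested <= maxRequested) {
            return 0L;
        }
        long diff = s.totalRequested - maxRequested;
        maxRequested = s.totalRequested;
        return diff;
    }
}
```

In the assumed scenario, a ts2 created with subscribeFromZero() ends up with a total of 2, which does not exceed the existing max of 2, so nothing is requested. A ts2 created with subscribeFromCurrentIndex() starts at 1, reaches 3, and triggers a request for the 1 missing element.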
This approach would work in this synchronous test but may not work with asynchronous subscribers. The problem is that the start node and the index have to be updated atomically together so that a subscriber "pins" both at the same time: this guarantees continuous delivery as well as the proper total requested amount. I'll investigate how to make this happen.
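One common way to let a subscriber pin two values at once is to put them into a single immutable object and publish it through one atomic reference. The sketch below illustrates that general technique for a buffer of size 1. It is not necessarily how the fix will look: the names PinnedStart and Start are hypothetical, and it assumes a single upstream thread calls onNext (the usual serialized onNext contract).

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical size-1 replay buffer whose start node carries its own index. */
final class PinnedStart {
    /** Immutable snapshot: the retained item plus the number of items received before it. */
    static final class Start {
        final Object value;
        final long index;

        Start(Object value, long index) {
            this.value = value;
            this.index = index;
        }
    }

    // Marker for "nothing received yet"; compared by reference.
    static final Start EMPTY = new Start(null, 0L);

    private final AtomicReference<Start> head = new AtomicReference<>(EMPTY);

    /** Called by the single upstream thread (assumption): replaces node and index in one write. */
    void onNext(Object value) {
        Start current = head.get();
        long index = (current == EMPTY) ? 0L : current.index + 1;
        head.set(new Start(value, index));
    }

    /** Called by any subscriber thread: one read yields a consistent node/index pair. */
    Start pin() {
        return head.get();
    }
}
```

Because a subscriber reads the node and the index from the same object, it can never observe a new start node together with a stale index, or the other way around.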
Let me emphasize again that the unbounded replay() works properly and the v2 ReplaySubject isn't affected because it is unbounded-in.