Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix excess item retention in the other replay components #5898

Merged
merged 1 commit into from
Mar 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,15 @@ public final void complete() {
truncateFinal();
}

final void trimHead() {
Node head = get();
if (head.value != null) {
Node n = new Node(null, 0L);
n.lazySet(head.get());
set(n);
}
}

@Override
public final void replay(InnerSubscription<T> output) {
synchronized (output) {
Expand Down Expand Up @@ -909,7 +918,7 @@ void truncate() {
* based on its properties (i.e., truncate but the very last node).
*/
void truncateFinal() {

trimHead();
}
/* test */ final void collect(Collection<? super T> output) {
Node n = getHead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,16 @@ final void removeFirst() {
// can't null out the head's value because of late replayers would see null
setFirst(next);
}

final void trimHead() {
Node head = get();
if (head.value != null) {
Node n = new Node(null);
n.lazySet(head.get());
set(n);
}
}

/* test */ final void removeSome(int n) {
Node head = get();
while (n > 0) {
Expand Down Expand Up @@ -733,7 +743,7 @@ Object leaveTransform(Object value) {
* based on its properties (i.e., truncate but the very last node).
*/
void truncateFinal() {

trimHead();
}
/* test */ final void collect(Collection<? super T> output) {
Node n = getHead();
Expand Down
66 changes: 63 additions & 3 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.reactivestreams.*;

import io.reactivex.Scheduler;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.annotations.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -362,6 +362,24 @@ public Throwable getThrowable() {
return null;
}

/**
* Makes sure the item cached by the head node in a bounded
* ReplayProcessor is released (as it is never part of a replay).
* <p>
* By default, live bounded buffers will remember one item before
* the currently receivable one to ensure subscribers can always
* receive a continuous sequence of items. A terminated ReplayProcessor
* automatically releases this inaccessible item.
* <p>
* The method must be called sequentially, similar to the standard
* {@code onXXX} methods.
* @since 2.1.11 - experimental
*/
@Experimental
public void cleanupBuffer() {
buffer.trimHead();
}

/**
* Returns a single value the Subject currently has or null if no such value exists.
* <p>The method is thread-safe.
Expand Down Expand Up @@ -499,6 +517,12 @@ interface ReplayBuffer<T> {
boolean isDone();

Throwable getError();

/**
* Make sure an old inaccessible head value is released
* in a bounded buffer.
*/
void trimHead();
}

static final class ReplaySubscription<T> extends AtomicInteger implements Subscription {
Expand Down Expand Up @@ -568,6 +592,11 @@ public void complete() {
done = true;
}

@Override
public void trimHead() {
// not applicable for an unbounded buffer
}

@Override
public T getValue() {
int s = size;
Expand Down Expand Up @@ -771,14 +800,25 @@ public void next(T value) {
@Override
public void error(Throwable ex) {
error = ex;
trimHead();
done = true;
}

@Override
public void complete() {
trimHead();
done = true;
}

@Override
public void trimHead() {
if (head.value != null) {
Node<T> n = new Node<T>(null);
n.lazySet(head.get());
head = n;
}
}

@Override
public boolean isDone() {
return done;
Expand Down Expand Up @@ -992,19 +1032,39 @@ void trimFinal() {
for (;;) {
TimedNode<T> next = h.get();
if (next == null) {
head = h;
if (h.value != null) {
head = new TimedNode<T>(null, 0L);
} else {
head = h;
}
break;
}

if (next.time > limit) {
head = h;
if (h.value != null) {
TimedNode<T> n = new TimedNode<T>(null, 0L);
n.lazySet(h.get());
head = n;
} else {
head = h;
}
break;
}

h = next;
}
}


@Override
public void trimHead() {
if (head.value != null) {
TimedNode<T> n = new TimedNode<T>(null, 0L);
n.lazySet(head.get());
head = n;
}
}

@Override
public void next(T value) {
TimedNode<T> n = new TimedNode<T>(value, scheduler.now(unit));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.NonNull;
import org.junit.*;
import org.mockito.InOrder;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.Scheduler.Worker;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.TestException;
import io.reactivex.flowables.ConnectableFlowable;
Expand Down Expand Up @@ -1775,4 +1775,172 @@ public void badRequest() {
.replay()
);
}

@Test
public void noHeadRetentionCompleteSize() {
PublishProcessor<Integer> source = PublishProcessor.create();

FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
.replay(1);

// the backpressure coordination would not accept items from source otherwise
co.test();

co.connect();

BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);

source.onNext(1);
source.onNext(2);
source.onComplete();

assertNull(buf.get().value);

Object o = buf.get();

buf.trimHead();

assertSame(o, buf.get());
}

@Test
public void noHeadRetentionErrorSize() {
PublishProcessor<Integer> source = PublishProcessor.create();

FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
.replay(1);

co.test();

co.connect();

BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);

source.onNext(1);
source.onNext(2);
source.onError(new TestException());

assertNull(buf.get().value);

Object o = buf.get();

buf.trimHead();

assertSame(o, buf.get());
}

@Test
public void noHeadRetentionSize() {
PublishProcessor<Integer> source = PublishProcessor.create();

FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
.replay(1);

co.test();

co.connect();

BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);

source.onNext(1);
source.onNext(2);

assertNotNull(buf.get().value);

buf.trimHead();

assertNull(buf.get().value);

Object o = buf.get();

buf.trimHead();

assertSame(o, buf.get());
}

@Test
public void noHeadRetentionCompleteTime() {
PublishProcessor<Integer> source = PublishProcessor.create();

FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
.replay(1, TimeUnit.MINUTES, Schedulers.computation());

co.test();

co.connect();

BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);

source.onNext(1);
source.onNext(2);
source.onComplete();

assertNull(buf.get().value);

Object o = buf.get();

buf.trimHead();

assertSame(o, buf.get());
}

@Test
public void noHeadRetentionErrorTime() {
PublishProcessor<Integer> source = PublishProcessor.create();

FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
.replay(1, TimeUnit.MINUTES, Schedulers.computation());

co.test();

co.connect();

BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);

source.onNext(1);
source.onNext(2);
source.onError(new TestException());

assertNull(buf.get().value);

Object o = buf.get();

buf.trimHead();

assertSame(o, buf.get());
}

@Test
public void noHeadRetentionTime() {
TestScheduler sch = new TestScheduler();

PublishProcessor<Integer> source = PublishProcessor.create();

FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
.replay(1, TimeUnit.MILLISECONDS, sch);

co.test();

co.connect();

BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);

source.onNext(1);

sch.advanceTimeBy(2, TimeUnit.MILLISECONDS);

source.onNext(2);

assertNotNull(buf.get().value);

buf.trimHead();

assertNull(buf.get().value);

Object o = buf.get();

buf.trimHead();

assertSame(o, buf.get());
}
}
Loading