Skip to content

Commit

Permalink
2.x: Fix bounded replay() memory leak due to bad node retention (#6371)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jan 17, 2019
1 parent a85ddd1 commit 5106a20
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ public void dispose() {
// the others had non-zero. By removing this 'blocking' child, the others
// are now free to receive events
parent.manageRequests();
// make sure the last known node is not retained
index = null;
}
}
/**
Expand Down Expand Up @@ -824,6 +826,7 @@ public final void replay(InnerSubscription<T> output) {
}
for (;;) {
if (output.isDisposed()) {
output.index = null;
return;
}

Expand Down Expand Up @@ -864,6 +867,7 @@ public final void replay(InnerSubscription<T> output) {
break;
}
if (output.isDisposed()) {
output.index = null;
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ public void dispose() {
cancelled = true;
// remove this from the parent
parent.remove(this);
// make sure the last known node is not retained
index = null;
}
}
/**
Expand Down Expand Up @@ -686,6 +688,7 @@ public final void replay(InnerDisposable<T> output) {

for (;;) {
if (output.isDisposed()) {
output.index = null;
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.lang.management.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -1976,4 +1977,67 @@ public void currentDisposedWhenConnecting() {

assertFalse(fr.current.get().isDisposed());
}

@Test
public void noBoundedRetentionViaThreadLocal() throws Exception {
Flowable<byte[]> source = Flowable.range(1, 200)
.map(new Function<Integer, byte[]>() {
@Override
public byte[] apply(Integer v) throws Exception {
return new byte[1024 * 1024];
}
})
.replay(new Function<Flowable<byte[]>, Publisher<byte[]>>() {
@Override
public Publisher<byte[]> apply(final Flowable<byte[]> f) throws Exception {
return f.take(1)
.concatMap(new Function<byte[], Publisher<byte[]>>() {
@Override
public Publisher<byte[]> apply(byte[] v) throws Exception {
return f;
}
});
}
}, 1)
.takeLast(1)
;

System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC");
System.gc();

Thread.sleep(500);

final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

final AtomicLong after = new AtomicLong();

source.subscribe(new Consumer<byte[]>() {
@Override
public void accept(byte[] v) throws Exception {
System.out.println("Bounded Replay Leak check: Wait before GC 2");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC 2");
System.gc();

Thread.sleep(500);

after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
}
});

System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);

if (initial + 100 * 1024 * 1024 < after.get()) {
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
+ " -> " + after.get() / 1024.0 / 1024.0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.lang.management.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.mockito.InOrder;
Expand Down Expand Up @@ -1713,4 +1714,66 @@ public void noHeadRetentionTime() {

assertSame(o, buf.get());
}
}

@Test
public void noBoundedRetentionViaThreadLocal() throws Exception {
Observable<byte[]> source = Observable.range(1, 200)
.map(new Function<Integer, byte[]>() {
@Override
public byte[] apply(Integer v) throws Exception {
return new byte[1024 * 1024];
}
})
.replay(new Function<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> apply(final Observable<byte[]> o) throws Exception {
return o.take(1)
.concatMap(new Function<byte[], Observable<byte[]>>() {
@Override
public Observable<byte[]> apply(byte[] v) throws Exception {
return o;
}
});
}
}, 1)
.takeLast(1)
;

System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC");
System.gc();

Thread.sleep(500);

final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

final AtomicLong after = new AtomicLong();

source.subscribe(new Consumer<byte[]>() {
@Override
public void accept(byte[] v) throws Exception {
System.out.println("Bounded Replay Leak check: Wait before GC 2");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC 2");
System.gc();

Thread.sleep(500);

after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
}
});

System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);

if (initial + 100 * 1024 * 1024 < after.get()) {
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
+ " -> " + after.get() / 1024.0 / 1024.0);
}
}}
65 changes: 62 additions & 3 deletions src/test/java/io/reactivex/processors/ReplayProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.lang.management.*;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import org.junit.Test;
import org.junit.*;
import org.mockito.*;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.functions.*;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.processors.ReplayProcessor.*;
import io.reactivex.schedulers.*;
Expand Down Expand Up @@ -1692,4 +1693,62 @@ public void noHeadRetentionTime() {
public void invalidRequest() {
TestHelper.assertBadRequestReported(ReplayProcessor.create());
}

@Test
public void noBoundedRetentionViaThreadLocal() throws Exception {
final ReplayProcessor<byte[]> rp = ReplayProcessor.createWithSize(1);

Flowable<byte[]> source = rp.take(1)
.concatMap(new Function<byte[], Publisher<byte[]>>() {
@Override
public Publisher<byte[]> apply(byte[] v) throws Exception {
return rp;
}
})
.takeLast(1)
;

System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC");
System.gc();

Thread.sleep(500);

final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

final AtomicLong after = new AtomicLong();

source.subscribe(new Consumer<byte[]>() {
@Override
public void accept(byte[] v) throws Exception {
System.out.println("Bounded Replay Leak check: Wait before GC 2");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC 2");
System.gc();

Thread.sleep(500);

after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
}
});

for (int i = 0; i < 200; i++) {
rp.onNext(new byte[1024 * 1024]);
}
rp.onComplete();

System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);

if (initial + 100 * 1024 * 1024 < after.get()) {
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
+ " -> " + after.get() / 1024.0 / 1024.0);
}
}
}
65 changes: 62 additions & 3 deletions src/test/java/io/reactivex/subjects/ReplaySubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.lang.management.*;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import org.junit.Test;
import org.junit.*;
import org.mockito.*;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.functions.*;
import io.reactivex.observers.*;
import io.reactivex.schedulers.*;
import io.reactivex.subjects.ReplaySubject.*;
Expand Down Expand Up @@ -1284,4 +1285,62 @@ public void noHeadRetentionTime() {

assertSame(o, buf.head);
}

@Test
public void noBoundedRetentionViaThreadLocal() throws Exception {
final ReplaySubject<byte[]> rs = ReplaySubject.createWithSize(1);

Observable<byte[]> source = rs.take(1)
.concatMap(new Function<byte[], Observable<byte[]>>() {
@Override
public Observable<byte[]> apply(byte[] v) throws Exception {
return rs;
}
})
.takeLast(1)
;

System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC");
System.gc();

Thread.sleep(500);

final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

final AtomicLong after = new AtomicLong();

source.subscribe(new Consumer<byte[]>() {
@Override
public void accept(byte[] v) throws Exception {
System.out.println("Bounded Replay Leak check: Wait before GC 2");
Thread.sleep(1000);

System.out.println("Bounded Replay Leak check: GC 2");
System.gc();

Thread.sleep(500);

after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
}
});

for (int i = 0; i < 200; i++) {
rs.onNext(new byte[1024 * 1024]);
}
rs.onComplete();

System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);

if (initial + 100 * 1024 * 1024 < after.get()) {
Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0)
+ " -> " + after.get() / 1024.0 / 1024.0);
}
}
}

0 comments on commit 5106a20

Please sign in to comment.