|
16 | 16 | import static org.junit.Assert.*;
|
17 | 17 | import static org.mockito.Mockito.*;
|
18 | 18 |
|
| 19 | +import java.lang.management.ManagementFactory; |
| 20 | +import java.lang.management.MemoryMXBean; |
| 21 | +import java.lang.management.MemoryUsage; |
19 | 22 | import java.util.*;
|
20 | 23 | import java.util.concurrent.*;
|
21 | 24 | import java.util.concurrent.atomic.AtomicInteger;
|
| 25 | +import java.util.concurrent.atomic.AtomicLong; |
22 | 26 | import java.util.stream.Collectors;
|
23 | 27 | import java.util.stream.IntStream;
|
24 | 28 |
|
| 29 | +import io.reactivex.rxjava3.observables.ConnectableObservable; |
25 | 30 | import io.reactivex.rxjava3.subjects.CompletableSubject;
|
| 31 | +import org.junit.Assert; |
26 | 32 | import org.junit.Test;
|
27 | 33 |
|
28 | 34 | import io.reactivex.rxjava3.core.*;
|
@@ -360,41 +366,50 @@ public void addRemoveRace() {
|
360 | 366 | }
|
361 | 367 |
|
362 | 368 | @Test
|
363 |
| - public void valuesAreReclaimable() { |
364 |
| - for (int c = 1; c <= 32; c *= 2) { |
365 |
| - for (int numValues : Arrays.asList(0, 1, c - 1, c, c + 1, c * 2 - 1, c * 2, c * 2 + 1)) { |
| 369 | + public void valuesAreReclaimable() throws Exception { |
| 370 | + ConnectableObservable<byte[]> source = |
| 371 | + Observable.range(0, 200) |
| 372 | + .map($ -> new byte[1024 * 1024]) |
| 373 | + .publish(); |
366 | 374 |
|
367 |
| - CompletableSubject termination = CompletableSubject.create(); |
368 |
| - List<Integer> integers = IntStream.range(0, numValues).boxed().collect(Collectors.toList()); |
369 |
| - int lastNodeIndex = Math.max(0, ((numValues - 1) / c) * c); |
| 375 | + System.out.println("Bounded Replay Leak check: Wait before GC"); |
| 376 | + Thread.sleep(1000); |
370 | 377 |
|
371 |
| - List<Reclaimable<Payload>> payloads = integers.stream() |
372 |
| - .map(Payload::new) |
373 |
| - .map(Reclaimable::of) |
374 |
| - .collect(Collectors.toList()); |
375 |
| - Reclaimable<Observable<Payload>> cache = Reclaimable.of( |
376 |
| - Observable.fromStream(payloads.stream().map(Reclaimable::remove)) |
377 |
| - .concatWith(termination) |
378 |
| - .cacheWithInitialCapacity(c)); |
| 378 | + System.out.println("Bounded Replay Leak check: GC"); |
| 379 | + System.gc(); |
379 | 380 |
|
380 |
| - TestObserver<Integer> o = cache.remove().map(Payload::value).test(); |
| 381 | + Thread.sleep(500); |
381 | 382 |
|
382 |
| - Reclaimable.forceGC() |
383 |
| - .assertReclaimed(cache) |
384 |
| - .assertAllReclaimed(payloads.subList(0, lastNodeIndex)) |
385 |
| - .assertAllUnreclaimed(payloads.subList(lastNodeIndex, numValues)); |
| 383 | + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); |
| 384 | + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); |
| 385 | + long initial = memHeap.getUsed(); |
386 | 386 |
|
387 |
| - o.assertValueSequence(integers) |
388 |
| - .assertNotComplete() |
389 |
| - .assertNoErrors(); |
| 387 | + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); |
390 | 388 |
|
391 |
| - termination.onComplete(); |
| 389 | + final AtomicLong after = new AtomicLong(); |
392 | 390 |
|
393 |
| - o.assertValueSequence(integers) |
394 |
| - .assertComplete(); |
| 391 | + source.cache().lastElement().subscribe(new Consumer<byte[]>() { |
| 392 | + @Override |
| 393 | + public void accept(byte[] v) throws Exception { |
| 394 | + System.out.println("Bounded Replay Leak check: Wait before GC 2"); |
| 395 | + Thread.sleep(1000); |
| 396 | + |
| 397 | + System.out.println("Bounded Replay Leak check: GC 2"); |
| 398 | + System.gc(); |
395 | 399 |
|
396 |
| - Reclaimable.forceGC().assertAllReclaimed(payloads); |
| 400 | + Thread.sleep(500); |
| 401 | + |
| 402 | + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); |
397 | 403 | }
|
| 404 | + }); |
| 405 | + |
| 406 | + source.connect(); |
| 407 | + |
| 408 | + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); |
| 409 | + |
| 410 | + if (initial + 100 * 1024 * 1024 < after.get()) { |
| 411 | + Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) |
| 412 | + + " -> " + after.get() / 1024.0 / 1024.0); |
398 | 413 | }
|
399 | 414 | }
|
400 | 415 |
|
|
0 commit comments