-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Fix ObservableCache GC issue #7911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 3.x
Are you sure you want to change the base?
Conversation
`ObservableCache` retains strong references to every event issued by its upstream source via its `head` field. These references remain reachable for the entire duration of the subscription, even if application code does not retain its own reference to the `ObservableCache` instance. This occurs because `ObservableCache` is currently the object that `implements Observer<T>` and is subscribed to the upstream `source`, which means that the source _must_ retain a reference to the `ObservableCache` until the subscription is either disposed of or reaches a terminal state. As a result, every cached `T` value remains reachable for the duration of the subscription to `source`, even if the application no longer retains the ability to issue new subscriptions to the `ObservableCache` instance. This change fixes this issue by: 1. Splitting the `Observable` and `Observer` surfaces of `ObservableCache` into two distinct objects (`ObservableCache` which extends `Observable`, and `ObservableCache.Multicaster` which implements `Observer`) 2. Having only `ObservableCache` retain a reference to the `head` 3. Having only `ObservableCache.Multicaster` retain a reference to the current `tail` 4. Setting `Multicaster.tail` to `null` upon receipt of the upstream's terminal event With this change, when `ObservableCache` goes out of scope, the only remaining references to the nodes of the linked list are `Multicaster.tail` and `CacheDisposable.node`. As subscribed `CacheDisposable` instances advance through the linked list, its nodes progressively become reclaimable, and eventually become fully reclaimable once the terminal event has been issued to all subscribers.
Why do you need such progressive reclamation? |
We have several reactive systems that handle request-reply workloads. Each individual workload can span a relatively wide range of durations (ranging from a few seconds to ~1 hour). Our systems commonly allocate graphs of reactive operators at the start of a request, and these request-scoped object graphs generally do not go out of scope until the response is produced, which generally corresponds to the terminal event(s) of distinguished part(s) of this data flow. In other words, these objects will eventually become garbage, but they can take a relatively long time to actually do so. This is a challenging type of workload for generational collectors like G1, because these objects must be copied repeatedly (as they are promoted through successive young regions). Long durations and/or heavy load exacerbates this problem, and can result in destined-to-become-garbage objects landing in the old region, where things are more expensive to collect. We make extensive and ad-hoc use of the |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## 3.x #7911 +/- ##
============================================
- Coverage 99.62% 99.61% -0.01%
+ Complexity 6801 6774 -27
============================================
Files 752 752
Lines 47707 47720 +13
Branches 6401 6401
============================================
+ Hits 47527 47537 +10
+ Misses 84 80 -4
- Partials 96 103 +7 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
I'll have to think about this more. Other operators will likely require similar changes. Also the test looks unnecessarily convoluted. Can you trigger the issue in the original code and check the lack of reclaiming via |
I think the approach I took in the unit test is a bit nicer than the approach you highlighted. That approach relies on triggering collections via The approach I propose in this PR is a deterministic method for asserting on the reclamability of specific object instances: it relies on a fully-documented property of the JVM specification, which guarantees that objects that are softly-reachable will have been collected in the event that
For example, I ran the test you highlighted and found that it takes around 3200ms to complete. I then rewrote it as follows: @Test
public void noBoundedRetentionViaThreadLocal_reclaimable() throws Exception {
long start = System.nanoTime();
final ReplayProcessor<byte[]> rp = ReplayProcessor.createWithSize(1);
Flowable<byte[]> source = rp.take(1)
.concatMap(new Function<byte[], Publisher<byte[]>>() {
@Override
public Publisher<byte[]> apply(byte[] v) throws Exception {
return rp;
}
})
.takeLast(1)
;
source.subscribe(new Consumer<byte[]>() {
@Override
public void accept(byte[] v) throws Exception {
}
});
List<Reclaimable<byte[]>> payloads = IntStream.range(0, 200)
.mapToObj(i -> Reclaimable.of("bytes-" + i, new byte[1024 * 1024]))
.collect(Collectors.toList());
for (Reclaimable<byte[]> payload : payloads) {
rp.onNext(payload.remove());
}
rp.onComplete();
Reclaimable.forceGC().assertAllReclaimed(payloads.subList(0, 199)).assertUnreclaimed(payloads.get(199));
long end = System.nanoTime();
System.out.println("runtime: " + String.format("%.3f ms", (end - start) / 1_000_000.0d ));
} The rewritten version of the test reliably completes within about 170ms, is (in my opinion) easier to read, and its assertions offer stronger guarantees about the reachability of specific objects of interest. |
That's great until the OOM Killer on Github actions decides to break the builds. The few seconds the test takes is not a concern. We have many such leak tests and no failures due to failed GCs. If it happens, we'll adapt the tests, but not before. Please adjust your test. You don't need to check various capacities. |
Re: other operators, I'd be willing to take a look - the One other problematic "operator" that we use that I'm aware of is |
Where can I learn more about this? I don't have a lot of experience with Github, and a cursory search didn't turn anything up. |
Ah - I think I understand. This is the linux OOM killer. If the process(es) running as part of a build attempt to allocate more memory than is available in the container (?), the build is killed. But these tests constrain the amount of heap to 1200 Mb, and the allocation I'm making is for 8 Gb, which obviously fails before any portion of the heap is actually claimed. And even if heap were claimed, the upper bound on the amount of memory in use by the JVM process is still 1200 Mb. Is there an actual risk here? |
We had issues before. |
Ok. Well, the unit test I authored exercises the path(s) of interest under 48 separate parameterizations, so a naive rewrite would produce a test that takes ~3 minutes to complete. How would you like me to rewrite this - should I simply pass a bunch of |
I suppose the precise boundary checks in the current unit test aren't useful if we aren't able to assert on the reclaimability of specific object instances. I'll rewrite to just inflate the heap and make sure it's reclaimed afterwards. |
(force-pushed because I forgot to sign my commits) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better.
ObservableCache
retains strong references to every event issued by its upstream source (via itshead
field). These references remain reachable for the entire duration of the subscription, even if application code does not retain its own reference to theObservableCache
instance. This occurs becauseObservableCache
is currently the object thatimplements Observer<T>
and is subscribed to the upstreamsource
, which means that the source must retain a reference to theObservableCache
until the subscription is either disposed of or reaches a terminal state. As a result, every cachedT
value remains reachable for the duration of the subscription tosource
, even if the application no longer retains the ability to issue new subscriptions to theObservableCache
instance.This change fixes this issue by:
Observable
andObserver
surfaces ofObservableCache
into two distinct objects (ObservableCache
which extendsObservable
, andObservableCache.Multicaster
which implementsObserver
)ObservableCache
retain a reference to thehead
ObservableCache.Multicaster
retain a reference to the currenttail
Multicaster.tail
tonull
upon receipt of the upstream's terminal eventWith this change, when
ObservableCache
goes out of scope, the only remaining references to the nodes of the linked list areMulticaster.tail
andCacheDisposable.node
. As subscribedCacheDisposable
instances advance through the linked list, its nodes progressively become reclaimable, and eventually become fully reclaimable once the terminal event has been issued to all subscribers.Thank you for contributing to RxJava. Before pressing the "Create Pull Request" button, please consider the following points:
Please give a description about what and why you are contributing, even if it's trivial.
Please include the issue list number(s) or other PR numbers in the description if you are contributing in response to those.
Please include a reasonable set of unit tests if you contribute new code or change an existing one. If you contribute an operator, (if applicable) please make sure you have tests for working with an
empty
,just
,range
of values as well as anerror
source, with and/or without backpressure and see if unsubscription/cancellation propagates correctly.