Conversation

joepembe commented Oct 1, 2025

`ObservableCache` retains strong references to every event issued by its upstream source via its `head` field. The referenced events remain reachable for the entire duration of the subscription, even if application code does not retain its own reference to the `ObservableCache` instance. This occurs because `ObservableCache` is currently the object that implements `Observer<T>` and is subscribed to the upstream `source`, which means that the source _must_ retain a reference to the `ObservableCache` until the subscription is either disposed of or reaches a terminal state. As a result, every cached `T` value remains reachable for the duration of the subscription to `source`, even if the application no longer retains the ability to issue new subscriptions to the `ObservableCache` instance.

This change fixes this issue by:

1. Splitting the `Observable` and `Observer` surfaces of `ObservableCache` into two distinct objects (`ObservableCache` which extends `Observable`, and `ObservableCache.Multicaster` which implements `Observer`)
2. Having only `ObservableCache` retain a reference to the `head`
3. Having only `ObservableCache.Multicaster` retain a reference to the current `tail`
4. Setting `Multicaster.tail` to `null` upon receipt of the upstream's terminal event

With this change, when `ObservableCache` goes out of scope, the only remaining references to the nodes of the linked list are `Multicaster.tail` and `CacheDisposable.node`. As subscribed `CacheDisposable` instances advance through the linked list, the nodes behind them progressively become reclaimable, and the entire list becomes reclaimable once the terminal event has been issued to all subscribers.
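
For illustration, here is a deliberately simplified, single-threaded sketch of the reference layout described above. It is not the PR's actual code (the real operator also deals with concurrency, capacity hints, and per-subscriber replay state); it only shows which object holds which reference into the cached linked list.

```java
import java.util.function.Consumer;

// Sketch only: mirrors the ownership described in points 1-4 above, not the real ObservableCache.
final class CacheSketch {

    static final class Node {
        final Object value;   // a cached onNext item or a terminal marker
        Node next;

        Node(Object value) {
            this.value = value;
        }
    }

    // Observable side: the only holder of `head`. Once application code drops its reference
    // to CacheSketch, nodes become reclaimable from the front as each subscriber's cursor
    // (CacheDisposable.node in the real operator) moves past them.
    final Node head = new Node(null);

    // Observer side: the upstream source keeps this object, not CacheSketch, alive until the
    // terminal event, and it only ever references the current tail.
    final class Multicaster {
        Node tail = head;

        void onNext(Object item) {
            Node node = new Node(item);
            tail.next = node;
            tail = node;
        }

        void onComplete() {
            Node terminal = new Node("COMPLETE");
            tail.next = terminal;
            tail = null;   // drop the Observer side's last strong reference into the list
        }
    }

    final Multicaster multicaster = new Multicaster();

    // A subscriber replays from head and advances its own cursor, analogous to CacheDisposable.node.
    void replayTo(Consumer<Object> consumer) {
        for (Node cursor = head.next; cursor != null; cursor = cursor.next) {
            consumer.accept(cursor.value);
        }
    }
}
```

The key point is that the upstream only keeps the `Multicaster` alive, and after the terminal event the `Multicaster` references nothing, so reachability of the cached nodes is governed entirely by whoever still holds the `ObservableCache` and by the subscribers' cursors.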
akarnokd (Member) commented Oct 1, 2025

Why do you need such progressive reclamation?

joepembe (Author) commented Oct 1, 2025

We have several reactive systems that handle request-reply workloads. Each individual workload can span a relatively wide range of durations (from a few seconds to ~1 hour). Our systems commonly allocate graphs of reactive operators at the start of a request, and these request-scoped object graphs generally do not go out of scope until the response is produced, which generally corresponds to the terminal event(s) of distinguished part(s) of this data flow. In other words, these objects will eventually become garbage, but they can take a relatively long time to actually do so. This is a challenging type of workload for generational collectors like G1, because these objects must be copied repeatedly (as they are promoted through successive young regions). Long durations and/or heavy load exacerbate this problem, and can result in destined-to-become-garbage objects landing in the old generation, where they are more expensive to collect.

We make extensive and ad-hoc use of the `cache()` operator to prevent multiple subscriptions from accidentally re-executing non-idempotent or costly business logic. This is elevating heap usage and making the GC's job more difficult, so it seemed worth fixing.
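
As a concrete illustration of that usage pattern (a generic sketch, not code from these systems): `cache()` lets several subscribers share a single execution of the upstream work.

```java
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.rxjava3.core.Observable;

public class CacheUsageSketch {
    public static void main(String[] args) {
        AtomicInteger invocations = new AtomicInteger();

        // Stand-in for non-idempotent or costly business logic: counts how often it actually runs.
        Observable<Integer> work = Observable.fromCallable(invocations::incrementAndGet);

        Observable<Integer> cached = work.cache();
        cached.subscribe(v -> System.out.println("first subscriber saw " + v));
        cached.subscribe(v -> System.out.println("second subscriber saw " + v));

        // Both subscribers observe the same value; the callable ran exactly once.
        System.out.println("invocations: " + invocations.get());
    }
}
```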

codecov bot commented Oct 1, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 99.61%. Comparing base (9b55d01) to head (b9b2316).
⚠️ Report is 191 commits behind head on 3.x.

Additional details and impacted files
@@             Coverage Diff              @@
##                3.x    #7911      +/-   ##
============================================
- Coverage     99.62%   99.61%   -0.01%     
+ Complexity     6801     6774      -27     
============================================
  Files           752      752              
  Lines         47707    47720      +13     
  Branches       6401     6401              
============================================
+ Hits          47527    47537      +10     
+ Misses           84       80       -4     
- Partials         96      103       +7     

akarnokd added this to the 3.1-support milestone Oct 2, 2025

akarnokd (Member) commented Oct 2, 2025

I'll have to think about this more. Other operators will likely require similar changes.

Also the test looks unnecessarily convoluted. Can you trigger the issue in the original code and check the lack of reclaiming via `ManagementFactory.getMemoryMXBean()` and `System.gc()`? Example
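
The linked Example isn't reproduced in this thread; for readers unfamiliar with that style of test, the sketch below shows its general shape: push large payloads through `cache()`, drop the application's reference without terminating the upstream, request a GC, and compare used heap. The subject type, payload count, sleeps, and threshold are illustrative choices rather than anything taken from the RxJava test suite.

```java
import static org.junit.Assert.assertTrue;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

import org.junit.Test;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;

public class CacheRetentionTest {

    @Test
    public void consumedCacheItemsAreReclaimable() throws Exception {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        System.gc();
        Thread.sleep(200);
        long before = memory.getHeapMemoryUsage().getUsed();

        PublishSubject<byte[]> source = PublishSubject.create();
        Observable<byte[]> cached = source.cache();
        cached.subscribe(v -> { });                 // consumes each item as soon as it is cached

        for (int i = 0; i < 200; i++) {
            source.onNext(new byte[1024 * 1024]);   // ~200 MB flows through the cache
        }

        cached = null;                              // the app can no longer subscribe again,
                                                    // but the upstream has not terminated

        System.gc();
        Thread.sleep(200);                          // give the collector a chance to finish
        long after = memory.getHeapMemoryUsage().getUsed();

        // Illustrative bound: with the head/tail split, the already-consumed nodes should
        // no longer be strongly reachable from the still-subscribed upstream.
        assertTrue("still retaining ~" + ((after - before) >> 20) + " MB",
                after - before < 50 * 1024 * 1024);
    }
}
```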

joepembe (Author) commented Oct 2, 2025

I think the approach I took in the unit test is a bit nicer than the approach you highlighted. That approach relies on triggering collections via `System.gc()` (which is not guaranteed to actually trigger a collection), sleeping for an arbitrary duration (presumably to give the garbage collector time to complete its work), and asserting on the amount of heap in use. This is susceptible to non-deterministic failures: a manually-triggered garbage collection may or may not actually run, may or may not complete by the time the sleep ends, and may or may not have actually collected the object(s) of interest. The test's behavior also depends on the specifics of the GC algorithm the JVM is using, and its runtime must be artificially extended to reduce the likelihood of a false positive, which harms build times.

The approach I propose in this PR is a deterministic method for asserting on the reclaimability of specific object instances: it relies on a documented guarantee of the `SoftReference` specification, namely that softly-reachable objects will have been collected before the virtual machine throws `OutOfMemoryError`. From the Oracle Javadoc:

> All soft references to softly-reachable objects are guaranteed to have been cleared before the virtual machine throws an OutOfMemoryError.

For example, I ran the test you highlighted and found that it takes around 3200ms to complete. I then rewrote it as follows:

```java
@Test
public void noBoundedRetentionViaThreadLocal_reclaimable() throws Exception {
    long start = System.nanoTime();
    final ReplayProcessor<byte[]> rp = ReplayProcessor.createWithSize(1);

    Flowable<byte[]> source = rp.take(1)
            .concatMap(new Function<byte[], Publisher<byte[]>>() {
                @Override
                public Publisher<byte[]> apply(byte[] v) throws Exception {
                    return rp;
                }
            })
            .takeLast(1);

    source.subscribe(new Consumer<byte[]>() {
        @Override
        public void accept(byte[] v) throws Exception {
        }
    });

    List<Reclaimable<byte[]>> payloads = IntStream.range(0, 200)
            .mapToObj(i -> Reclaimable.of("bytes-" + i, new byte[1024 * 1024]))
            .collect(Collectors.toList());

    for (Reclaimable<byte[]> payload : payloads) {
        rp.onNext(payload.remove());
    }
    rp.onComplete();

    Reclaimable.forceGC().assertAllReclaimed(payloads.subList(0, 199)).assertUnreclaimed(payloads.get(199));
    long end = System.nanoTime();
    System.out.println("runtime: " + String.format("%.3f ms", (end - start) / 1_000_000.0d));
}
```

The rewritten version of the test reliably completes within about 170ms, is (in my opinion) easier to read, and its assertions offer stronger guarantees about the reachability of specific objects of interest.
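
The `Reclaimable` helper referenced above isn't shown in this thread. For reference, the following is one way such a helper could be built on the quoted `SoftReference` guarantee; the method names mirror the test above, but the implementation is a sketch, not necessarily the PR's actual helper.

```java
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.List;

// Wraps a value in a SoftReference and keeps its own strong reference until remove() hands
// the value off. forceGC() provokes an OutOfMemoryError, which the JVM may only throw after
// clearing every softly-reachable object, so a still-set SoftReference afterwards proves the
// wrapped object is still strongly reachable somewhere.
final class Reclaimable<T> {
    private final String label;
    private final SoftReference<T> ref;
    private T strong;

    private Reclaimable(String label, T value) {
        this.label = label;
        this.ref = new SoftReference<>(value);
        this.strong = value;
    }

    static <T> Reclaimable<T> of(String label, T value) {
        return new Reclaimable<>(label, value);
    }

    /** Returns the value and drops this wrapper's own strong reference to it. */
    T remove() {
        T value = strong;
        strong = null;
        return value;
    }

    boolean isReclaimed() {
        return ref.get() == null;
    }

    /** Allocates until OutOfMemoryError so that all softly-reachable objects must be cleared. */
    static Assertions forceGC() {
        List<byte[]> ballast = new ArrayList<>();
        try {
            while (true) {
                ballast.add(new byte[1024 * 1024]);
            }
        } catch (OutOfMemoryError expected) {
            ballast.clear();   // release the ballast before asserting
        }
        return new Assertions();
    }

    static final class Assertions {
        Assertions assertAllReclaimed(List<? extends Reclaimable<?>> items) {
            for (Reclaimable<?> item : items) {
                if (!item.isReclaimed()) {
                    throw new AssertionError(item.label + " was not reclaimed");
                }
            }
            return this;
        }

        Assertions assertUnreclaimed(Reclaimable<?> item) {
            if (item.isReclaimed()) {
                throw new AssertionError(item.label + " was unexpectedly reclaimed");
            }
            return this;
        }
    }
}
```

The guarantee cuts both ways here: the GC never clears a soft reference whose referent is still strongly reachable, and it must clear all softly-reachable referents before throwing `OutOfMemoryError`, which is what makes both assertions deterministic.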

akarnokd (Member) commented Oct 2, 2025

That's great until the OOM killer on GitHub Actions decides to break the builds. The few seconds the test takes is not a concern. We have many such leak tests and no failures due to failed GCs. If it happens, we'll adapt the tests, but not before.

Please adjust your test. You don't need to check various capacities.

joepembe (Author) commented Oct 2, 2025

Re: other operators, I'd be willing to take a look. The `Observable#cache()` operator is the most urgent one for our specific situation, but I haven't audited every operator we use, so if there are specific operators you're thinking of, please share.

One other problematic "operator" we use that I'm aware of is `ReplaySubject`, which suffers from a reclaimability concern very similar to that of `cache()`. Unfortunately, the approach I'm proposing in this PR doesn't work out of the box for `Subject` types, because `Subject` is the union of both `Observable` and `Observer`. I've actually written my own `CacheSubject` implementation (it doesn't actually extend `Subject` for the aforementioned reason). You can see it here if you're interested: joepembe@6a9f284

joepembe (Author) commented Oct 2, 2025

> That's great until the OOM killer on GitHub Actions decides to break the builds.

Where can I learn more about this? I don't have a lot of experience with GitHub, and a cursory search didn't turn anything up.

joepembe (Author) commented Oct 2, 2025

Ah, I think I understand. This is the Linux OOM killer: if the process(es) running as part of a build attempt to allocate more memory than is available in the container (?), the build is killed. But these tests constrain the heap to 1200 MB, and the allocation I'm making is for 8 GB, which obviously fails before any portion of that memory is actually claimed. And even if it were claimed, the upper bound on the heap in use by the JVM process is still 1200 MB. Is there an actual risk here?
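
A minimal standalone illustration of that point (not the PR's test code): assuming a capped heap such as `-Xmx1200m`, a single oversized request is rejected inside the JVM with `OutOfMemoryError`, so the process never asks the operating system for anything close to that amount of memory.

```java
public class OversizedAllocationDemo {
    public static void main(String[] args) {
        try {
            // 2^30 longs = 8 GiB requested against a heap capped well below that.
            long[] eightGiB = new long[1 << 30];
            System.out.println("allocated " + eightGiB.length + " longs");
        } catch (OutOfMemoryError expected) {
            // The JVM fails the allocation itself; the container's memory limit is never approached.
            System.out.println("rejected by the JVM: " + expected.getMessage());
        }
    }
}
```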

akarnokd (Member) commented Oct 2, 2025

We had issues before.

joepembe (Author) commented Oct 2, 2025

Ok. Well, the unit test I authored exercises the path(s) of interest under 48 separate parameterizations, so a naive rewrite would produce a test that takes ~3 minutes to complete. How would you like me to rewrite this: should I simply pass a bunch of `byte[]` values through it once, and check the post-GC utilization?

joepembe (Author) commented Oct 2, 2025

I suppose the precise boundary checks in the current unit test aren't useful if we aren't able to assert on the reclaimability of specific object instances. I'll rewrite the test to just inflate the heap and make sure it's reclaimed afterwards.

joepembe (Author) commented Oct 2, 2025

(force-pushed because I forgot to sign my commits)

akarnokd (Member) left a comment

Much better.
