Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x -- groupBy with evicting map -- seeing inconsistent behavior when eviction occurs #5933

Closed
sfitts opened this issue Mar 24, 2018 · 3 comments

Comments

@sfitts
Copy link

sfitts commented Mar 24, 2018

I am using version 2.1.11. I'm seeing what I believe to be a bug in the groupBy operator when configured to use an evicting map. Here is a test that demonstrates what I'm seeing:

    private static class TestTicker extends Ticker {
        long tick = 0;

        @Override
        public long read() {
            return tick;
        }
    }
    
    @Test
    public void testGroupByEviction() {
        FlowableProcessor<Integer> source = PublishProcessor.create();
        TestSubscriber<Integer> subscriber1 = new TestSubscriber<>();
        TestTicker testTicker = new TestTicker();

        Function<Consumer<Object>, Map<Object, Object>> mapFactory = action -> {
            return CacheBuilder.newBuilder()
                    .expireAfterAccess(5, TimeUnit.SECONDS)
                    .removalListener(notification -> {
                        try {
                            action.accept(notification.getValue());
                        } catch (Exception ex) {
                            throw new RuntimeException(ex);
                        }
                    }).ticker(testTicker).build().asMap();
        };

        final List<String> list = Collections.synchronizedList(new ArrayList<String>());
        Flowable<Integer> stream = source.doOnCancel(() -> list.add("Source canceled"))
                .groupBy(v -> v, Functions.identity(), false, Flowable.bufferSize(), mapFactory)
                .flatMap(group -> {
            return group
                    .doOnComplete(() -> list.add("Group completed"))
                    .doOnCancel(() -> list.add("Group canceled"));
        });
        stream.doOnCancel(() -> list.add("Outer group by canceled.")).subscribe(subscriber1);
        
        // Send 3 in the same group and wait for them to be seen
        source.onNext(1);
        source.onNext(1);
        source.onNext(1);
        subscriber1.awaitCount(3);
        
        // Advance time far enough to evict the group.
        // NOTE -- Comment this line out to make the test "pass".
        testTicker.tick = TimeUnit.SECONDS.toNanos(6);
        
        // Send more data in the group (triggering eviction and recreation)
        source.onNext(1);
        source.onNext(1);

        // Wait for the last 2 and then cancel the subscription
        subscriber1.awaitCount(5);
        subscriber1.cancel();

        // Observe the result.  Note that right now the result differs depending on whether eviction occurred or
        // not.  The observed sequence in that case is:  Group completed, Outer group by canceled., Group canceled.
        // The addition of the "Group completed" is actually fine, but the fact that the cancel doesn't reach the
        // source seems like a bug.  Commenting out the setting of "tick" above will produce the "expected" sequence.
        assertEquals(Arrays.asList(
                // "Group completed", -- this is here when eviction occurs
                "Outer group by canceled.", 
                "Group canceled",
                "Source canceled"  // This is *not* here when eviction occurs
        ), list);
    }

As you can see from this when a group eviction occurs and the root subscription is canceled, the cancel dose not propagate beyond the groupBy (it does when there is no eviction). I think the reason for this is that after the eviction processing (line 188 of FlowableGroupBy) the groupCount is 3 when I believe it should be 2). This leads the groupBy to conclude that there are 2 active groups when the cancel occurs, when in fact there is only one. This doesn't happen when there is no eviction (you can see this by commenting out the line that updates "tick").

@akarnokd akarnokd added the 2.x label Mar 24, 2018
@akarnokd
Copy link
Member

Yes, looks like eviction only happens when an upstream item is coming down. I think the same eviction logic could be run in cancel() as well:

        @Override
        public void cancel() {
            // cancelling the main source means we don't want any more groups
            // but running groups still require new values
            if (cancelled.compareAndSet(false, true)) {

                if (evictedGroups != null) {
                    GroupedUnicast<K, V> evictedGroup;
                    while ((evictedGroup = evictedGroups.poll()) != null) {
                        evictedGroup.onComplete();
                    }
                }

                if (groupCount.decrementAndGet() == 0) {
                    s.cancel();
                }
            }
        }

/cc @davidmoten

@davidmoten
Copy link
Collaborator

Thanks for the report, I'll fix tomorrow

@akarnokd
Copy link
Member

Sorry for the delay. I'll try to tackle this in the coming weeks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants