Skip to content

Commit

Permalink
Fix PublisherFlatMapSingle's demand handling
Browse files Browse the repository at this point in the history
Motivation:

PublisherFlatMapSingle was recently changed to better handle excpetions, but the changes
affected the way the drain counter was updating, which as a result created wrong demand upstream.

Modifications:

Init the counter inside the loop to reset it everytime there is contention and the lock is re-acquired.

Result:

Proper counting of delivered items and thus proper demand requests.
  • Loading branch information
tkountis committed Jun 22, 2021
1 parent 09d0e08 commit c354772
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -271,9 +271,9 @@ private void enqueueAndDrain(Object item) {
}

private void drainPending() {
long drainCount = 0;
boolean tryAcquire = true;
while (tryAcquire && tryAcquireLock(emittingUpdater, this)) {
long drainCount = 0;
try {
Object t;
while ((t = pending.poll()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,24 @@
import java.util.function.Function;

import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL;
import static io.servicetalk.concurrent.api.Executors.newFixedSizeExecutor;
import static io.servicetalk.concurrent.api.Publisher.fromIterable;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static java.util.concurrent.ThreadLocalRandom.current;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -638,4 +642,20 @@ void testRequestAndEmitConcurrency() throws Exception {
assertThat("Unexpected items emitted.", received, hasSize(totalToRequest));
assertThat(received, containsInAnyOrder(range(0, totalToRequest).boxed().toArray()));
}

@Test
void testConcurrentEmissions() throws Exception {
final Executor executor = newFixedSizeExecutor(10);
final int maxSingles = 1_000;
final int expected = range(1, maxSingles).reduce(0, Integer::sum);

final int actual = Publisher.range(1, maxSingles)
.flatMapMergeSingle(key -> executor.timer(ofMillis(current().nextInt(10)))
.toSingle().map(ignored -> key), 10)
.collect(() -> 0, Integer::sum)
.toFuture().get();

assertThat(actual, is(equalTo(expected)));
executor.closeAsync().toFuture().get();
}
}

0 comments on commit c354772

Please sign in to comment.