Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiFromOutputStream cs swap fix #1991

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/**
* Output stream that {@link java.util.concurrent.Flow.Publisher} publishes any data written to it as {@link ByteBuffer}
* events.
*
* @deprecated please use {@link io.helidon.common.reactive.OutputStreamMulti} instead
*/
@Deprecated(since = "2.0.0", forRemoval = true)
Expand All @@ -46,23 +46,21 @@ public class MultiFromOutputStream extends OutputStream implements Multi<ByteBuf
private final EmittingPublisher<ByteBuffer> emittingPublisher = EmittingPublisher.create();
private final CompletableFuture<?> completionResult = new CompletableFuture<>();
private final AtomicBoolean written = new AtomicBoolean();
private final AtomicReference<CompletableFuture<Void>> demandUpdated = new AtomicReference<>();
private volatile CompletableFuture<Void> demandUpdated = new CompletableFuture<>();

/**
* Create new output stream that {@link java.util.concurrent.Flow.Publisher}
* publishes any data written to it as {@link ByteBuffer} events.
*/
protected MultiFromOutputStream() {
this.demandUpdated.set(new CompletableFuture<>());
emittingPublisher.onCancel(() -> {
// when write is called, an exception is thrown as it is a cancelled subscriber
// when close is called, we do not throw an exception, as that should be silent
completionResult.complete(null);
demandUpdated.cancel(true);
});
emittingPublisher.onRequest((n, demand) -> {
// complete previous and create new future for demand update
this.demandUpdated.getAndSet(new CompletableFuture<>())
.complete(null);
this.demandUpdated.complete(null);
});
}

Expand All @@ -82,7 +80,7 @@ void timeout(long timeout) {
* @param requestCallback to be executed
* @return this OutputStreamMulti
*/
public MultiFromOutputStream onRequest(BiConsumer<Long, Long> requestCallback){
public MultiFromOutputStream onRequest(BiConsumer<Long, Long> requestCallback) {
this.emittingPublisher.onRequest(requestCallback);
return this;
}
Expand Down Expand Up @@ -182,8 +180,6 @@ private void publish(byte[] buffer, int offset, int length) throws IOException {

ByteBuffer byteBuffer = createBuffer(buffer, offset, length);

// defend against racing demand updates
CompletableFuture<Void> demandUpdated = this.demandUpdated.get();
while (!emittingPublisher.emit(byteBuffer)) {
if (emittingPublisher.isCancelled()) {
throw new IOException("Output stream already closed.");
Expand All @@ -194,8 +190,8 @@ private void publish(byte[] buffer, int offset, int length) throws IOException {
}

// wait until some data can be sent or the stream has been closed
await(start, 250, demandUpdated);
demandUpdated = this.demandUpdated.get();
await(start, timeout, demandUpdated);
demandUpdated = new CompletableFuture<>();
}

} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
import org.reactivestreams.tck.flow.support.Function;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand All @@ -54,15 +48,12 @@ public Flow.Publisher<ByteBuffer> createFlowPublisher(long l) {
executor.submit(() -> {
for (long n = 0; n < l; n++) {
final long fn = n;
//stochastic test of write methods being thread-safe
executor.submit(() -> {
try {
osp.write(("token" + fn).getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
// expected by some tests
}
countDownLatch.countDown();
});
try {
osp.write(("token" + fn).getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
// expected by some tests
}
countDownLatch.countDown();
}
try {
countDownLatch.await();
Expand All @@ -89,112 +80,9 @@ public long maxElementsFromPublisher() {

@Test
public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable {
final int iterations = 1000;
final int elements = 100;

stochasticTest(iterations, new Function<Integer, Void>() {
@Override
public Void apply(final Integer runNumber) throws Throwable {
activePublisherTest(elements, true, new PublisherTestRun<ByteBuffer>() {
@Override
public void run(Publisher<ByteBuffer> pub) throws Throwable {
final TestEnvironment.Latch completionLatch = new TestEnvironment.Latch(env);

final AtomicInteger gotElements = new AtomicInteger(0);
pub.subscribe(new Subscriber<ByteBuffer>() {
private Subscription subs;

private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier();

/**
* Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect
* a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness.
*
* Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call
* {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough
* to verify concurrent access safety - one needs to track if the caller is not still using the calling thread
* to enter subsequent critical sections ("nesting" them effectively).
*/
final class ConcurrentAccessBarrier {
private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null);
private volatile String previousSignal = null;

public void enterSignal(String signalName) {
if ((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) {
env.flop(String.format(
"Illegal concurrent access detected (entering critical section)! " +
"%s emited %s signal, before %s finished its %s signal.",
Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal));
}
this.previousSignal = signalName;
}

public void leaveSignal(String signalName) {
currentlySignallingThread.set(null);
this.previousSignal = signalName;
}

private boolean isSynchronousSignal() {
return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get());
}

}

@Override
public void onSubscribe(Subscription s) {
final String signal = "onSubscribe()";
concurrentAccessBarrier.enterSignal(signal);

subs = s;
subs.request(1);

concurrentAccessBarrier.leaveSignal(signal);
}

@Override
public void onNext(ByteBuffer ignore) {
final String signal = String.format("onNext(%s)", ignore);
concurrentAccessBarrier.enterSignal(signal);

if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this)
{
subs.request(1);
}

concurrentAccessBarrier.leaveSignal(signal);
}

@Override
public void onError(Throwable t) {
final String signal = String.format("onError(%s)", t.getMessage());
concurrentAccessBarrier.enterSignal(signal);

// ignore value

concurrentAccessBarrier.leaveSignal(signal);
}

@Override
public void onComplete() {
final String signal = "onComplete()";
concurrentAccessBarrier.enterSignal(signal);

// entering for completeness

concurrentAccessBarrier.leaveSignal(signal);
completionLatch.close();
}
});

completionLatch.expectClose(
elements * env.defaultTimeoutMillis(),
String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it",
runNumber, iterations, elements, gotElements.get()));
}
});
return null;
}
});
for (int i = 0; i < 100; i++) {
super.stochastic_spec103_mustSignalOnMethodsSequentially();
}
}

@BeforeClass
Expand Down