Skip to content

Commit

Permalink
ensures RSocketRequester awaits proper termination of connection and …
Browse files Browse the repository at this point in the history
…responder side

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
Oleh Dokuka authored and OlegDokuka committed Apr 11, 2023
1 parent 8959385 commit 59a3e49
Show file tree
Hide file tree
Showing 29 changed files with 442 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.rsocket.resume;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.internal.UnboundedProcessor;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LL_Result;
import reactor.core.Disposable;

public class InMemoryResumableFramesStoreStressTest {
boolean storeClosed;

InMemoryResumableFramesStore store =
new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 128);
boolean processorClosed;
UnboundedProcessor processor = new UnboundedProcessor(() -> processorClosed = true);

void subscribe() {
store.saveFrames(processor).subscribe();
store.onClose().subscribe(null, t -> storeClosed = true, () -> storeClosed = true);
}

@JCStressTest
@Outcome(
id = {"true, true"},
expect = ACCEPTABLE)
@State
public static class TwoSubscribesRaceStressTest extends InMemoryResumableFramesStoreStressTest {

Disposable d1;

final ByteBuf b1 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
1,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello1"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello2"));
final ByteBuf b2 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
3,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello3"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello4"));
final ByteBuf b3 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
5,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello5"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello6"));

final ByteBuf c1 =
ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new ConnectionErrorException("closed"));

{
subscribe();
d1 = store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
}

@Actor
public void producer1() {
processor.tryEmitNormal(b1);
processor.tryEmitNormal(b2);
processor.tryEmitNormal(b3);
}

@Actor
public void producer2() {
processor.tryEmitFinal(c1);
}

@Actor
public void producer3() {
d1.dispose();
store
.doOnDiscard(ByteBuf.class, ByteBuf::release)
.subscribe(ByteBuf::release, t -> {})
.dispose();
store
.doOnDiscard(ByteBuf.class, ByteBuf::release)
.subscribe(ByteBuf::release, t -> {})
.dispose();
store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
}

@Actor
public void producer4() {
store.releaseFrames(0);
store.releaseFrames(0);
store.releaseFrames(0);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = storeClosed;
r.r2 = processorClosed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
this.source = source;
this.isClient = isClient;

this.serverReceiver = new InternalDuplexConnection(this, source);
this.clientReceiver = new InternalDuplexConnection(this, source);
this.serverReceiver = new InternalDuplexConnection(Type.SERVER, this, source);
this.clientReceiver = new InternalDuplexConnection(Type.CLIENT, this, source);
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
}
Expand Down Expand Up @@ -195,8 +195,33 @@ int incrementAndGetCheckingState() {
}
}

@Override
public String toString() {
return "ClientServerInputMultiplexer{"
+ "serverReceiver="
+ serverReceiver
+ ", clientReceiver="
+ clientReceiver
+ ", serverConnection="
+ serverConnection
+ ", clientConnection="
+ clientConnection
+ ", source="
+ source
+ ", isClient="
+ isClient
+ ", s="
+ s
+ ", t="
+ t
+ ", state="
+ state
+ '}';
}

private static class InternalDuplexConnection extends Flux<ByteBuf>
implements Subscription, DuplexConnection {
private final Type type;
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
private final DuplexConnection source;

Expand All @@ -207,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
CoreSubscriber<? super ByteBuf> actual;

public InternalDuplexConnection(
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
Type type,
ClientServerInputMultiplexer clientServerInputMultiplexer,
DuplexConnection source) {
this.type = type;
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
this.source = source;
}
Expand Down Expand Up @@ -304,5 +332,17 @@ public Mono<Void> onClose() {
public double availability() {
return source.availability();
}

@Override
public String toString() {
return "InternalDuplexConnection{"
+ "type="
+ type
+ ", source="
+ source
+ ", state="
+ state
+ '}';
}
}
}
15 changes: 13 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
Expand Down Expand Up @@ -655,6 +656,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnAllClosedSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink =
Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
multiplexer.asClientConnection(),
Expand All @@ -667,7 +673,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker);
requesterLeaseTracker,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnAllClosedSink.asMono(),
requesterOnAllClosedSink.asMono()));

RSocket wrappedRSocketRequester =
interceptors.initRequester(rSocketRequester);
Expand Down Expand Up @@ -715,7 +725,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(RequestInterceptor)
leases.sender)
: interceptors
::initResponderRequestInterceptor);
::initResponderRequestInterceptor,
responderOnAllClosedSink);

return wrappedRSocketRequester;
})
Expand Down
Loading

0 comments on commit 59a3e49

Please sign in to comment.