Skip to content

Commit

Permalink
JCBC-2170 FIT: ClusterEnvironment.publishOnScheduler()
Browse files Browse the repository at this point in the history
Motivation
----------
Verify the SDK publishes results on the configured
scheduler.

Modifications
-------------
Configure the FIT ClusterEnvironment to publish on a
custom scheduler.

Add UserSchedulerUtil.withSchedulerCheck(Mono/Flux).
It returns a new Mono/Flux with doOn* callbacks
that verify the signals are published on the custom
scheduler.

Everywhere the performer calls a reactive SDK method,
wrap the result in `withSchedulerCheck()`.

Change-Id: I40a5c053603d34ed8e86743f286c91bf77273010
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/219006
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Michael Reiche <michael.reiche@couchbase.com>
  • Loading branch information
dnault committed Nov 5, 2024
1 parent dcc5f18 commit 7f36cf2
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import com.couchbase.utils.Capabilities;
import com.couchbase.utils.ClusterConnection;
import com.couchbase.utils.OptionsUtil;
import com.couchbase.utils.UserSchedulerUtil;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
Expand Down Expand Up @@ -232,6 +233,12 @@ public void clusterConnectionCreate(ClusterConnectionCreateRequest request,

var clusterEnvironment = OptionsUtil.convertClusterConfig(request, getCluster, onClusterConnectionClose);

// [if:3.7.5] first version that allows specifying custom publishOn scheduler
var userExecutorAndScheduler = UserSchedulerUtil.userExecutorAndScheduler();
onClusterConnectionClose.add(userExecutorAndScheduler::dispose);
clusterEnvironment.publishOnScheduler(userExecutorAndScheduler::scheduler);
// [end]

var connection = new ClusterConnection(request.getClusterHostname(),
request.getClusterUsername(),
request.getClusterPassword(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.couchbase.JavaSdkCommandExecutor.convertExceptionShared;
import static com.couchbase.JavaSdkCommandExecutor.setSuccess;
import static com.couchbase.client.performer.core.util.TimeUtil.getTimeNow;
import static com.couchbase.utils.UserSchedulerUtil.withSchedulerCheck;

public class LookupInHelper {
public static Result.Builder handleLookupIn(PerRun perRun,
Expand Down Expand Up @@ -247,7 +248,7 @@ private static Mono<?> handleLookupInReactive(
result = collection.lookupIn(docId, specs);
}

return result.doOnNext(r -> {
return withSchedulerCheck(result).doOnNext(r -> {
out.setElapsedNanos(System.nanoTime() - start);

if (command.getReturnResult()) {
Expand Down Expand Up @@ -284,7 +285,7 @@ private static Mono<?> handleLookupInAnyReplicaReactive(
result = collection.lookupInAnyReplica(docId, specs);
}

return result
return withSchedulerCheck(result)
.doOnError(err -> err.printStackTrace())
.doFinally(v -> System.out.println("Finished"))
.doOnNext(v -> {
Expand Down Expand Up @@ -331,7 +332,7 @@ private static Mono<?> handleLookupInAllReplicasReactive(
out.setElapsedNanos(System.nanoTime() - start);

var streamer = new FluxStreamer<LookupInReplicaResult>(
results,
withSchedulerCheck(results),
perRun,
req.getStreamConfig().getStreamId(),
req.getStreamConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
// [if:3.6.0]
import static com.couchbase.search.SearchHelper.handleSearchReactive;
// [end]
import static com.couchbase.utils.UserSchedulerUtil.withSchedulerCheck;


/**
Expand Down Expand Up @@ -105,7 +106,7 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
Mono<MutationResult> mr;
if (options == null) mr = collection.insert(docId, content);
else mr = collection.insert(docId, content, options);
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) populateResult(result, r);
else setSuccess(result);
Expand All @@ -121,7 +122,7 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
Mono<GetResult> gr;
if (options == null) gr = collection.get(docId);
else gr = collection.get(docId, options);
return gr.map(r -> {
return withSchedulerCheck(gr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) populateResult(request.getContentAs(), result, r);
else setSuccess(result);
Expand All @@ -138,7 +139,7 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
if (options == null) mr = collection.remove(docId);
else mr = collection.remove(docId, options);
result.setElapsedNanos(System.nanoTime() - start);
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
if (op.getReturnResult()) populateResult(result, r);
else setSuccess(result);
return result.build();
Expand All @@ -154,7 +155,7 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
Mono<MutationResult> mr;
if (options == null) mr = collection.replace(docId, content);
else mr = collection.replace(docId, content, options);
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) populateResult(result, r);
else setSuccess(result);
Expand All @@ -172,7 +173,7 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
if (options == null) mr = collection.upsert(docId, content);
else mr = collection.upsert(docId, content, options);
result.setElapsedNanos(System.nanoTime() - start);
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
if (op.getReturnResult()) populateResult(result, r);
else setSuccess(result);
return result.build();
Expand All @@ -189,7 +190,7 @@ private Mono<Result> performOperationReactive(com.couchbase.client.protocol.sdk.
if (options != null) results = collection.scan(scanType, options);
else results = collection.scan(scanType);
result.setElapsedNanos(System.nanoTime() - start);
var streamer = new FluxStreamer<ScanResult>(results, perRun, request.getStreamConfig().getStreamId(), request.getStreamConfig(),
var streamer = new FluxStreamer<ScanResult>(withSchedulerCheck(results), perRun, request.getStreamConfig().getStreamId(), request.getStreamConfig(),
(ScanResult r) -> processScanResult(request, r),
(Throwable err) -> convertException(err));
perRun.streamerOwner().addAndStart(streamer);
Expand Down Expand Up @@ -244,7 +245,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
} else {
gr = collection.getAndLock(docId, Duration.ofSeconds(duration.getSeconds()), options);
}
return gr.map(r -> {
return withSchedulerCheck(gr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) {
populateResult(request.getContentAs(), result, r);
Expand All @@ -268,7 +269,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
} else {
gr = collection.unlock(docId, cas, options);
}
return gr.then(Mono.fromCallable(() -> {
return withSchedulerCheck(gr).then(Mono.fromCallable(() -> {
result.setElapsedNanos(System.nanoTime() - start);
setSuccess(result);
return result.build();
Expand All @@ -293,7 +294,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
} else {
gr = collection.getAndTouch(docId, expiry, options);
}
return gr.map(r -> {
return withSchedulerCheck(gr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) {
populateResult(request.getContentAs(), result, r);
Expand Down Expand Up @@ -322,7 +323,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
} else {
mr = collection.touch(docId, expiry, options);
}
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) {
populateResult(result, r);
Expand All @@ -337,7 +338,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
var request = clc.getExists();
var docId = getDocId(request.getLocation());
var exists = collection.exists(docId);
return exists.map(r -> {
return withSchedulerCheck(exists).map(r -> {
if (op.getReturnResult()) {
populateResult(result, r);
} else {
Expand All @@ -357,7 +358,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
} else {
mr = collection.mutateIn(docId, request.getSpecList().stream().map(v -> convertMutateInSpec(v)).toList(), options);
}
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
if (op.getReturnResult()) {
populateResult(result, r, request);
} else {
Expand All @@ -380,7 +381,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
results = collection.getAllReplicas(docId, options);
}
result.setElapsedNanos(System.nanoTime() - start);
var streamer = new FluxStreamer<>(results, perRun, request.getStreamConfig().getStreamId(), request.getStreamConfig(),
var streamer = new FluxStreamer<>(withSchedulerCheck(results), perRun, request.getStreamConfig().getStreamId(), request.getStreamConfig(),
(GetReplicaResult r) -> processGetAllReplicasResult(request, r),
this::convertException);
perRun.streamerOwner().addAndStart(streamer);
Expand All @@ -403,7 +404,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
} else {
gr = collection.getAnyReplica(docId, options);
}
return gr.map(r -> {
return withSchedulerCheck(gr).map(r -> {
result.setElapsedNanos(System.nanoTime() - start);
if (op.getReturnResult()) {
populateResult(result, r, request.getContentAs());
Expand Down Expand Up @@ -431,7 +432,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
cr = collection.binary().increment(docId, options);
}
result.setElapsedNanos(System.nanoTime() - start);
return cr.map(r -> {
return withSchedulerCheck(cr).map(r -> {
if (op.getReturnResult()) {
populateResult(result, r);
} else {
Expand All @@ -454,7 +455,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
cr = collection.binary().decrement(docId, options);
}
result.setElapsedNanos(System.nanoTime() - start);
return cr.map(r -> {
return withSchedulerCheck(cr).map(r -> {
if (op.getReturnResult()) {
populateResult(result, r);
} else {
Expand All @@ -477,7 +478,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
mr = collection.binary().append(docId, request.getContent().toByteArray(), options);
}
result.setElapsedNanos(System.nanoTime() - start);
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
if (op.getReturnResult()) {
populateResult(result, r);
} else {
Expand All @@ -500,7 +501,7 @@ private Mono<Result> handleCollectionLevelCommand(Command op, PerRun perRun, Res
mr = collection.binary().prepend(docId, request.getContent().toByteArray(), options);
}
result.setElapsedNanos(System.nanoTime() - start);
return mr.map(r -> {
return withSchedulerCheck(mr).map(r -> {
if (op.getReturnResult()) {
populateResult(result, r);
} else {
Expand Down Expand Up @@ -537,7 +538,7 @@ private Mono<Result> handleScopeLevelCommand(Command op, PerRun perRun, Result.B
queryResult = scope.reactive().query(query);
}

return returnQueryResult(request, queryResult, result, start);
return returnQueryResult(request, withSchedulerCheck(queryResult), result, start);
}
// [end]

Expand Down Expand Up @@ -581,7 +582,7 @@ private Mono<Result> handleBucketLevelCommand(Command op, PerRun perRun, Result.
response = bucket.waitUntilReady(timeout);
}

return response.then(Mono.fromCallable(() -> {
return withSchedulerCheck(response).then(Mono.fromCallable(() -> {
setSuccess(result);
return result.build();
}));
Expand Down Expand Up @@ -614,7 +615,7 @@ private Mono<Result> handleClusterLevelCommand(Command op, PerRun perRun, Result
response = cluster.waitUntilReady(timeout);
}

return response.then(Mono.fromCallable(() -> {
return withSchedulerCheck(response).then(Mono.fromCallable(() -> {
setSuccess(result);
return result.build();
}));
Expand Down Expand Up @@ -673,7 +674,7 @@ else if (clc.hasEventingFunctionManager()) {
queryResult = connection.cluster().reactive().query(query);
}

return returnQueryResult(request, queryResult, result, start);
return returnQueryResult(request, withSchedulerCheck(queryResult), result, start);
}

return Mono.error(new UnsupportedOperationException("Unknown command " + op));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.couchbase.utils.OptionsUtil.convertDuration;
import static com.couchbase.utils.UserSchedulerUtil.withSchedulerCheck;

public class EventingHelper {

Expand Down Expand Up @@ -82,7 +83,7 @@ public static Mono<Result> handleEventingFunctionManagerReactive(ReactiveCluster
}


return response.map(r -> {
return withSchedulerCheck(response).map(r -> {
minimalEventingFunctionFromResult(result, r);
return result.build();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.couchbase.JavaSdkCommandExecutor.setSuccess;
import static com.couchbase.utils.UserSchedulerUtil.withSchedulerCheck;


public class BucketManagerHelper {
Expand Down Expand Up @@ -157,7 +158,7 @@ public static Mono<Result> handleBucketManagerReactive(ReactiveCluster cluster,
var options = createBucketOptions(request.getOptions(), spans);
response = cluster.buckets().createBucket(createBucketSettings(request.getSettings(), spans), options);
}
return response.then(Mono.fromCallable(() -> {
return withSchedulerCheck(response).then(Mono.fromCallable(() -> {
populateResult(start, result, null);
result.setElapsedNanos(System.nanoTime() - start);
setSuccess(result);
Expand All @@ -173,7 +174,7 @@ public static Mono<Result> handleBucketManagerReactive(ReactiveCluster cluster,
var options = updateBucketOptions(request.getOptions(), spans);
response = cluster.buckets().updateBucket(updateBucketSettings(request.getSettings(), spans), options);
}
return response.then(Mono.fromCallable(() -> {
return withSchedulerCheck(response).then(Mono.fromCallable(() -> {
populateResult(start, result, null);
result.setElapsedNanos(System.nanoTime() - start);
setSuccess(result);
Expand All @@ -189,7 +190,7 @@ public static Mono<Result> handleBucketManagerReactive(ReactiveCluster cluster,
var options = dropBucketOptions(request.getOptions(), spans);
response = cluster.buckets().dropBucket(request.getBucketName(), options);
}
return response.then(Mono.fromCallable(() -> {
return withSchedulerCheck(response).then(Mono.fromCallable(() -> {
populateResult(start, result, null);
result.setElapsedNanos(System.nanoTime() - start);
setSuccess(result);
Expand All @@ -205,7 +206,7 @@ public static Mono<Result> handleBucketManagerReactive(ReactiveCluster cluster,
var options = flushBucketOptions(request.getOptions(), spans);
response = cluster.buckets().flushBucket(request.getBucketName(), options);
}
return response.then(Mono.fromCallable(() -> {
return withSchedulerCheck(response).then(Mono.fromCallable(() -> {
populateResult(start, result, null);
result.setElapsedNanos(System.nanoTime() - start);
setSuccess(result);
Expand All @@ -220,7 +221,7 @@ public static Mono<Result> handleBucketManagerReactive(ReactiveCluster cluster,
var options = createGetAllBucketsOptions(request.getOptions(), spans);
response = cluster.buckets().getAllBuckets(options);
}
return response.map(rr -> {
return withSchedulerCheck(response).map(rr -> {
rr.forEach((name, settings) ->
populateResult(start, result, settings));
return result.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.stream.Collectors;

import static com.couchbase.JavaSdkCommandExecutor.setSuccess;
import static com.couchbase.utils.UserSchedulerUtil.withSchedulerCheck;


public class CollectionManagerHelper {
Expand Down Expand Up @@ -137,7 +138,7 @@ public static Mono<Result> handleCollectionManagerReactive(ReactiveCluster clust
var options = createGetAllScopesOptions(request.getOptions(), spans);
response = cluster.bucket(bucketName).collections().getAllScopes(options);
}
Flux<Result> f = response.map(r -> {
Flux<Result> f = withSchedulerCheck(response).map(r -> {
populateResult(result, r);
return result.build();
});
Expand All @@ -151,7 +152,7 @@ public static Mono<Result> handleCollectionManagerReactive(ReactiveCluster clust
} else {
r = collections.createScope(request.getName());
}
return r.then(Mono.fromSupplier(() -> {
return withSchedulerCheck(r).then(Mono.fromSupplier(() -> {
setSuccess(result);
return result.build();
}));
Expand All @@ -164,7 +165,7 @@ public static Mono<Result> handleCollectionManagerReactive(ReactiveCluster clust
} else {
r = collections.dropScope(request.getName());
}
return r.then(Mono.fromSupplier(() -> {
return withSchedulerCheck(r).then(Mono.fromSupplier(() -> {
setSuccess(result);
return result.build();
}));
Expand All @@ -178,7 +179,7 @@ public static Mono<Result> handleCollectionManagerReactive(ReactiveCluster clust
} else {
r = collections.createCollection(request.getScopeName(), request.getName(), settings);
}
return r.then(Mono.fromSupplier(() -> {
return withSchedulerCheck(r).then(Mono.fromSupplier(() -> {
setSuccess(result);
return result.build();
}));
Expand All @@ -192,7 +193,7 @@ public static Mono<Result> handleCollectionManagerReactive(ReactiveCluster clust
} else {
r = collections.updateCollection(request.getScopeName(), request.getName(), settings);
}
return r.then(Mono.fromSupplier(() -> {
return withSchedulerCheck(r).then(Mono.fromSupplier(() -> {
setSuccess(result);
return result.build();
}));
Expand All @@ -205,7 +206,7 @@ public static Mono<Result> handleCollectionManagerReactive(ReactiveCluster clust
} else {
r = collections.dropCollection(request.getScopeName(), request.getName());
}
return r.then(Mono.fromSupplier(() -> {
return withSchedulerCheck(r).then(Mono.fromSupplier(() -> {
setSuccess(result);
return result.build();
}));
Expand Down
Loading

0 comments on commit 7f36cf2

Please sign in to comment.