From 9dbd436311b46124595208fbcfcb2adb32715692 Mon Sep 17 00:00:00 2001 From: David Nault Date: Mon, 28 Oct 2024 15:25:30 -0700 Subject: [PATCH] JCBC-2170 (1/n) Don't switch schedulers when converting reactive query result Motivation ---------- Groundwork for asserting reactive results are published on the user's custom Scheduler. Modifications ------------- In reactive returnQueryResult(), build a fully non-blocking reactive chain instead of switching to bounded elastic scheduler and blocking. Rework the relevant utility methods in ContentAsUtil to support the reactive chain. Change-Id: Ifab9e60ef71f8c6e8011bf9ac303eea7fdddcb92 Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/218707 Tested-by: Build Bot Reviewed-by: Michael Reiche Reviewed-by: Graham Pople --- .../com/couchbase/JavaSdkCommandExecutor.java | 19 ++-- .../ReactiveJavaSdkCommandExecutor.java | 55 +++++------- .../com/couchbase/utils/ContentAsUtil.java | 88 +++++++------------ 3 files changed, 58 insertions(+), 104 deletions(-) diff --git a/java-fit-performer/src/main/java/com/couchbase/JavaSdkCommandExecutor.java b/java-fit-performer/src/main/java/com/couchbase/JavaSdkCommandExecutor.java index 5ba4b1558..b3f9674c4 100644 --- a/java-fit-performer/src/main/java/com/couchbase/JavaSdkCommandExecutor.java +++ b/java-fit-performer/src/main/java/com/couchbase/JavaSdkCommandExecutor.java @@ -1020,20 +1020,13 @@ public static void populateResult(com.couchbase.client.protocol.sdk.query.Comman var builder = com.couchbase.client.protocol.sdk.query.QueryResult.newBuilder(); - var content = ContentAsUtil.contentTypeList(request.getContentAs(), - () -> values.rowsAs(byte[].class), - () -> values.rowsAs(String.class), - () -> values.rowsAs(JsonObject.class), - () -> values.rowsAs(JsonArray.class), - () -> values.rowsAs(Boolean.class), - () -> values.rowsAs(Integer.class), - () -> values.rowsAs(Double.class)); - - if (content.isFailure()) { - throw content.exception(); - } + var contentAs = request.getContentAs(); + var rowType = ContentAsUtil.toJavaClass(contentAs); + var content = values.rowsAs(rowType).stream() + .map(row -> ContentAsUtil.toFitContent(row, contentAs)) + .toList(); - builder.addAllContent(content.value()); + builder.addAllContent(content); // Metadata var convertedMetaData = convertMetaData(values.metaData()); diff --git a/java-fit-performer/src/main/java/com/couchbase/ReactiveJavaSdkCommandExecutor.java b/java-fit-performer/src/main/java/com/couchbase/ReactiveJavaSdkCommandExecutor.java index aede0a967..a7f59fed9 100644 --- a/java-fit-performer/src/main/java/com/couchbase/ReactiveJavaSdkCommandExecutor.java +++ b/java-fit-performer/src/main/java/com/couchbase/ReactiveJavaSdkCommandExecutor.java @@ -17,8 +17,6 @@ import com.couchbase.client.core.cnc.RequestSpan; import com.couchbase.client.java.ReactiveCollection; -import com.couchbase.client.java.json.JsonArray; -import com.couchbase.client.java.json.JsonObject; import com.couchbase.client.java.kv.*; import com.couchbase.client.java.query.ReactiveQueryResult; import com.couchbase.client.java.kv.GetResult; @@ -58,7 +56,6 @@ import com.couchbase.utils.ClusterConnection; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.time.Instant; @@ -687,37 +684,25 @@ protected Exception convertException(Throwable raw) { return convertExceptionShared(raw); } - private Mono returnQueryResult(com.couchbase.client.protocol.sdk.query.Command request, Mono queryResult, Result.Builder result, Long start) { - return queryResult.publishOn(Schedulers.boundedElastic()).map(r -> { - result.setElapsedNanos(System.nanoTime() - start); - - var builder = com.couchbase.client.protocol.sdk.query.QueryResult.newBuilder(); - - // FIT only supports testing blocking (not streaming) queries currently, so the .block() here to gather - // the rows is fine. - var content = ContentAsUtil.contentTypeList(request.getContentAs(), - () -> r.rowsAs(byte[].class).collectList().block(), - () -> r.rowsAs(String.class).collectList().block(), - () -> r.rowsAs(JsonObject.class).collectList().block(), - () -> r.rowsAs(JsonArray.class).collectList().block(), - () -> r.rowsAs(Boolean.class).collectList().block(), - () -> r.rowsAs(Integer.class).collectList().block(), - () -> r.rowsAs(Double.class).collectList().block()); - - if (content.isFailure()) { - throw content.exception(); - } - - builder.addAllContent(content.value()); - - // Metadata - var convertedMetaData = convertMetaData(r.metaData().block()); - builder.setMetaData(convertedMetaData); - - result.setSdk(com.couchbase.client.protocol.sdk.Result.newBuilder() - .setQueryResult(builder)); + private Mono returnQueryResult(com.couchbase.client.protocol.sdk.query.Command request, Mono queryResult, Result.Builder result, Long start) { + return queryResult.flatMap(r -> { + result.setElapsedNanos(System.nanoTime() - start); - return result.build(); - }); - } + var contentAs = request.getContentAs(); + var rowType = ContentAsUtil.toJavaClass(contentAs); + return r.rowsAs(rowType) + .map(it -> ContentAsUtil.toFitContent(it, contentAs)) + .collectList() + .zipWith(r.metaData().map(JavaSdkCommandExecutor::convertMetaData)) + .map(fitRowsAndMetadata -> { + var fitQueryResult = com.couchbase.client.protocol.sdk.query.QueryResult.newBuilder() + .addAllContent(fitRowsAndMetadata.getT1()) + .setMetaData(fitRowsAndMetadata.getT2()); + + result.setSdk(com.couchbase.client.protocol.sdk.Result.newBuilder() + .setQueryResult(fitQueryResult)); + return result.build(); + }); + }); + } } diff --git a/java-fit-performer/src/main/java/com/couchbase/utils/ContentAsUtil.java b/java-fit-performer/src/main/java/com/couchbase/utils/ContentAsUtil.java index b38a6bf8f..ca52754c7 100644 --- a/java-fit-performer/src/main/java/com/couchbase/utils/ContentAsUtil.java +++ b/java-fit-performer/src/main/java/com/couchbase/utils/ContentAsUtil.java @@ -20,8 +20,8 @@ import com.couchbase.client.protocol.shared.ContentAs; import com.couchbase.client.protocol.shared.ContentTypes; import com.google.protobuf.ByteString; +import reactor.util.annotation.Nullable; -import java.util.List; import java.util.function.Supplier; public class ContentAsUtil { @@ -91,62 +91,38 @@ public static Try contentType(ContentAs contentAs, } } - public static Try> contentTypeList(ContentAs contentAs, - Supplier> asByteArray, - Supplier> asString, - Supplier> asJsonObject, - Supplier> asJsonArray, - Supplier> asBoolean, - Supplier> asInteger, - Supplier> asDouble) { - try { - if (contentAs.hasAsByteArray()) { - return new Try<>(asByteArray.get().stream() - .map(v -> v != null - ? ContentTypes.newBuilder().setContentAsBytes(ByteString.copyFrom(v)).build() - : getNullContentType().value()) - .toList()); - } else if (contentAs.hasAsString()) { - return new Try<>(asString.get().stream() - .map(v -> v != null - ? ContentTypes.newBuilder().setContentAsString(v).build() - : getNullContentType().value()).toList()); - } else if (contentAs.hasAsJsonObject()) { - return new Try<>(asJsonObject.get().stream() - .map(v -> v != null - ? ContentTypes.newBuilder().setContentAsBytes(ByteString.copyFrom(v.toBytes())).build() - : getNullContentType().value()) - .toList()); - } else if (contentAs.hasAsJsonArray()) { - return new Try<>(asJsonArray.get().stream() - .map(v -> v != null - ? ContentTypes.newBuilder().setContentAsBytes(ByteString.copyFrom(v.toBytes())).build() - : getNullContentType().value()) - .toList()); - } else if (contentAs.getAsBoolean()) { - return new Try<>(asBoolean.get().stream() - .map(v -> v != null - ? ContentTypes.newBuilder().setContentAsBool(v).build() - : getNullContentType().value()) - .toList()); - } else if (contentAs.hasAsInteger()) { - return new Try<>(asInteger.get().stream() - .map(v -> v != null - ? ContentTypes.newBuilder().setContentAsInt64(v).build() - : getNullContentType().value()) - .toList()); - } else if (contentAs.hasAsFloatingPoint()) { - return new Try<>(asDouble.get().stream() - .map(v -> v != null - ?ContentTypes.newBuilder().setContentAsDouble(v).build() - : getNullContentType().value()) - .toList()); - } else { - throw new UnsupportedOperationException("Java performer cannot handle contentAs " + contentAs.toString()); - } - } catch (RuntimeException err) { - return new Try<>(err); + public static Class toJavaClass(ContentAs contentAs) { + return switch (contentAs.getAsCase()) { + case AS_STRING -> String.class; + case AS_BYTE_ARRAY -> byte[].class; + case AS_JSON_OBJECT -> JsonObject.class; + case AS_JSON_ARRAY -> JsonArray.class; + case AS_BOOLEAN -> Boolean.class; + case AS_INTEGER -> Integer.class; + case AS_FLOATING_POINT -> Double.class; + + default -> throw new UnsupportedOperationException("Java performer cannot handle contentAs " + contentAs); + }; + } + + public static ContentTypes toFitContent(@Nullable Object value, ContentAs contentAs) { + ContentTypes.Builder builder = ContentTypes.newBuilder(); + + if (value == null) return builder.setContentAsNull(ContentTypes.NullValue.getDefaultInstance()).build(); + + switch (contentAs.getAsCase()) { + case AS_STRING -> builder.setContentAsString((String) value); + case AS_BYTE_ARRAY -> builder.setContentAsBytes(ByteString.copyFrom((byte[]) value)); + case AS_JSON_OBJECT -> builder.setContentAsBytes(ByteString.copyFrom(((JsonObject) value).toBytes())); + case AS_JSON_ARRAY -> builder.setContentAsBytes(ByteString.copyFrom(((JsonArray) value).toBytes())); + case AS_BOOLEAN -> builder.setContentAsBool((Boolean) value); + case AS_INTEGER -> builder.setContentAsInt64((Integer) value); + case AS_FLOATING_POINT -> builder.setContentAsDouble((Double) value); + + default -> throw new UnsupportedOperationException("Java performer cannot handle contentAs " + contentAs); } + + return builder.build(); } public static byte[] convert(ContentTypes content) {