From 2ac422c008ed5ec2ada66bf43651c0162247abcd Mon Sep 17 00:00:00 2001 From: Mike Bender Date: Fri, 2 Dec 2022 13:20:53 -0500 Subject: [PATCH 1/4] Update Web version to v0.22.2 (#3136) Release notes: https://github.com/deephaven/web-client-ui/releases/tag/v0.22.2 --- web/client-ui/Dockerfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/web/client-ui/Dockerfile b/web/client-ui/Dockerfile index fbbb436e115..840d90313aa 100644 --- a/web/client-ui/Dockerfile +++ b/web/client-ui/Dockerfile @@ -2,9 +2,9 @@ FROM deephaven/node:local-build WORKDIR /usr/src/app # Most of the time, these versions are the same, except in cases where a patch only affects one of the packages -ARG WEB_VERSION=0.21.1 -ARG GRID_VERSION=0.21.1 -ARG CHART_VERSION=0.21.1 +ARG WEB_VERSION=0.22.2 +ARG GRID_VERSION=0.22.2 +ARG CHART_VERSION=0.22.2 # Pull in the published code-studio package from npmjs and extract is RUN set -eux; \ From 7032e1906bf9a892b8340b38746b5f232bc4c7ce Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Fri, 2 Dec 2022 11:01:44 -0800 Subject: [PATCH 2/4] Improve DebugAwareFunctionalLock (#3098) --- .../engine/updategraph/UpdateGraphLock.java | 65 ++++++------------- .../updategraph/UpdateGraphProcessor.java | 8 +-- 2 files changed, 25 insertions(+), 48 deletions(-) diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphLock.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphLock.java index d3b3abdd06e..30e570e6e5f 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphLock.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphLock.java @@ -8,15 +8,14 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.util.FunctionalInterfaces; -import io.deephaven.util.SafeCloseable; import io.deephaven.util.locks.AwareFunctionalLock; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -105,13 +104,8 @@ public static void installInstrumentation(@Nullable final Instrumentation instru rwLock = new ReentrantReadWriteLock(true); readLock = rwLock.readLock(); writeLock = rwLock.writeLock(); - if (allowUnitTestMode) { - sharedLock = new DebugAwareFunctionalLock(new SharedLock()); - exclusiveLock = new DebugAwareFunctionalLock(new ExclusiveLock()); - } else { - sharedLock = new SharedLock(); - exclusiveLock = new ExclusiveLock(); - } + sharedLock = new SharedLock(); + exclusiveLock = allowUnitTestMode ? 
new DebugAwareFunctionalLock(new ExclusiveLock()) : new ExclusiveLock(); this.allowUnitTestMode = allowUnitTestMode; } @@ -306,10 +300,10 @@ public final Condition newCondition() { // region DebugLock class DebugAwareFunctionalLock implements AwareFunctionalLock { - private final AwareFunctionalLock delegate; - private final Deque lockingContext = new ArrayDeque<>(); + private final ExclusiveLock delegate; + private final Deque lockingContext = new ConcurrentLinkedDeque<>(); - DebugAwareFunctionalLock(AwareFunctionalLock delegate) { + DebugAwareFunctionalLock(ExclusiveLock delegate) { this.delegate = delegate; } @@ -321,19 +315,19 @@ public boolean isHeldByCurrentThread() { @Override public void lock() { delegate.lock(); - lockingContext.push(new Throwable()); + pushContext(); } @Override public void lockInterruptibly() throws InterruptedException { delegate.lockInterruptibly(); - lockingContext.push(new Throwable()); + pushContext(); } @Override public boolean tryLock() { if (delegate.tryLock()) { - lockingContext.push(new Throwable()); + pushContext(); return true; } return false; @@ -342,7 +336,7 @@ public boolean tryLock() { @Override public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { if (delegate.tryLock(time, unit)) { - lockingContext.push(new Throwable()); + pushContext(); return true; } return false; @@ -350,8 +344,8 @@ public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedExce @Override public void unlock() { - delegate.unlock(); lockingContext.pop(); + delegate.unlock(); } @NotNull @@ -360,37 +354,20 @@ public Condition newCondition() { return delegate.newCondition(); } - @Override - public void doLocked( - @NotNull FunctionalInterfaces.ThrowingRunnable runnable) throws EXCEPTION_TYPE { - delegate.doLocked(runnable); - } - - @Override - public void doLockedInterruptibly( - @NotNull FunctionalInterfaces.ThrowingRunnable runnable) - throws InterruptedException, EXCEPTION_TYPE { - delegate.doLockedInterruptibly(runnable); - } - - @Override - public RESULT_TYPE computeLocked( - @NotNull FunctionalInterfaces.ThrowingSupplier supplier) - throws EXCEPTION_TYPE { - return delegate.computeLocked(supplier); - } - - @Override - public RESULT_TYPE computeLockedInterruptibly( - @NotNull FunctionalInterfaces.ThrowingSupplier supplier) - throws InterruptedException, EXCEPTION_TYPE { - return delegate.computeLockedInterruptibly(supplier); - } - String getDebugMessage() { final Throwable item = lockingContext.peek(); return item == null ? 
"locking context is empty" : ExceptionUtils.getStackTrace(item); } + + private void pushContext() { + // Implementation must have already acquired lock + try { + lockingContext.push(new Throwable()); + } catch (Throwable t) { + delegate.unlock(); + throw t; + } + } } // endregion DebugLock diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java index 347c7f014bd..fb5c2a9152e 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java @@ -575,16 +575,16 @@ public void enableUnitTestMode() { } private void assertLockAvailable(@NotNull final String action) { - if (!UpdateGraphProcessor.DEFAULT.exclusiveLock().tryLock()) { + final AwareFunctionalLock exclusiveLock = UpdateGraphProcessor.DEFAULT.exclusiveLock(); + if (!exclusiveLock.tryLock()) { log.error().append("Lock is held when ").append(action).append(", with previous holder: ") .append(unitTestModeHolder).endl(); ThreadDump.threadDump(System.err); - UpdateGraphLock.DebugAwareFunctionalLock lock = - (UpdateGraphLock.DebugAwareFunctionalLock) UpdateGraphProcessor.DEFAULT.exclusiveLock(); + UpdateGraphLock.DebugAwareFunctionalLock lock = (UpdateGraphLock.DebugAwareFunctionalLock) exclusiveLock; throw new IllegalStateException( "Lock is held when " + action + ", with previous holder: " + lock.getDebugMessage()); } - UpdateGraphProcessor.DEFAULT.exclusiveLock().unlock(); + exclusiveLock.unlock(); } /** From 050c6c5cc910a22e199531fa6caaefce52c9ab3f Mon Sep 17 00:00:00 2001 From: Andrew <3199649+abaranec@users.noreply.github.com> Date: Fri, 2 Dec 2022 15:05:43 -0500 Subject: [PATCH 3/4] Improve BigInteger handling in Parquet files (Closes #3125) (#3126) * Allow unrecognized values in metadata JSON to be ignored. Allow BigIntegers to be encoded as Deciomal types with a precision/scale of 1, instead of requiring a serializable codec * Improve BigInteger handling * Applied spotless * Allow overriding default codecs of BigInteger and BigDecimal by ones set by users in ParquetInstructions. 
---
 .../engine/table/impl/CodecLookup.java        |  7 +-
 .../engine/util/BigDecimalUtils.java          | 59 +++++++-----
 .../engine/table/impl/TestCodecColumns.java   |  9 +-
 .../table/BigIntegerParquetBytesCodec.java    | 90 +++++++++++++++++++
 .../parquet/table/ParquetSchemaReader.java    |  6 ++
 .../parquet/table/ParquetTableWriter.java     | 26 ++++--
 .../io/deephaven/parquet/table/TypeInfos.java | 32 ++++++-
 .../table/location/ParquetColumnLocation.java | 36 ++++----
 .../parquet/table/metadata/TableInfo.java     |  2 +
 .../table/ParquetTableReadWriteTest.java      | 30 ++++++-
 10 files changed, 242 insertions(+), 55 deletions(-)
 create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/BigIntegerParquetBytesCodec.java

diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java
index f798ea883b3..b4c6c717d9c 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java
@@ -17,6 +17,7 @@
 
 import java.io.Externalizable;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 
 /**
  * Utility class to concentrate {@link ObjectCodec} lookups.
@@ -76,7 +77,11 @@ private static boolean noCodecRequired(@NotNull final Class dataType) {
                 // appropriate precision and scale calculated from column data,
                 // unless the user explicitly requested something else
                 // via instructions.
-                dataType == BigDecimal.class;
+                dataType == BigDecimal.class ||
+
+                // BigIntegers can be encoded as a DecimalLogicalType using a precision of 1 and scale of 0, which lets
+                // them be read by other parquet tools.
+                dataType == BigInteger.class;
 }
 
 /**
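For reference, the full-scan precision computation that the BigDecimalUtils
change below introduces can also be driven directly from a table; a minimal
sketch (the table and "bdColumn" names are illustrative, mirroring
ParquetTableReadWriteTest):

    import java.math.BigDecimal;
    import io.deephaven.engine.util.BigDecimalUtils;
    import io.deephaven.parquet.table.BigDecimalParquetBytesCodec;
    import io.deephaven.util.codec.ObjectCodec;

    // Full-table walk; the result bounds every non-null value, or is (1, 1) for an all-null column
    final BigDecimalUtils.PrecisionAndScale pas =
            BigDecimalUtils.computePrecisionAndScale(table, "bdColumn");
    final ObjectCodec<BigDecimal> codec =
            new BigDecimalParquetBytesCodec(pas.precision, pas.scale, -1);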
diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
index c86986b8464..e84381c3805 100644
--- a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
+++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
@@ -24,6 +24,8 @@
  * refreshing tables, we need the user to tell us.
  */
 public class BigDecimalUtils {
+    private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
+    private static final int TARGET_CHUNK_SIZE = 4096;
     public static final int INVALID_PRECISION_OR_SCALE = -1;
 
     /**
@@ -53,7 +55,8 @@ public static PrecisionAndScale computePrecisionAndScale(
     }
 
     /**
-     * Compute an overall precision and scale that would fit all existing values in a column source.
+     * Compute an overall precision and scale that would fit all existing values in a column source. Note that this
+     * requires a full table scan to ensure the correct values are determined.
      *
      * @param rowSet The rowset for the provided column
      * @param source a {@code ColumnSource} of {@code BigDecimal} type
@@ -62,29 +65,45 @@ public static PrecisionAndScale computePrecisionAndScale(
     public static PrecisionAndScale computePrecisionAndScale(
             final RowSet rowSet,
             final ColumnSource<BigDecimal> source) {
-        final int sz = 4096;
-        // we first compute max(precision - scale) and max(scale), which corresponds to
-        // max(digits left of the decimal point), max(digits right of the decimal point).
-        // Then we convert to (precision, scale) before returning.
-        int maxPrecisionMinusScale = 0;
-        int maxScale = 0;
-        try (final ChunkSource.GetContext context = source.makeGetContext(sz);
+        if (rowSet.isEmpty()) {
+            return EMPTY_TABLE_PRECISION_AND_SCALE;
+        }
+
+        // We will walk the entire table to determine the max(precision - scale) and
+        // max(scale), which corresponds to max(digits left of the decimal point), max(digits right of the decimal
+        // point). Then we convert to (precision, scale) before returning.
+        int maxPrecisionMinusScale = -1;
+        int maxScale = -1;
+        try (final ChunkSource.GetContext context = source.makeGetContext(TARGET_CHUNK_SIZE);
                 final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
-            final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz);
-            final ObjectChunk<BigDecimal, ? extends Values> chunk = source.getChunk(context, rowSeq).asObjectChunk();
-            for (int i = 0; i < chunk.size(); ++i) {
-                final BigDecimal x = chunk.get(i);
-                final int precision = x.precision();
-                final int scale = x.scale();
-                final int precisionMinusScale = precision - scale;
-                if (precisionMinusScale > maxPrecisionMinusScale) {
-                    maxPrecisionMinusScale = precisionMinusScale;
-                }
-                if (scale > maxScale) {
-                    maxScale = scale;
+            while (it.hasMore()) {
+                final RowSequence rowSeq = it.getNextRowSequenceWithLength(TARGET_CHUNK_SIZE);
+                final ObjectChunk<BigDecimal, ? extends Values> chunk =
+                        source.getChunk(context, rowSeq).asObjectChunk();
+                for (int i = 0; i < chunk.size(); ++i) {
+                    final BigDecimal x = chunk.get(i);
+                    if (x == null) {
+                        continue;
+                    }
+
+                    final int precision = x.precision();
+                    final int scale = x.scale();
+                    final int precisionMinusScale = precision - scale;
+                    if (precisionMinusScale > maxPrecisionMinusScale) {
+                        maxPrecisionMinusScale = precisionMinusScale;
+                    }
+                    if (scale > maxScale) {
+                        maxScale = scale;
+                    }
                 }
             }
         }
+
+        // If these are < 0, then every value we visited was null
+        if (maxPrecisionMinusScale < 0 && maxScale < 0) {
+            return EMPTY_TABLE_PRECISION_AND_SCALE;
+        }
+
         return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
     }
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestCodecColumns.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestCodecColumns.java
index d1f3e849b92..ad86f9b3f4d 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestCodecColumns.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestCodecColumns.java
@@ -8,6 +8,7 @@
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.testutil.TstUtils;
+import io.deephaven.parquet.table.BigIntegerParquetBytesCodec;
 import io.deephaven.parquet.table.ParquetTools;
 import io.deephaven.engine.util.TableTools;
 import io.deephaven.parquet.table.ParquetInstructions;
@@ -51,16 +52,18 @@ public class TestCodecColumns {
                 ColumnDefinition.fromGenericType("VWBA", byte[].class, byte.class);
         writeBuilder.addColumnCodec("VWBA", SimpleByteArrayCodec.class.getName());
         readBuilder.addColumnCodec("VWBA", SimpleByteArrayCodec.class.getName());
+
         VARIABLE_WIDTH_COLUMN_DEFINITION_2 = ColumnDefinition.fromGenericType("VWCD", ArrayTuple.class);
         readBuilder.addColumnCodec("VWCD", ExternalizableCodec.class.getName(), ArrayTuple.class.getName());
         FIXED_WIDTH_BYTE_ARRAY_COLUMN_DEFINITION = ColumnDefinition.fromGenericType("FWBA", byte[].class, byte.class);
         writeBuilder.addColumnCodec("FWBA", SimpleByteArrayCodec.class.getName(), "9");
         readBuilder.addColumnCodec("FWBA", SimpleByteArrayCodec.class.getName(), "9");
+
         VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION =
ColumnDefinition.fromGenericType("VWBI", BigInteger.class); - writeBuilder.addColumnCodec("VWBI", BigIntegerCodec.class.getName()); - readBuilder.addColumnCodec("VWBI", BigIntegerCodec.class.getName()); + writeBuilder.addColumnCodec("VWBI", SerializableCodec.class.getName()); + readBuilder.addColumnCodec("VWBI", SerializableCodec.class.getName()); + VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION_S = ColumnDefinition.fromGenericType("VWBIS", BigInteger.class); - readBuilder.addColumnCodec("VWBIS", SerializableCodec.class.getName()); expectedReadInstructions = readBuilder.build(); writeInstructions = writeBuilder.build(); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/BigIntegerParquetBytesCodec.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/BigIntegerParquetBytesCodec.java new file mode 100644 index 00000000000..149d9397d26 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/BigIntegerParquetBytesCodec.java @@ -0,0 +1,90 @@ +package io.deephaven.parquet.table; + +import io.deephaven.datastructures.util.CollectionUtil; +import io.deephaven.util.codec.ObjectCodec; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +public class BigIntegerParquetBytesCodec implements ObjectCodec { + private final int encodedSizeInBytes; + private final byte[] nullBytes; + + /** + * + * @param encodedSizeInBytes encoded size in bytes, if fixed size, or -1 if variable size. note that according to + * the parquet spec, the minimum number of bytes required to represent the unscaled value should be used for + * a variable sized (binary) encoding; in any case, the maximum encoded bytes is implicitly limited by + * precision. + */ + public BigIntegerParquetBytesCodec(final int encodedSizeInBytes) { + this.encodedSizeInBytes = encodedSizeInBytes; + if (encodedSizeInBytes > 0) { + nullBytes = new byte[encodedSizeInBytes]; + for (int i = 0; i < encodedSizeInBytes; ++i) { + nullBytes[i] = (byte) 0xff; + } + } else { + nullBytes = CollectionUtil.ZERO_LENGTH_BYTE_ARRAY; + } + } + + // Given how parquet encoding works for nulls, the actual value provided for a null is irrelevant. + @Override + public boolean isNullable() { + return true; + } + + @Override + public int getPrecision() { + return 0; + } + + @Override + public int getScale() { + return 1; + } + + @Override + public int expectedObjectWidth() { + return encodedSizeInBytes <= 0 ? 
VARIABLE_WIDTH_SENTINEL : encodedSizeInBytes;
+    }
+
+    @NotNull
+    @Override
+    public byte[] encode(@Nullable final BigInteger input) {
+        if (input == null) {
+            return nullBytes;
+        }
+
+        return input.toByteArray();
+    }
+
+    @Nullable
+    @Override
+    public BigInteger decode(@NotNull final byte[] input, final int offset, final int length) {
+        if (length <= 0) {
+            return null;
+        }
+
+        if (length == encodedSizeInBytes) {
+            boolean allPreviousBitsSet = true;
+            for (int i = 0; i < encodedSizeInBytes; ++i) {
+                if (input[offset + i] != (byte) 0xff) {
+                    allPreviousBitsSet = false;
+                    break;
+                }
+            }
+            if (allPreviousBitsSet) {
+                return null;
+            }
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(input, offset, length);
+        final byte[] unscaledValueBytes = new byte[length];
+        buffer.get(unscaledValueBytes);
+        return new BigInteger(unscaledValueBytes);
+    }
+}
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
index 0eda7a0f73d..d88b716ab57 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java
@@ -25,6 +25,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.util.*;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -327,6 +328,11 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.EnumLogicalTypeAnnot
             @Override
             public Optional<Class<?>> visit(
                     final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+                // This pair of values (precision=1, scale=0) is set at write time as a marker so that we can recover
+                // the fact that the type is a BigInteger, not a BigDecimal when the files are read.
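+                // Note that a BigDecimal column whose computed precision and scale happen to be exactly (1, 0)
+                // would match this marker as well and be read back as BigInteger.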
+                if (decimalLogicalType.getPrecision() == 1 && decimalLogicalType.getScale() == 0) {
+                    return Optional.of(BigInteger.class);
+                }
                 return Optional.of(java.math.BigDecimal.class);
             }
 
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
index 5bcf98fe882..5f9bd4c6d85 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
@@ -53,6 +53,7 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.*;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -775,14 +776,23 @@ private static TransferObject getDestinationBuffer(
             return new ByteTransfer(columnSource, maxValuesPerPage);
         } else if (String.class.equals(columnType)) {
             return new StringTransfer(columnSource, maxValuesPerPage);
-        } else if (BigDecimal.class.equals(columnType)) {
-            // noinspection unchecked
-            final ColumnSource<BigDecimal> bigDecimalColumnSource = (ColumnSource<BigDecimal>) columnSource;
-            final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
-                    computedCache, columnDefinition.getName(), tableRowSet, () -> bigDecimalColumnSource);
-            final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
-                    precisionAndScale.precision, precisionAndScale.scale, -1);
-            return new CodecTransfer<>(bigDecimalColumnSource, codec, maxValuesPerPage);
+        }
+
+        // If there's an explicit codec, we should disregard the defaults for these; CodecLookup#lookup() will properly
+        // select the codec assigned by the instructions, so we only need to check and redirect once.
+        if (!CodecLookup.explicitCodecPresent(instructions.getCodecName(columnDefinition.getName()))) {
+            if (BigDecimal.class.equals(columnType)) {
+                // noinspection unchecked
+                final ColumnSource<BigDecimal> bigDecimalColumnSource = (ColumnSource<BigDecimal>) columnSource;
+                final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
+                        computedCache, columnDefinition.getName(), tableRowSet, () -> bigDecimalColumnSource);
+                final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
+                        precisionAndScale.precision, precisionAndScale.scale, -1);
+                return new CodecTransfer<>(bigDecimalColumnSource, codec, maxValuesPerPage);
+            } else if (BigInteger.class.equals(columnType)) {
+                return new CodecTransfer<>((ColumnSource<BigInteger>) columnSource, new BigIntegerParquetBytesCodec(-1),
+                        maxValuesPerPage);
+            }
+        }
 
         final ObjectCodec codec = CodecLookup.lookup(columnDefinition, instructions);
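A column can still opt out of these defaults, since an explicitly assigned codec
now takes precedence; a minimal sketch mirroring the builder calls in
TestCodecColumns (the Builder construction, the SerializableCodec package, and
this writeTable overload are assumptions here, not confirmed by this patch):

    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;
    import io.deephaven.util.codec.SerializableCodec;

    // Explicitly assigned codecs win over the BigDecimal/BigInteger defaults above
    final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();
    builder.addColumnCodec("biColumn", SerializableCodec.class.getName());
    ParquetTools.writeTable(table, dest, builder.build());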
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java
index efd10704dfb..648e2f96615 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java
@@ -23,6 +23,7 @@
 
 import java.io.Externalizable;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.*;
 import java.util.function.Supplier;
 
@@ -34,7 +35,6 @@
  * and the data translation.
  */
 class TypeInfos {
-
     private static final TypeInfo[] TYPE_INFOS = new TypeInfo[] {
             IntType.INSTANCE,
             LongType.INSTANCE,
@@ -45,15 +45,16 @@ class TypeInfos {
             CharType.INSTANCE,
             ByteType.INSTANCE,
             StringType.INSTANCE,
-            DateTimeType.INSTANCE
+            DateTimeType.INSTANCE,
+            BigIntegerType.INSTANCE
     };
 
     private static final Map<Class<?>, TypeInfo> BY_CLASS;
 
     static {
         final Map<Class<?>, TypeInfo> fa = new HashMap<>();
-        for (TypeInfo typeInfo : TYPE_INFOS) {
-            for (Class type : typeInfo.getTypes()) {
+        for (final TypeInfo typeInfo : TYPE_INFOS) {
+            for (final Class<?> type : typeInfo.getTypes()) {
                 fa.put(type, typeInfo);
             }
         }
@@ -95,6 +96,7 @@ static Pair getCodecAndArgs(
         if (!CodecLookup.codecRequired(columnDefinition)) {
             return null;
         }
+
         // Impute an appropriate codec for the data type
         final Class<?> dataType = columnDefinition.getDataType();
         if (Externalizable.class.isAssignableFrom(dataType)) {
@@ -365,6 +367,28 @@ public PrimitiveBuilder getBuilder(boolean required, boolean repe
         }
     }
 
+    /**
+     * We will encode BigIntegers as Decimal types. Parquet has no special type for BigIntegers, but we can maintain
+     * external compatibility by encoding them as binary decimals with a precision of 1 and a scale of 0. Internally,
+     * we'll record that we wrote this as a decimal, so we can properly decode it back to BigInteger.
+     */
+    private enum BigIntegerType implements TypeInfo {
+        INSTANCE;
+
+        private static final Set<Class<?>> clazzes = Collections.singleton(BigInteger.class);
+
+        @Override
+        public Set<Class<?>> getTypes() {
+            return clazzes;
+        }
+
+        @Override
+        public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repeating, Class<?> dataType) {
+            return type(PrimitiveTypeName.BINARY, required, repeating)
+                    .as(LogicalTypeAnnotation.decimalType(0, 1));
+        }
+    }
+
     interface TypeInfo {
 
         Set<Class<?>> getTypes();
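For reference, for an optional, non-repeating column the builder above produces
a schema element equivalent to this parquet-mr Types construction (a sketch;
assumes parquet-mr's org.apache.parquet.schema.Types API is on the classpath):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    // optional binary colName (DECIMAL(1,0)) -- decimalType(scale, precision) takes scale first
    Types.optional(PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.decimalType(0, 1))
            .named("colName");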
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
index 7764f6bb324..07d171bef40 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
@@ -27,10 +27,7 @@
 import io.deephaven.parquet.base.ParquetFileReader;
 import io.deephaven.parquet.base.RowGroupReader;
 import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
-import io.deephaven.parquet.table.BigDecimalParquetBytesCodec;
-import io.deephaven.parquet.table.ParquetInstructions;
-import io.deephaven.parquet.table.ParquetSchemaReader;
-import io.deephaven.parquet.table.ParquetTableWriter;
+import io.deephaven.parquet.table.*;
 import io.deephaven.parquet.table.metadata.CodecInfo;
 import io.deephaven.parquet.table.metadata.ColumnTypeInfo;
 import io.deephaven.parquet.table.metadata.GroupingColumnInfo;
@@ -51,6 +48,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.*;
 import java.util.function.Function;
 import java.util.function.LongFunction;
@@ -597,7 +595,7 @@ private static ToPage makeToPage(
 
         ToPage toPage = null;
 
-        if (logicalTypeAnnotation != null) {
+        if (!isCodec && logicalTypeAnnotation != null) {
             toPage = logicalTypeAnnotation.accept(
                     new LogicalTypeVisitor(parquetColumnName, columnChunkReader, pageType))
                     .orElse(null);
@@ -764,19 +762,25 @@ private static class LogicalTypeVisitor
                                 componentType, decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
                 case FIXED_LEN_BYTE_ARRAY:
                 case BINARY:
-                    if (!BigDecimal.class.equals(componentType)) {
-                        throw new IllegalArgumentException(
-                                "The native type for a BigDecimal column is " + componentType.getCanonicalName());
-                    }
                     final int encodedSizeInBytes =
                             (typeName == PrimitiveType.PrimitiveTypeName.BINARY) ? -1 : type.getTypeLength();
-                    return Optional.of(
-                            ToObjectPage.create(
-                                    BigDecimal.class,
-                                    new BigDecimalParquetBytesCodec(
-                                            decimalLogicalType.getPrecision(), decimalLogicalType.getScale(),
-                                            encodedSizeInBytes),
-                                    columnChunkReader.getDictionarySupplier()));
+                    if (BigDecimal.class.equals(componentType)) {
+                        return Optional.of(
+                                ToObjectPage.create(
+                                        BigDecimal.class,
+                                        new BigDecimalParquetBytesCodec(
+                                                decimalLogicalType.getPrecision(), decimalLogicalType.getScale(),
+                                                encodedSizeInBytes),
+                                        columnChunkReader.getDictionarySupplier()));
+                    } else if (BigInteger.class.equals(componentType)) {
+                        return Optional.of(
+                                ToObjectPage.create(
+                                        BigInteger.class,
+                                        new BigIntegerParquetBytesCodec(encodedSizeInBytes),
+                                        columnChunkReader.getDictionarySupplier()));
+                    }
+
+                    // We won't blow up here; maybe someone will provide us a codec instead.
                 default:
                     return Optional.empty();
             }
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java
index a31b39929dd..0d3420ad00e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java
@@ -5,6 +5,7 @@
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -34,6 +35,7 @@ public abstract class TableInfo {
     static {
         final ObjectMapper objectMapper = new ObjectMapper();
         objectMapper.registerModule(new Jdk8Module());
+        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
         OBJECT_MAPPER = objectMapper;
     }
 
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index fcdcca73a05..513c8eecee6 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -9,6 +9,7 @@
 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.testutil.TstUtils;
 import io.deephaven.engine.testutil.junit4.EngineCleanup;
+import io.deephaven.engine.util.BigDecimalUtils;
 import io.deephaven.stringset.ArrayStringSet;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableDefinition;
@@ -73,7 +74,9 @@ private static Table getTableFlat(int size, boolean includeSerializable) {
                 "someCharColumn = (char)i",
                 "someTime = DateTime.now() + i",
                 "someKey = `` + (int)(i /100)",
-                "nullKey = i < -1?`123`:null"));
+                "nullKey = i < -1?`123`:null",
+                "bdColumn = java.math.BigDecimal.valueOf(ii).stripTrailingZeros()",
+                "biColumn = java.math.BigInteger.valueOf(ii)"));
         if (includeSerializable) {
             columns.add("someSerializable = new SomeSillyTest(i)");
         }
@@ -162,7 +165,7 @@ private void flatTable(String tableName, int size, boolean includeSerializable)
         final File dest = new File(rootFile, "ParquetTest_" + tableName + "_test.parquet");
         ParquetTools.writeTable(tableToSave, dest);
         final Table fromDisk = ParquetTools.readTable(dest);
-        TstUtils.assertTableEquals(tableToSave, fromDisk);
+        TstUtils.assertTableEquals(maybeFixBigDecimal(tableToSave), fromDisk);
     }
 
     private void groupedTable(String tableName, int size, boolean includeSerializable) {
@@ -274,7 +277,7 @@ private void compressionCodecTestHelper(final String codec) {
             ParquetTools.writeTable(table1, path);
             assertTrue(new File(path).length() > 0);
             final Table table2 = ParquetTools.readTable(path);
-            TstUtils.assertTableEquals(table1, table2);
+            TstUtils.assertTableEquals(maybeFixBigDecimal(table1), table2);
         } finally {
             ParquetInstructions.setDefaultCompressionCodecName(currentCodec);
         }
@@ -312,4 +315,25 @@ public void testParquetSnappyCompressionCodec() {
         // way as the other similar codec tests.
         compressionCodecTestHelper("SNAPPY");
     }
+
+    /**
+     * Encoding BigDecimal is tricky -- the writer picks the precision and scale automatically, so
+     * TstUtils.assertTableEquals would fail: even though the numbers are identical, the representations may not be.
+     * We therefore coerce the expected values to the same precision and scale. We know how the writer chooses them,
+     * so we can apply the same encode/decode pattern with the codec.
+     *
+     * @param toFix the table whose bdColumn should be normalized
+     * @return a table whose bdColumn carries the precision and scale the writer would choose
+     */
+    private Table maybeFixBigDecimal(Table toFix) {
+        final BigDecimalUtils.PrecisionAndScale pas = BigDecimalUtils.computePrecisionAndScale(toFix, "bdColumn");
+        final BigDecimalParquetBytesCodec codec = new BigDecimalParquetBytesCodec(pas.precision, pas.scale, -1);
+
+        ExecutionContext.getContext()
+                .getQueryScope()
+                .putParam("__codec", codec);
+        return toFix
+                .updateView("bdColE = __codec.encode(bdColumn)", "bdColumn=__codec.decode(bdColE, 0, bdColE.length)")
+                .dropColumns("bdColE");
+    }
 }

From 0bb8eac947ffb8b97fd62a00c5601a844aab5eb2 Mon Sep 17 00:00:00 2001
From: James Nelson
Date: Fri, 2 Dec 2022 14:21:56 -0600
Subject: [PATCH 4/4] Jedi Autocomplete (#3114)

---
 docker/server-jetty/build.gradle              |   1 +
 .../main/server-all-ai-jetty/requirements.txt |  64 +++++
 .../lang/parse/CompletionParser.java          |  69 +++--
 .../lang/completion/ChunkerCompleter.java     |   2 +-
 .../ChunkerCompletionHandlerTest.groovy       |   4 +-
 py/server/deephaven/completer/__init__.py     |  25 ++
 py/server/deephaven/completer/_completer.py   | 111 ++++++++
 py/server/deephaven/config/__init__.py        |   2 +-
 py/server/setup.py                            |   3 +
 server/jetty-app/build.gradle                 |   4 +
 .../console/ConsoleServiceGrpcImpl.java       | 176 ++-----------
 .../completer/JavaAutoCompleteObserver.java   | 176 +++++++++++++
 .../completer/PythonAutoCompleteObserver.java | 240 ++++++++++++++++++
 13 files changed, 686 insertions(+), 191 deletions(-)
 create mode 100644 docker/server-jetty/src/main/server-all-ai-jetty/requirements.txt
 create mode 100644 py/server/deephaven/completer/__init__.py
 create mode 100644 py/server/deephaven/completer/_completer.py
 create mode 100644 server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java
 create mode 100644 server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java

diff --git a/docker/server-jetty/build.gradle b/docker/server-jetty/build.gradle
index a66fe027c33..6f1588034eb 100644
--- a/docker/server-jetty/build.gradle
+++ b/docker/server-jetty/build.gradle
@@ -10,6 +10,7 @@ def targetArch =
Architecture.targetArchitecture(project) def baseMapAmd64 = [ 'server-base': 'server-jetty', + 'all-ai-base': 'server-all-ai-jetty', ] // Only the server image is supported on arm64 diff --git a/docker/server-jetty/src/main/server-all-ai-jetty/requirements.txt b/docker/server-jetty/src/main/server-all-ai-jetty/requirements.txt new file mode 100644 index 00000000000..ab7e368e5e6 --- /dev/null +++ b/docker/server-jetty/src/main/server-all-ai-jetty/requirements.txt @@ -0,0 +1,64 @@ +absl-py==1.3.0 +astunparse==1.6.3 +cachetools==5.2.0 +certifi==2022.9.24 +charset-normalizer==2.1.1 +click==8.1.3 +deephaven-plugin==0.3.0 +flatbuffers==2.0.7 +gast==0.4.0 +google-auth==2.14.1 +google-auth-oauthlib==0.4.6 +google-pasta==0.2.0 +grpcio==1.51.1 +h5py==3.7.0 +idna==3.4 +importlib-metadata==5.1.0 +java-utilities==0.2.0 +jedi==0.18.2 +joblib==1.2.0 +jpy==0.13.0 +keras==2.7.0 +Keras-Preprocessing==1.1.2 +libclang==14.0.6 +llvmlite==0.39.1 +Markdown==3.4.1 +MarkupSafe==2.1.1 +nltk==3.7 +numba==0.56.4 +numpy==1.21.6 +nvidia-cublas-cu11==11.10.3.66 +nvidia-cuda-nvrtc-cu11==11.7.99 +nvidia-cuda-runtime-cu11==11.7.99 +nvidia-cudnn-cu11==8.5.0.96 +oauthlib==3.2.2 +opt-einsum==3.3.0 +pandas==1.3.5 +parso==0.8.3 +protobuf==3.19.6 +pyasn1==0.4.8 +pyasn1-modules==0.2.8 +python-dateutil==2.8.2 +pytz==2022.6 +regex==2022.10.31 +requests==2.28.1 +requests-oauthlib==1.3.1 +rsa==4.9 +scikit-learn==1.0.2 +scipy==1.7.3 +six==1.16.0 +tensorboard==2.11.0 +tensorboard-data-server==0.6.1 +tensorboard-plugin-wit==1.8.1 +tensorflow==2.7.4 +tensorflow-estimator==2.7.0 +tensorflow-io-gcs-filesystem==0.28.0 +termcolor==2.1.1 +threadpoolctl==3.1.0 +torch==1.13.0 +tqdm==4.64.1 +typing_extensions==4.4.0 +urllib3==1.26.13 +Werkzeug==2.2.2 +wrapt==1.14.1 +zipp==3.11.0 diff --git a/open-api/lang-parser/src/main/java/io/deephaven/lang/parse/CompletionParser.java b/open-api/lang-parser/src/main/java/io/deephaven/lang/parse/CompletionParser.java index 2ab10b13cf5..31c49c8a570 100644 --- a/open-api/lang-parser/src/main/java/io/deephaven/lang/parse/CompletionParser.java +++ b/open-api/lang-parser/src/main/java/io/deephaven/lang/parse/CompletionParser.java @@ -24,6 +24,36 @@ public class CompletionParser implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(CompletionParser.class); private final Map docs = new ConcurrentHashMap<>(); + public static String updateDocumentChanges(final String uri, final int version, String document, + final List changes) { + for (ChangeDocumentRequest.TextDocumentContentChangeEventOrBuilder change : changes) { + DocumentRange range = change.getRange(); + int length = change.getRangeLength(); + + int offset = LspTools.getOffsetFromPosition(document, range.getStart()); + if (offset < 0) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn() + .append("Invalid change in document ") + .append(uri) + .append("[") + .append(version) + .append("] @") + .append(range.getStart().getLine()) + .append(":") + .append(range.getStart().getCharacter()) + .endl(); + } + return null; + } + + String prefix = offset > 0 && offset <= document.length() ? document.substring(0, offset) : ""; + String suffix = offset + length < document.length() ? 
document.substring(offset + length) : ""; + document = prefix + change.getText() + suffix; + } + return document; + } + public ParsedDocument parse(String document) throws ParseException { Chunker chunker = new Chunker(document); final ChunkerDocument doc = chunker.Document(); @@ -49,7 +79,7 @@ private PendingParse startParse(String uri) { return docs.computeIfAbsent(uri, k -> new PendingParse(uri)); } - public void update(final String uri, final String version, + public void update(final String uri, final int version, final List changes) { if (LOGGER.isTraceEnabled()) { LOGGER.trace() @@ -74,32 +104,11 @@ public void update(final String uri, final String version, forceParse = true; } String document = doc.getText(); - for (ChangeDocumentRequest.TextDocumentContentChangeEventOrBuilder change : changes) { - DocumentRange range = change.getRange(); - int length = change.getRangeLength(); - - int offset = LspTools.getOffsetFromPosition(document, range.getStart()); - if (offset < 0) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn() - .append("Invalid change in document ") - .append(uri) - .append("[") - .append(version) - .append("] @") - .append(range.getStart().getLine()) - .append(":") - .append(range.getStart().getCharacter()) - .endl(); - } - return; - } - - String prefix = offset > 0 && offset <= document.length() ? document.substring(0, offset) : ""; - String suffix = offset + length < document.length() ? document.substring(offset + length) : ""; - document = prefix + change.getText() + suffix; + document = updateDocumentChanges(uri, version, document, changes); + if (document == null) { + return; } - doc.requestParse(version, document, forceParse); + doc.requestParse(Integer.toString(version), document, forceParse); if (LOGGER.isTraceEnabled()) { LOGGER.trace() .append("Finished updating ") @@ -118,6 +127,14 @@ public void remove(String uri) { } } + public String getText(String uri) { + final PendingParse doc = docs.get(uri); + if (doc == null) { + throw new IllegalStateException("Unable to find parsed document " + uri); + } + return doc.getText(); + } + public ParsedDocument finish(String uri) { final PendingParse doc = docs.get(uri); if (doc == null) { diff --git a/open-api/lang-tools/src/main/java/io/deephaven/lang/completion/ChunkerCompleter.java b/open-api/lang-tools/src/main/java/io/deephaven/lang/completion/ChunkerCompleter.java index f30fdc3a04d..43acc2e3438 100644 --- a/open-api/lang-tools/src/main/java/io/deephaven/lang/completion/ChunkerCompleter.java +++ b/open-api/lang-tools/src/main/java/io/deephaven/lang/completion/ChunkerCompleter.java @@ -354,7 +354,7 @@ private TextEdit.Builder extendEnd(final CompletionItem.Builder item, final Posi } - private String sortable(int i) { + public static String sortable(int i) { StringBuilder res = new StringBuilder(Integer.toString(i, 36)); while (res.length() < 5) { res.insert(0, "0"); diff --git a/open-api/lang-tools/src/test/groovy/io/deephaven/lang/completion/ChunkerCompletionHandlerTest.groovy b/open-api/lang-tools/src/test/groovy/io/deephaven/lang/completion/ChunkerCompletionHandlerTest.groovy index 8b0f647087b..e852bf6cd5d 100644 --- a/open-api/lang-tools/src/test/groovy/io/deephaven/lang/completion/ChunkerCompletionHandlerTest.groovy +++ b/open-api/lang-tools/src/test/groovy/io/deephaven/lang/completion/ChunkerCompletionHandlerTest.groovy @@ -160,8 +160,8 @@ b = 2 c = 3 """ String src2 = "t = " - p.update(uri, "0", [ makeChange(0, 0, src1) ]) - p.update(uri, "1", [ makeChange(3, 0, src2) ]) + p.update(uri, 0, [ makeChange(0, 0, 
src1) ])
+        p.update(uri, 1, [ makeChange(3, 0, src2) ])
         doc = p.finish(uri)
 
         VariableProvider variables = Mock(VariableProvider) {
diff --git a/py/server/deephaven/completer/__init__.py b/py/server/deephaven/completer/__init__.py
new file mode 100644
index 00000000000..6219e30e507
--- /dev/null
+++ b/py/server/deephaven/completer/__init__.py
@@ -0,0 +1,25 @@
+#
+# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
+#
+
+""" This module allows the user to configure if and how we use jedi to perform autocompletion.
+See https://github.com/davidhalter/jedi for information on jedi.
+
+# To disable autocompletion
+from deephaven.completer import jedi_settings
+jedi_settings.mode = 'off'
+
+Valid options for jedi_settings.mode are one of: [off, safe, strong].
+off: do not use any autocomplete
+safe mode: uses static analysis of source files. Can't execute any code.
+strong mode: looks in your globals() for answers to autocomplete and analyzes your runtime python objects
+later, we may add slow mode, which uses both static and interpreted completion modes.
+"""
+
+from deephaven.completer._completer import Completer
+from jedi import preload_module, Interpreter
+
+jedi_settings = Completer()
+# warm jedi up a little. We could probably off-thread this.
+preload_module('deephaven')
+Interpreter('', []).complete(1, 0)
diff --git a/py/server/deephaven/completer/_completer.py b/py/server/deephaven/completer/_completer.py
new file mode 100644
index 00000000000..652d8e605b9
--- /dev/null
+++ b/py/server/deephaven/completer/_completer.py
@@ -0,0 +1,111 @@
+# only python 3.8 needs this, but it must be the first expression in the file, so we can't predicate it
+from __future__ import annotations
+from enum import Enum
+from typing import Any
+from jedi import Interpreter, Script
+
+
+class CompleterMode(Enum):
+    off = 'off'
+    safe = 'safe'
+    strong = 'strong'
+
+    def __str__(self) -> str:
+        return self.value
+
+
+class Completer(object):
+
+    def __init__(self):
+        self._docs = {}
+        self._versions = {}
+        # we will replace this w/ top-level globals() when we open the document
+        self.__scope = globals()
+        # might want to make this a {uri: []} instead of []
+        self.pending = []
+        try:
+            import jedi
+            self.__can_jedi = True
+            self.mode = CompleterMode.strong
+        except ImportError:
+            self.__can_jedi = False
+            self.mode = CompleterMode.off
+
+    @property
+    def mode(self) -> CompleterMode:
+        return self.__mode
+
+    @mode.setter
+    def mode(self, mode) -> None:
+        if isinstance(mode, str):
+            mode = CompleterMode[mode]
+        self.__mode = mode
+
+    def open_doc(self, text: str, uri: str, version: int) -> None:
+        self._docs[uri] = text
+        self._versions[uri] = version
+
+    def get_doc(self, uri: str) -> str:
+        return self._docs[uri]
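+
+    # Note: update_doc and close_doc below signal any in-flight completion requests via
+    # the Event objects collected in self.pending, and do_completion re-checks
+    # self._versions[uri] as it runs, so an edit arriving mid-completion cancels stale
+    # results instead of returning them.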
+
+    def update_doc(self, text: str, uri: str, version: int) -> None:
+        self._docs[uri] = text
+        self._versions[uri] = version
+        # any pending completions should stop running now. We use a list of Event objects
+        # to signal any running threads to stop
+        for pending in self.pending:
+            pending.set()
+
+    def close_doc(self, uri: str) -> None:
+        del self._docs[uri]
+        del self._versions[uri]
+        for pending in self.pending:
+            pending.set()
+
+    def is_enabled(self) -> bool:
+        return self.__mode != CompleterMode.off
+
+    def can_jedi(self) -> bool:
+        return self.__can_jedi
+
+    def set_scope(self, scope: dict) -> None:
+        self.__scope = scope
+
+    def do_completion(self, uri: str, version: int, line: int, col: int) -> list[list[Any]]:
+        if not self._versions[uri] == version:
+            # if you aren't the newest completion, you get nothing, quickly
+            return []
+
+        # run jedi
+        txt = self.get_doc(uri)
+        # The Script completer is static analysis only, so we should actually be feeding it a whole document at once.
+        # Script does not accept a runtime namespace, so the two completers are invoked differently:
+        if self.__mode == CompleterMode.safe:
+            completions = Script(txt).complete(line, col)
+        else:
+            completions = Interpreter(txt, [self.__scope]).complete(line, col)
+        # for now, a simple sorting based on number of preceding _
+        # we may want to apply additional sorting to each list before combining
+        results: list = []
+        results_: list = []
+        results__: list = []
+        for complete in completions:
+            # keep checking the latest version as we run, so updated doc can cancel us
+            if not self._versions[uri] == version:
+                return []
+            result: list = self.to_result(complete, col)
+            if result[0].startswith('__'):
+                results__.append(result)
+            elif result[0].startswith('_'):
+                results_.append(result)
+            else:
+                results.append(result)
+
+        # put the results together in a better-than-nothing sorting
+        return results + results_ + results__
+
+    @staticmethod
+    def to_result(complete: Any, col: int) -> list[Any]:
+        name: str = complete.name
+        prefix_length: int = complete.get_completion_prefix_length()
+        start: int = col - prefix_length
+        # all java needs to build a grpc response is completion text (name) and where the completion should start
+        return [name, start]
diff --git a/py/server/deephaven/config/__init__.py b/py/server/deephaven/config/__init__.py
index 589755f54ec..cdc5c971cbd 100644
--- a/py/server/deephaven/config/__init__.py
+++ b/py/server/deephaven/config/__init__.py
@@ -19,6 +19,6 @@ def get_server_timezone() -> TimeZone:
         for tz in TimeZone:
             if j_timezone == tz.value.getTimeZone():
                 return tz
-        raise NotImplementedError("can't find the time zone in the TImeZone Enum.")
+        raise NotImplementedError("can't find the time zone in the TimeZone Enum.")
     except Exception as e:
         raise DHError(e, message=f"failed to find a recognized time zone") from e
diff --git a/py/server/setup.py b/py/server/setup.py
index 027a0b53d6b..c11ddfeea65 100644
--- a/py/server/setup.py
+++ b/py/server/setup.py
@@ -62,6 +62,9 @@ def normalize_version(version):
         # TODO(deephaven-core#3082): Remove numba dependency workarounds
         'numba; python_version < "3.11"',
     ],
+    extras_require={
+        "autocomplete": ["jedi==0.18.2"],
+    },
     entry_points={
         'deephaven.plugin': ['registration_cls = deephaven.pandasplugin:PandasPluginRegistration']
     }
diff --git a/server/jetty-app/build.gradle b/server/jetty-app/build.gradle
index 60a50b23165..332212aa653 100644
--- a/server/jetty-app/build.gradle
+++ b/server/jetty-app/build.gradle
@@ -65,6 +65,10 @@ if (hasProperty('debug')) {
     extraJvmArgs += ['-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005']
 }
 
+if (hasProperty('debugAutocomplete')) {
+    extraJvmArgs += ['-Ddeephaven.console.autocomplete.quiet=false']
+}
+
 if (hasProperty('gcApplication')) {
     extraJvmArgs +=
['-Dio.deephaven.app.GcApplication.enabled=true'] } diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index e77423dac8e..e6edf6181a0 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -11,7 +11,6 @@ import io.deephaven.engine.updategraph.DynamicNode; import io.deephaven.engine.util.DelegatingScriptSession; import io.deephaven.engine.util.ScriptSession; -import io.deephaven.engine.util.VariableProvider; import io.deephaven.extensions.barrage.util.GrpcUtil; import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.internal.log.LoggerFactory; @@ -19,32 +18,25 @@ import io.deephaven.io.logger.LogBufferRecord; import io.deephaven.io.logger.LogBufferRecordListener; import io.deephaven.io.logger.Logger; -import io.deephaven.lang.completion.ChunkerCompleter; -import io.deephaven.lang.completion.CompletionLookups; -import io.deephaven.lang.parse.CompletionParser; -import io.deephaven.lang.parse.LspTools; -import io.deephaven.lang.parse.ParsedDocument; -import io.deephaven.lang.shared.lsp.CompletionCancelled; import io.deephaven.proto.backplane.grpc.FieldInfo; import io.deephaven.proto.backplane.grpc.FieldsChangeUpdate; import io.deephaven.proto.backplane.grpc.Ticket; import io.deephaven.proto.backplane.grpc.TypedTicket; import io.deephaven.proto.backplane.script.grpc.*; +import io.deephaven.server.console.completer.JavaAutoCompleteObserver; +import io.deephaven.server.console.completer.PythonAutoCompleteObserver; import io.deephaven.server.session.SessionCloseableObserver; import io.deephaven.server.session.SessionService; import io.deephaven.server.session.SessionState; import io.deephaven.server.session.SessionState.ExportBuilder; import io.deephaven.server.session.TicketRouter; -import io.deephaven.util.SafeCloseable; import io.grpc.stub.StreamObserver; +import org.jpy.PyObject; import javax.inject.Inject; import javax.inject.Provider; import javax.inject.Singleton; -import java.util.Collection; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecute; import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecuteLocked; @@ -267,162 +259,24 @@ public StreamObserver autoCompleteStream( return GrpcUtil.rpcWrapper(log, responseObserver, () -> { final SessionState session = sessionService.getCurrentSession(); if (PythonDeephavenSession.SCRIPT_TYPE.equals(scriptSessionProvider.get().scriptType())) { - return new NoopAutoCompleteObserver(responseObserver); + PyObject[] settings = new PyObject[1]; + safelyExecute(() -> { + final ScriptSession scriptSession = scriptSessionProvider.get(); + scriptSession.evaluateScript( + "from deephaven.completer import jedi_settings ; jedi_settings.set_scope(globals())"); + settings[0] = (PyObject) scriptSession.getVariable("jedi_settings"); + }); + boolean canJedi = settings[0] != null && settings[0].call("can_jedi").getBooleanValue(); + log.info().append(canJedi ? "Using jedi for python autocomplete" + : "No jedi dependency available in python environment; disabling autocomplete.").endl(); + return canJedi ? 
new PythonAutoCompleteObserver(responseObserver, scriptSessionProvider, session) + : new NoopAutoCompleteObserver(responseObserver); } return new JavaAutoCompleteObserver(session, responseObserver); }); } - /** - * Autocomplete handling for JVM languages, that directly can interact with Java instances without any name - * mangling, and are able to use our flexible parser. - */ - private static class JavaAutoCompleteObserver implements StreamObserver { - private final CompletionParser parser; - private final SessionState session; - private final StreamObserver responseObserver; - - private final Map parsers = new ConcurrentHashMap<>(); - - private CompletionParser ensureParserForSession(SessionState session) { - return parsers.computeIfAbsent(session, s -> { - CompletionParser parser = new CompletionParser(); - s.addOnCloseCallback(() -> { - parsers.remove(s); - parser.close(); - }); - return parser; - }); - } - - - public JavaAutoCompleteObserver(SessionState session, StreamObserver responseObserver) { - this.session = session; - this.responseObserver = responseObserver; - parser = ensureParserForSession(session); - } - - @Override - public void onNext(AutoCompleteRequest value) { - switch (value.getRequestCase()) { - case OPEN_DOCUMENT: { - final TextDocumentItem doc = value.getOpenDocument().getTextDocument(); - - parser.open(doc.getText(), doc.getUri(), Integer.toString(doc.getVersion())); - break; - } - case CHANGE_DOCUMENT: { - ChangeDocumentRequest request = value.getChangeDocument(); - final VersionedTextDocumentIdentifier text = request.getTextDocument(); - parser.update(text.getUri(), Integer.toString(text.getVersion()), - request.getContentChangesList()); - break; - } - case GET_COMPLETION_ITEMS: { - GetCompletionItemsRequest request = value.getGetCompletionItems(); - SessionState.ExportObject exportedConsole = - session.getExport(request.getConsoleId(), "consoleId"); - session.nonExport() - .require(exportedConsole) - .onError(responseObserver) - .submit(() -> { - getCompletionItems(request, exportedConsole, parser, responseObserver); - }); - break; - } - case CLOSE_DOCUMENT: { - CloseDocumentRequest request = value.getCloseDocument(); - parser.remove(request.getTextDocument().getUri()); - break; - } - case REQUEST_NOT_SET: { - throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, - "Autocomplete command missing request"); - } - } - } - - private void getCompletionItems(GetCompletionItemsRequest request, - SessionState.ExportObject exportedConsole, CompletionParser parser, - StreamObserver responseObserver) { - final ScriptSession scriptSession = exportedConsole.get(); - try (final SafeCloseable ignored = scriptSession.getExecutionContext().open()) { - final VersionedTextDocumentIdentifier doc = request.getTextDocument(); - final VariableProvider vars = scriptSession.getVariableProvider(); - final CompletionLookups h = CompletionLookups.preload(scriptSession); - // The only stateful part of a completer is the CompletionLookups, which are already - // once-per-session-cached - // so, we'll just create a new completer for each request. No need to hang onto these guys. 
- final ChunkerCompleter completer = new ChunkerCompleter(log, vars, h); - - final ParsedDocument parsed; - try { - parsed = parser.finish(doc.getUri()); - } catch (CompletionCancelled exception) { - if (log.isTraceEnabled()) { - log.trace().append("Completion canceled").append(exception).endl(); - } - safelyExecuteLocked(responseObserver, - () -> responseObserver.onNext(AutoCompleteResponse.newBuilder() - .setCompletionItems(GetCompletionItemsResponse.newBuilder() - .setSuccess(false) - .setRequestId(request.getRequestId())) - .build())); - return; - } - - int offset = LspTools.getOffsetFromPosition(parsed.getSource(), - request.getPosition()); - final Collection results = - completer.runCompletion(parsed, request.getPosition(), offset); - final GetCompletionItemsResponse mangledResults = - GetCompletionItemsResponse.newBuilder() - .setSuccess(true) - .setRequestId(request.getRequestId()) - .addAllItems(results.stream().map( - // insertTextFormat is a default we used to set in constructor; for now, we'll - // just - // process the objects before sending back to client - item -> item.setInsertTextFormat(2).build()) - .collect(Collectors.toSet())) - .build(); - - safelyExecuteLocked(responseObserver, - () -> responseObserver.onNext(AutoCompleteResponse.newBuilder() - .setCompletionItems(mangledResults) - .build())); - } catch (Exception exception) { - if (QUIET_AUTOCOMPLETE_ERRORS) { - if (log.isTraceEnabled()) { - log.trace().append("Exception occurred during autocomplete").append(exception).endl(); - } - } else { - log.error().append("Exception occurred during autocomplete").append(exception).endl(); - } - safelyExecuteLocked(responseObserver, - () -> responseObserver.onNext(AutoCompleteResponse.newBuilder() - .setCompletionItems(GetCompletionItemsResponse.newBuilder() - .setSuccess(false) - .setRequestId(request.getRequestId())) - .build())); - } - } - - @Override - public void onError(Throwable t) { - // ignore, client doesn't need us, will be cleaned up later - } - - @Override - public void onCompleted() { - // just hang up too, browser will reconnect if interested - synchronized (responseObserver) { - responseObserver.onCompleted(); - } - } - } - private static class NoopAutoCompleteObserver implements StreamObserver { private final StreamObserver responseObserver; diff --git a/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java b/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java new file mode 100644 index 00000000000..5513a1c2692 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/console/completer/JavaAutoCompleteObserver.java @@ -0,0 +1,176 @@ +package io.deephaven.server.console.completer; + +import com.google.rpc.Code; +import io.deephaven.engine.util.ScriptSession; +import io.deephaven.engine.util.VariableProvider; +import io.deephaven.extensions.barrage.util.GrpcUtil; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; +import io.deephaven.lang.completion.ChunkerCompleter; +import io.deephaven.lang.completion.CompletionLookups; +import io.deephaven.lang.parse.CompletionParser; +import io.deephaven.lang.parse.LspTools; +import io.deephaven.lang.parse.ParsedDocument; +import io.deephaven.lang.shared.lsp.CompletionCancelled; +import io.deephaven.proto.backplane.script.grpc.*; +import io.deephaven.server.console.ConsoleServiceGrpcImpl; +import io.deephaven.server.session.SessionState; +import io.deephaven.util.SafeCloseable; +import io.grpc.stub.StreamObserver; + 
+import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecuteLocked; + +/** + * Autocomplete handling for JVM languages, that directly can interact with Java instances without any name mangling, + * and are able to use our flexible parser. + */ +public class JavaAutoCompleteObserver implements StreamObserver { + + private static final Logger log = LoggerFactory.getLogger(JavaAutoCompleteObserver.class); + private final CompletionParser parser; + private final SessionState session; + private final StreamObserver responseObserver; + + private final Map parsers = new ConcurrentHashMap<>(); + + private CompletionParser ensureParserForSession(SessionState session) { + return parsers.computeIfAbsent(session, s -> { + CompletionParser parser = new CompletionParser(); + s.addOnCloseCallback(() -> { + parsers.remove(s); + parser.close(); + }); + return parser; + }); + } + + + public JavaAutoCompleteObserver(SessionState session, StreamObserver responseObserver) { + this.session = session; + this.responseObserver = responseObserver; + parser = ensureParserForSession(session); + } + + @Override + public void onNext(AutoCompleteRequest value) { + switch (value.getRequestCase()) { + case OPEN_DOCUMENT: { + final TextDocumentItem doc = value.getOpenDocument().getTextDocument(); + + parser.open(doc.getText(), doc.getUri(), Integer.toString(doc.getVersion())); + break; + } + case CHANGE_DOCUMENT: { + ChangeDocumentRequest request = value.getChangeDocument(); + final VersionedTextDocumentIdentifier text = request.getTextDocument(); + parser.update(text.getUri(), text.getVersion(), + request.getContentChangesList()); + break; + } + case GET_COMPLETION_ITEMS: { + GetCompletionItemsRequest request = value.getGetCompletionItems(); + SessionState.ExportObject exportedConsole = + session.getExport(request.getConsoleId(), "consoleId"); + session.nonExport() + .require(exportedConsole) + .onError(responseObserver) + .submit(() -> { + getCompletionItems(request, exportedConsole, parser, responseObserver); + }); + break; + } + case CLOSE_DOCUMENT: { + CloseDocumentRequest request = value.getCloseDocument(); + parser.remove(request.getTextDocument().getUri()); + break; + } + case REQUEST_NOT_SET: { + throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, + "Autocomplete command missing request"); + } + } + } + + private void getCompletionItems(GetCompletionItemsRequest request, + SessionState.ExportObject exportedConsole, CompletionParser parser, + StreamObserver responseObserver) { + final ScriptSession scriptSession = exportedConsole.get(); + try (final SafeCloseable ignored = scriptSession.getExecutionContext().open()) { + final VariableProvider vars = scriptSession.getVariableProvider(); + final VersionedTextDocumentIdentifier doc = request.getTextDocument(); + final CompletionLookups h = CompletionLookups.preload(scriptSession); + // The only stateful part of a completer is the CompletionLookups, which are already + // once-per-session-cached + // so, we'll just create a new completer for each request. No need to hang onto these guys. 
+            final ChunkerCompleter completer = new ChunkerCompleter(log, vars, h);
+
+            final ParsedDocument parsed;
+            try {
+                parsed = parser.finish(doc.getUri());
+            } catch (CompletionCancelled exception) {
+                if (log.isTraceEnabled()) {
+                    log.trace().append("Completion canceled").append(exception).endl();
+                }
+                safelyExecuteLocked(responseObserver,
+                        () -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
+                                .setCompletionItems(GetCompletionItemsResponse.newBuilder()
+                                        .setSuccess(false)
+                                        .setRequestId(request.getRequestId()))
+                                .build()));
+                return;
+            }
+
+            int offset = LspTools.getOffsetFromPosition(parsed.getSource(),
+                    request.getPosition());
+            final Collection<CompletionItem.Builder> results =
+                    completer.runCompletion(parsed, request.getPosition(), offset);
+            final GetCompletionItemsResponse mangledResults =
+                    GetCompletionItemsResponse.newBuilder()
+                            .setSuccess(true)
+                            .setRequestId(request.getRequestId())
+                            .addAllItems(results.stream().map(
+                                    // insertTextFormat is a default we used to set in constructor; for now, we'll
+                                    // just process the objects before sending back to client (2 == LSP Snippet)
+                                    item -> item.setInsertTextFormat(2).build())
+                                    .collect(Collectors.toSet()))
+                            .build();
+
+            safelyExecuteLocked(responseObserver,
+                    () -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
+                            .setCompletionItems(mangledResults)
+                            .build()));
+        } catch (Exception exception) {
+            if (ConsoleServiceGrpcImpl.QUIET_AUTOCOMPLETE_ERRORS) {
+                if (log.isTraceEnabled()) {
+                    log.trace().append("Exception occurred during autocomplete").append(exception).endl();
+                }
+            } else {
+                log.error().append("Exception occurred during autocomplete").append(exception).endl();
+            }
+            safelyExecuteLocked(responseObserver,
+                    () -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
+                            .setCompletionItems(GetCompletionItemsResponse.newBuilder()
+                                    .setSuccess(false)
+                                    .setRequestId(request.getRequestId()))
+                            .build()));
+        }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        // ignore, client doesn't need us, will be cleaned up later
+    }
+
+    @Override
+    public void onCompleted() {
+        // just hang up too, browser will reconnect if interested
+        synchronized (responseObserver) {
+            responseObserver.onCompleted();
+        }
+    }
+}
diff --git a/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java b/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java
new file mode 100644
index 00000000000..e64d1295a3c
--- /dev/null
+++ b/server/src/main/java/io/deephaven/server/console/completer/PythonAutoCompleteObserver.java
@@ -0,0 +1,240 @@
+package io.deephaven.server.console.completer;
+
+import com.google.rpc.Code;
+import io.deephaven.engine.util.ScriptSession;
+import io.deephaven.extensions.barrage.util.GrpcUtil;
+import io.deephaven.internal.log.LoggerFactory;
+import io.deephaven.io.logger.Logger;
+import io.deephaven.lang.completion.ChunkerCompleter;
+import io.deephaven.lang.parse.CompletionParser;
+import io.deephaven.proto.backplane.script.grpc.*;
+import io.deephaven.server.console.ConsoleServiceGrpcImpl;
+import io.deephaven.server.session.SessionState;
+import io.deephaven.util.SafeCloseable;
+import io.grpc.stub.StreamObserver;
+import org.jpy.PyObject;
+
+import javax.inject.Provider;
+import java.util.ArrayList;
+import java.util.List;
+
+import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyExecuteLocked;
+
+/**
+ * Autocomplete handling for Python that will use the jedi library, if it is installed.
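+ * <p>
+ * If jedi is not enabled in the session (jedi_settings.is_enabled returns false), completion requests are
+ * answered with an empty, unsuccessful response instead of an error, so clients can degrade gracefully.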
+ */
+public class PythonAutoCompleteObserver implements StreamObserver<AutoCompleteRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(PythonAutoCompleteObserver.class);
+
+    /**
+     * We only log timing for completions that take longer than (currently) 100ms.
+     */
+    private static final long HUNDRED_MS_IN_NS = 100_000_000;
+    private final Provider<ScriptSession> scriptSession;
+    private final SessionState session;
+    private final StreamObserver<AutoCompleteResponse> responseObserver;
+
+    public PythonAutoCompleteObserver(StreamObserver<AutoCompleteResponse> responseObserver,
+            Provider<ScriptSession> scriptSession, final SessionState session) {
+        this.scriptSession = scriptSession;
+        this.session = session;
+        this.responseObserver = responseObserver;
+    }
+
+    @Override
+    @SuppressWarnings("DuplicatedCode")
+    public void onNext(AutoCompleteRequest value) {
+        switch (value.getRequestCase()) {
+            case OPEN_DOCUMENT: {
+                final OpenDocumentRequest openDoc = value.getOpenDocument();
+                final TextDocumentItem doc = openDoc.getTextDocument();
+                PyObject completer = (PyObject) scriptSession.get().getVariable("jedi_settings");
+                completer.callMethod("open_doc", doc.getText(), doc.getUri(), doc.getVersion());
+                break;
+            }
+            case CHANGE_DOCUMENT: {
+                ChangeDocumentRequest request = value.getChangeDocument();
+                final VersionedTextDocumentIdentifier text = request.getTextDocument();
+
+                PyObject completer = (PyObject) scriptSession.get().getVariable("jedi_settings");
+                String uri = text.getUri();
+                int version = text.getVersion();
+                String document = completer.callMethod("get_doc", text.getUri()).getStringValue();
+
+                final List<ChangeDocumentRequest.TextDocumentContentChangeEvent> changes =
+                        request.getContentChangesList();
+                document = CompletionParser.updateDocumentChanges(uri, version, document, changes);
+                if (document == null) {
+                    return;
+                }
+
+                completer.callMethod("update_doc", document, uri, version);
+                break;
+            }
+            case GET_COMPLETION_ITEMS: {
+                GetCompletionItemsRequest request = value.getGetCompletionItems();
+                SessionState.ExportObject<ScriptSession> exportedConsole =
+                        session.getExport(request.getConsoleId(), "consoleId");
+                session.nonExport()
+                        .require(exportedConsole)
+                        .onError(responseObserver)
+                        .submit(() -> {
+                            getCompletionItems(request, exportedConsole, responseObserver);
+                        });
+                break;
+            }
+            case CLOSE_DOCUMENT: {
+                CloseDocumentRequest request = value.getCloseDocument();
+                PyObject completer = (PyObject) scriptSession.get().getVariable("jedi_settings");
+                completer.callMethod("close_doc", request.getTextDocument().getUri());
+                break;
+            }
+            case REQUEST_NOT_SET: {
+                throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
+                        "Autocomplete command missing request");
+            }
+        }
+    }
+
+    private void getCompletionItems(GetCompletionItemsRequest request,
+            SessionState.ExportObject<ScriptSession> exportedConsole,
+            StreamObserver<AutoCompleteResponse> responseObserver) {
+        final ScriptSession scriptSession = exportedConsole.get();
+        try (final SafeCloseable ignored = scriptSession.getExecutionContext().open()) {
+
+            PyObject completer = (PyObject) scriptSession.getVariable("jedi_settings");
+            boolean canJedi = completer.callMethod("is_enabled").getBooleanValue();
+            if (!canJedi) {
+                log.trace().append("Ignoring completion request because jedi is disabled").endl();
+                // send back an empty, failed response...
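+                // a failed response (success=false, no items) rather than an error keeps the
+                // stream alive; the client can treat it as "completion unavailable"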
+                safelyExecuteLocked(responseObserver,
+                        () -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
+                                .setCompletionItems(GetCompletionItemsResponse.newBuilder()
+                                        .setSuccess(false)
+                                        .setRequestId(request.getRequestId()))
+                                .build()));
+                return;
+            }
+            final VersionedTextDocumentIdentifier doc = request.getTextDocument();
+            final Position pos = request.getPosition();
+            final long startNano = System.nanoTime();
+
+            if (log.isTraceEnabled()) {
+                String text = completer.call("get_doc", doc.getUri()).getStringValue();
+                log.trace().append("Completion version ").append(doc.getVersion())
+                        .append(" has source code:").append(text).endl();
+            }
+            final PyObject results = completer.callMethod("do_completion", doc.getUri(), doc.getVersion(),
+                    // our java is 0-indexed lines, 1-indexed chars. jedi is 1-indexed-both.
+                    // we'll keep that translation ugliness confined to the in-java result-processing.
+                    pos.getLine() + 1, pos.getCharacter());
+            if (!results.isList()) {
+                throw new UnsupportedOperationException(
+                        "Expected list from jedi_settings.do_completion, got " + results.call("repr"));
+            }
+            final long nanosJedi = System.nanoTime();
+            // translate the from-python list of completion results. For now, each item in the outer list is a
+            // [str, int] which contains the text of the replacement, and the column where it should be inserted.
+            List<CompletionItem> finalItems = new ArrayList<>();
+
+            for (PyObject result : results.asList()) {
+                if (!result.isList()) {
+                    throw new UnsupportedOperationException("Expected list-of-lists from jedi_settings.do_completion, "
+                            + "got bad result " + result.call("repr") + " from full results: " + results.call("repr"));
+                }
+                // we expect [ "completion text", start_column ] as our result.
+                // in the future we may want to get more interesting info from jedi to pass back to client
+                final List<PyObject> items = result.asList();
+                String completionName = items.get(0).getStringValue();
+                int start = items.get(1).getIntValue();
+                final CompletionItem.Builder item = CompletionItem.newBuilder();
+                final TextEdit.Builder textEdit = item.getTextEditBuilder();
+                textEdit.setText(completionName);
+                final DocumentRange.Builder range = textEdit.getRangeBuilder();
+                item.setStart(start);
+                item.setLabel(completionName);
+                item.setLength(completionName.length());
+                range.getStartBuilder().setLine(pos.getLine()).setCharacter(start);
+                range.getEndBuilder().setLine(pos.getLine()).setCharacter(start + completionName.length());
+                item.setInsertTextFormat(2); // 2 == LSP Snippet format
+                item.setSortText(ChunkerCompleter.sortable(finalItems.size()));
+                finalItems.add(item.build());
+            }
+
+            final long nanosBuiltResponse = System.nanoTime();
+
+            final GetCompletionItemsResponse builtItems = GetCompletionItemsResponse.newBuilder()
+                    .setSuccess(true)
+                    .setRequestId(request.getRequestId())
+                    .addAllItems(finalItems)
+                    .build();
+
+            try {
+                safelyExecuteLocked(responseObserver,
+                        () -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
+                                .setCompletionItems(builtItems)
+                                .build()));
+            } finally {
+                // let's track how long completions take, as it's known that some modules like numpy
+                // can cause slow completion, and we'll want to know what was causing them
+                final long totalCompletionNanos = nanosBuiltResponse - startNano;
+                final long totalJediNanos = nanosJedi - startNano;
+                final long totalResponseBuildNanos = nanosBuiltResponse - nanosJedi;
+                // only log completions taking more than 100ms
+                if (totalCompletionNanos > HUNDRED_MS_IN_NS && log.isTraceEnabled()) {
+                    log.trace().append("Found ")
+                            .append(finalItems.size())
+                            .append(" jedi completions from doc ")
+                            .append(doc.getVersion())
+                            .append("\tjedi_time=").append(toMillis(totalJediNanos))
+                            .append("\tbuild_response_time=").append(toMillis(totalResponseBuildNanos))
+                            .append("\ttotal_complete_time=").append(toMillis(totalCompletionNanos))
+                            .endl();
+                }
+            }
+        } catch (Throwable exception) {
+            if (ConsoleServiceGrpcImpl.QUIET_AUTOCOMPLETE_ERRORS) {
+                exception.printStackTrace();
+                if (log.isTraceEnabled()) {
+                    log.trace().append("Exception occurred during autocomplete").append(exception).endl();
+                }
+            } else {
+                log.error().append("Exception occurred during autocomplete").append(exception).endl();
+            }
+            safelyExecuteLocked(responseObserver,
+                    () -> responseObserver.onNext(AutoCompleteResponse.newBuilder()
+                            .setCompletionItems(GetCompletionItemsResponse.newBuilder()
+                                    .setSuccess(false)
+                                    .setRequestId(request.getRequestId()))
+                            .build()));
+            // errors are not recoverable; rethrow after notifying the client
+            if (exception instanceof Error) {
+                throw exception;
+            }
+        }
+    }
+
+    private String toMillis(final long totalNanos) {
+        // zero-pad to at least 7 digits so there is always at least one digit left of the
+        // decimal point once the final six (sub-millisecond) digits become the fraction
+        StringBuilder totalNano = new StringBuilder(Long.toString(totalNanos));
+        while (totalNano.length() < 7) {
+            totalNano.insert(0, "0");
+        }
+        // everything before the last six digits is whole milliseconds; keep two fractional digits
+        int milliCutoff = totalNano.length() - 6;
+        return totalNano.substring(0, milliCutoff) + "."
+                + (totalNano.substring(milliCutoff, Math.min(milliCutoff + 2, totalNano.length()))) + "ms";
+    }
+
+    @Override
+    public void onError(Throwable t) {
+        // ignore, client doesn't need us, will be cleaned up later
+    }
+
+    @Override
+    public void onCompleted() {
+        // just hang up too, browser will reconnect if interested
+        synchronized (responseObserver) {
+            responseObserver.onCompleted();
+        }
+    }
+}