Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into 2985-better-shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Dec 2, 2022
2 parents a7e233e + 0bb8eac commit 1ecd509
Show file tree
Hide file tree
Showing 26 changed files with 951 additions and 295 deletions.
1 change: 1 addition & 0 deletions docker/server-jetty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def targetArch = Architecture.targetArchitecture(project)

def baseMapAmd64 = [
'server-base': 'server-jetty',
'all-ai-base': 'server-all-ai-jetty',
]

// Only the server image is supported on arm64
Expand Down
64 changes: 64 additions & 0 deletions docker/server-jetty/src/main/server-all-ai-jetty/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
absl-py==1.3.0
astunparse==1.6.3
cachetools==5.2.0
certifi==2022.9.24
charset-normalizer==2.1.1
click==8.1.3
deephaven-plugin==0.3.0
flatbuffers==2.0.7
gast==0.4.0
google-auth==2.14.1
google-auth-oauthlib==0.4.6
google-pasta==0.2.0
grpcio==1.51.1
h5py==3.7.0
idna==3.4
importlib-metadata==5.1.0
java-utilities==0.2.0
jedi==0.18.2
joblib==1.2.0
jpy==0.13.0
keras==2.7.0
Keras-Preprocessing==1.1.2
libclang==14.0.6
llvmlite==0.39.1
Markdown==3.4.1
MarkupSafe==2.1.1
nltk==3.7
numba==0.56.4
numpy==1.21.6
nvidia-cublas-cu11==11.10.3.66
nvidia-cuda-nvrtc-cu11==11.7.99
nvidia-cuda-runtime-cu11==11.7.99
nvidia-cudnn-cu11==8.5.0.96
oauthlib==3.2.2
opt-einsum==3.3.0
pandas==1.3.5
parso==0.8.3
protobuf==3.19.6
pyasn1==0.4.8
pyasn1-modules==0.2.8
python-dateutil==2.8.2
pytz==2022.6
regex==2022.10.31
requests==2.28.1
requests-oauthlib==1.3.1
rsa==4.9
scikit-learn==1.0.2
scipy==1.7.3
six==1.16.0
tensorboard==2.11.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow==2.7.4
tensorflow-estimator==2.7.0
tensorflow-io-gcs-filesystem==0.28.0
termcolor==2.1.1
threadpoolctl==3.1.0
torch==1.13.0
tqdm==4.64.1
typing_extensions==4.4.0
urllib3==1.26.13
Werkzeug==2.2.2
wrapt==1.14.1
zipp==3.11.0
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Externalizable;
import java.math.BigDecimal;
import java.math.BigInteger;

/**
* Utility class to concentrate {@link ObjectCodec} lookups.
Expand Down Expand Up @@ -76,7 +77,11 @@ private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
// appropriate precision and scale calculated from column data,
// unless the user explicitly requested something else
// via instructions.
dataType == BigDecimal.class;
dataType == BigDecimal.class ||

// BigIntegers can be encoded as a DecimalLogicalType using a precision of 1 and scale of 0, which lets
// them be read by other parquet tools.
dataType == BigInteger.class;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
* refreshing tables, we need the user to tell us.
*/
public class BigDecimalUtils {
private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
private static final int TARGET_CHUNK_SIZE = 4096;
public static final int INVALID_PRECISION_OR_SCALE = -1;

/**
Expand Down Expand Up @@ -53,7 +55,8 @@ public static PrecisionAndScale computePrecisionAndScale(
}

/**
* Compute an overall precision and scale that would fit all existing values in a column source.
* Compute an overall precision and scale that would fit all existing values in a column source. Note that this
* requires a full table scan to ensure the correct values are determined.
*
* @param rowSet The rowset for the provided column
* @param source a {@code ColumnSource} of {@code BigDecimal} type
Expand All @@ -62,29 +65,45 @@ public static PrecisionAndScale computePrecisionAndScale(
public static PrecisionAndScale computePrecisionAndScale(
final RowSet rowSet,
final ColumnSource<BigDecimal> source) {
final int sz = 4096;
// we first compute max(precision - scale) and max(scale), which corresponds to
// max(digits left of the decimal point), max(digits right of the decimal point).
// Then we convert to (precision, scale) before returning.
int maxPrecisionMinusScale = 0;
int maxScale = 0;
try (final ChunkSource.GetContext context = source.makeGetContext(sz);
if (rowSet.isEmpty()) {
return EMPTY_TABLE_PRECISION_AND_SCALE;
}

// We will walk the entire table to determine the max(precision - scale) and
// max(scale), which corresponds to max(digits left of the decimal point), max(digits right of the decimal
// point). Then we convert to (precision, scale) before returning.
int maxPrecisionMinusScale = -1;
int maxScale = -1;
try (final ChunkSource.GetContext context = source.makeGetContext(TARGET_CHUNK_SIZE);
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz);
final ObjectChunk<BigDecimal, ? extends Values> chunk = source.getChunk(context, rowSeq).asObjectChunk();
for (int i = 0; i < chunk.size(); ++i) {
final BigDecimal x = chunk.get(i);
final int precision = x.precision();
final int scale = x.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
while (it.hasMore()) {
final RowSequence rowSeq = it.getNextRowSequenceWithLength(TARGET_CHUNK_SIZE);
final ObjectChunk<BigDecimal, ? extends Values> chunk =
source.getChunk(context, rowSeq).asObjectChunk();
for (int i = 0; i < chunk.size(); ++i) {
final BigDecimal x = chunk.get(i);
if (x == null) {
continue;
}

final int precision = x.precision();
final int scale = x.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
}
}
}
}

// If these are < 0, then every value we visited was null
if (maxPrecisionMinusScale < 0 && maxScale < 0) {
return EMPTY_TABLE_PRECISION_AND_SCALE;
}

return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.parquet.table.BigIntegerParquetBytesCodec;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
Expand Down Expand Up @@ -51,16 +52,18 @@ public class TestCodecColumns {
ColumnDefinition.fromGenericType("VWBA", byte[].class, byte.class);
writeBuilder.addColumnCodec("VWBA", SimpleByteArrayCodec.class.getName());
readBuilder.addColumnCodec("VWBA", SimpleByteArrayCodec.class.getName());

VARIABLE_WIDTH_COLUMN_DEFINITION_2 = ColumnDefinition.fromGenericType("VWCD", ArrayTuple.class);
readBuilder.addColumnCodec("VWCD", ExternalizableCodec.class.getName(), ArrayTuple.class.getName());
FIXED_WIDTH_BYTE_ARRAY_COLUMN_DEFINITION = ColumnDefinition.fromGenericType("FWBA", byte[].class, byte.class);
writeBuilder.addColumnCodec("FWBA", SimpleByteArrayCodec.class.getName(), "9");
readBuilder.addColumnCodec("FWBA", SimpleByteArrayCodec.class.getName(), "9");

VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION = ColumnDefinition.fromGenericType("VWBI", BigInteger.class);
writeBuilder.addColumnCodec("VWBI", BigIntegerCodec.class.getName());
readBuilder.addColumnCodec("VWBI", BigIntegerCodec.class.getName());
writeBuilder.addColumnCodec("VWBI", SerializableCodec.class.getName());
readBuilder.addColumnCodec("VWBI", SerializableCodec.class.getName());

VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION_S = ColumnDefinition.fromGenericType("VWBIS", BigInteger.class);
readBuilder.addColumnCodec("VWBIS", SerializableCodec.class.getName());
expectedReadInstructions = readBuilder.build();
writeInstructions = writeBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.FunctionalInterfaces;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.locks.AwareFunctionalLock;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -105,13 +104,8 @@ public static void installInstrumentation(@Nullable final Instrumentation instru
rwLock = new ReentrantReadWriteLock(true);
readLock = rwLock.readLock();
writeLock = rwLock.writeLock();
if (allowUnitTestMode) {
sharedLock = new DebugAwareFunctionalLock(new SharedLock());
exclusiveLock = new DebugAwareFunctionalLock(new ExclusiveLock());
} else {
sharedLock = new SharedLock();
exclusiveLock = new ExclusiveLock();
}
sharedLock = new SharedLock();
exclusiveLock = allowUnitTestMode ? new DebugAwareFunctionalLock(new ExclusiveLock()) : new ExclusiveLock();
this.allowUnitTestMode = allowUnitTestMode;
}

Expand Down Expand Up @@ -306,10 +300,10 @@ public final Condition newCondition() {

// region DebugLock
class DebugAwareFunctionalLock implements AwareFunctionalLock {
private final AwareFunctionalLock delegate;
private final Deque<Throwable> lockingContext = new ArrayDeque<>();
private final ExclusiveLock delegate;
private final Deque<Throwable> lockingContext = new ConcurrentLinkedDeque<>();

DebugAwareFunctionalLock(AwareFunctionalLock delegate) {
DebugAwareFunctionalLock(ExclusiveLock delegate) {
this.delegate = delegate;
}

Expand All @@ -321,19 +315,19 @@ public boolean isHeldByCurrentThread() {
@Override
public void lock() {
delegate.lock();
lockingContext.push(new Throwable());
pushContext();
}

@Override
public void lockInterruptibly() throws InterruptedException {
delegate.lockInterruptibly();
lockingContext.push(new Throwable());
pushContext();
}

@Override
public boolean tryLock() {
if (delegate.tryLock()) {
lockingContext.push(new Throwable());
pushContext();
return true;
}
return false;
Expand All @@ -342,16 +336,16 @@ public boolean tryLock() {
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
if (delegate.tryLock(time, unit)) {
lockingContext.push(new Throwable());
pushContext();
return true;
}
return false;
}

@Override
public void unlock() {
delegate.unlock();
lockingContext.pop();
delegate.unlock();
}

@NotNull
Expand All @@ -360,37 +354,20 @@ public Condition newCondition() {
return delegate.newCondition();
}

@Override
public <EXCEPTION_TYPE extends Exception> void doLocked(
@NotNull FunctionalInterfaces.ThrowingRunnable<EXCEPTION_TYPE> runnable) throws EXCEPTION_TYPE {
delegate.doLocked(runnable);
}

@Override
public <EXCEPTION_TYPE extends Exception> void doLockedInterruptibly(
@NotNull FunctionalInterfaces.ThrowingRunnable<EXCEPTION_TYPE> runnable)
throws InterruptedException, EXCEPTION_TYPE {
delegate.doLockedInterruptibly(runnable);
}

@Override
public <RESULT_TYPE, EXCEPTION_TYPE extends Exception> RESULT_TYPE computeLocked(
@NotNull FunctionalInterfaces.ThrowingSupplier<RESULT_TYPE, EXCEPTION_TYPE> supplier)
throws EXCEPTION_TYPE {
return delegate.computeLocked(supplier);
}

@Override
public <RESULT_TYPE, EXCEPTION_TYPE extends Exception> RESULT_TYPE computeLockedInterruptibly(
@NotNull FunctionalInterfaces.ThrowingSupplier<RESULT_TYPE, EXCEPTION_TYPE> supplier)
throws InterruptedException, EXCEPTION_TYPE {
return delegate.computeLockedInterruptibly(supplier);
}

String getDebugMessage() {
final Throwable item = lockingContext.peek();
return item == null ? "locking context is empty" : ExceptionUtils.getStackTrace(item);
}

private void pushContext() {
// Implementation must have already acquired lock
try {
lockingContext.push(new Throwable());
} catch (Throwable t) {
delegate.unlock();
throw t;
}
}
}
// endregion DebugLock

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,16 +575,16 @@ public void enableUnitTestMode() {
}

private void assertLockAvailable(@NotNull final String action) {
if (!UpdateGraphProcessor.DEFAULT.exclusiveLock().tryLock()) {
final AwareFunctionalLock exclusiveLock = UpdateGraphProcessor.DEFAULT.exclusiveLock();
if (!exclusiveLock.tryLock()) {
log.error().append("Lock is held when ").append(action).append(", with previous holder: ")
.append(unitTestModeHolder).endl();
ThreadDump.threadDump(System.err);
UpdateGraphLock.DebugAwareFunctionalLock lock =
(UpdateGraphLock.DebugAwareFunctionalLock) UpdateGraphProcessor.DEFAULT.exclusiveLock();
UpdateGraphLock.DebugAwareFunctionalLock lock = (UpdateGraphLock.DebugAwareFunctionalLock) exclusiveLock;
throw new IllegalStateException(
"Lock is held when " + action + ", with previous holder: " + lock.getDebugMessage());
}
UpdateGraphProcessor.DEFAULT.exclusiveLock().unlock();
exclusiveLock.unlock();
}

/**
Expand Down
Loading

0 comments on commit 1ecd509

Please sign in to comment.