Skip to content

Commit

Permalink
Introduce TableInitializationException to Propagate JobScheduler Fail…
Browse files Browse the repository at this point in the history
…ures (deephaven#5165)
  • Loading branch information
nbauernfeind authored Feb 17, 2024
1 parent 8875f66 commit 1ef397c
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.exceptions;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* An {@link UncheckedTableException} derivative which indicates a table was unable to be initialized for one reason or
* another.
*/
public class TableInitializationException extends UncheckedTableException {

public TableInitializationException(String reason, Throwable cause) {
super(reason, cause);
}

public TableInitializationException(@NotNull String tableDescription, @Nullable String reason) {
super(makeDescription(tableDescription, reason));
}

public TableInitializationException(@NotNull String tableDescription, @Nullable String reason,
Throwable cause) {
super(makeDescription(tableDescription, reason), cause);
}

private static String makeDescription(@NotNull String tableDescription, @Nullable String reason) {
final StringBuilder sb = new StringBuilder();

sb.append("Error while initializing ").append(tableDescription);

if (reason != null && !reason.isEmpty()) {
sb.append(": ").append(reason);
}

return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.TableInitializationException;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.exceptions.CancellationException;
Expand Down Expand Up @@ -1185,7 +1186,8 @@ private QueryTable whereInternal(final WhereFilter... filters) {
return this;
}

return QueryPerformanceRecorder.withNugget("where(" + Arrays.toString(filters) + ")", sizeForInstrumentation(),
final String whereDescription = "where(" + Arrays.toString(filters) + ")";
return QueryPerformanceRecorder.withNugget(whereDescription, sizeForInstrumentation(),
() -> {
for (int fi = 0; fi < filters.length; ++fi) {
if (!(filters[fi] instanceof ReindexingFilter)) {
Expand Down Expand Up @@ -1255,11 +1257,10 @@ private QueryTable whereInternal(final WhereFilter... filters) {
} catch (ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
throw new CancellationException("interrupted while filtering");
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new UncheckedDeephavenException(e);
}
throw new TableInitializationException(whereDescription,
"an exception occurred while performing the initial filter",
e.getCause());
} finally {
// account for work done in alternative threads
final BasePerformanceEntry basePerformanceEntry =
Expand Down Expand Up @@ -1291,7 +1292,7 @@ private QueryTable whereInternal(final WhereFilter... filters) {

if (snapshotControl != null) {
final ListenerRecorder recorder = new ListenerRecorder(
"where(" + Arrays.toString(filters) + ")", QueryTable.this,
whereDescription, QueryTable.this,
filteredTable);
final WhereListener whereListener = new WhereListener(
log, this, recorder, filteredTable, filters);
Expand Down Expand Up @@ -1501,12 +1502,9 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc
} catch (InterruptedException e) {
throw new CancellationException("interrupted while computing select or update");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new UncheckedDeephavenException("Failure computing select or update",
e.getCause());
}
throw new TableInitializationException(updateDescription,
"an exception occurred while performing the initial select or update",
e.getCause());
} finally {
final BasePerformanceEntry baseEntry = jobScheduler.getAccumulatedPerformance();
if (baseEntry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.deephaven.api.updateby.UpdateByControl;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.exceptions.TableInitializationException;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
Expand Down Expand Up @@ -55,7 +56,7 @@ class BucketedPartitionedUpdateByManager extends UpdateBy {
* @param source the source table
* @param preservedColumns columns from the source table that are unchanged in the result table
* @param resultSources the result sources
* @param byColumns the columns to use for the bucket keys
* @param byColumnNames the columns to use for the bucket keys
* @param timestampColumnName the column to use for all time-aware operators
* @param rowRedirection the row redirection for dense output sources
* @param control the control object.
Expand Down Expand Up @@ -144,12 +145,9 @@ class BucketedPartitionedUpdateByManager extends UpdateBy {
} catch (InterruptedException e) {
throw new CancellationException("Interrupted while initializing bucketed updateBy");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("Failure while initializing bucketed updateBy", e.getCause());
}
throw new TableInitializationException(result.getDescription(),
"an exception occurred while initializing bucketed updateBy",
e.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.updateby.UpdateByControl;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.exceptions.TableInitializationException;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.ColumnSource;
Expand Down Expand Up @@ -108,12 +109,9 @@ public class ZeroKeyUpdateByManager extends UpdateBy {
} catch (InterruptedException e) {
throw new CancellationException("Interrupted while initializing zero-key updateBy");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("Failure while initializing zero-key updateBy", e.getCause());
}
throw new TableInitializationException(bucketDescription,
"an exception occurred while initializing zero-key updateBy",
e.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,8 @@ public void testParallel() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -111,12 +107,8 @@ public void testParallelWithResume() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -181,12 +173,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -232,12 +220,8 @@ public void testSerialWithResume() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -302,12 +286,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -340,12 +320,8 @@ public void testSerialEmpty() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -379,12 +355,8 @@ public void testParallelEmpty() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -453,12 +425,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("Interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -536,12 +504,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -618,12 +582,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("Interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -696,12 +656,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("Interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
}

Expand Down Expand Up @@ -774,12 +730,8 @@ public void close() {
} catch (InterruptedException e) {
throw new CancellationException("Interrupted while processing test");
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
// rethrow the error
throw new UncheckedDeephavenException("Failure while processing test", e.getCause());
}
}

Expand Down

0 comments on commit 1ef397c

Please sign in to comment.