Skip to content

Commit

Permalink
fix: Close bq read client (#3644)
Browse files Browse the repository at this point in the history
* Shutdown bqReadClient after high throughput read

* Code formatted to google-java-format
  • Loading branch information
naung9 authored Jan 16, 2025
1 parent fadd992 commit 8833c97
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class ConnectionImpl implements Connection {
private final Logger logger = Logger.getLogger(this.getClass().getName());
private BigQueryReadClient bqReadClient;
private static final long EXECUTOR_TIMEOUT_SEC = 10;
private static final long BIGQUERY_TIMEOUT_SEC = 10;
private BlockingQueue<AbstractList<FieldValue>>
bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
private BlockingQueue<BigQueryResultImpl.Row>
Expand Down Expand Up @@ -148,8 +149,15 @@ public synchronized boolean close() throws BigQuerySQLException {
flagEndOfStream(); // an End of Stream flag in the buffer so that the `ResultSet.next()` stops
// advancing the cursor
queryTaskExecutor.shutdownNow();
boolean isBqReadClientTerminated = true;
try {
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
if (bqReadClient != null) {
bqReadClient.shutdownNow();
isBqReadClientTerminated =
bqReadClient.awaitTermination(BIGQUERY_TIMEOUT_SEC, TimeUnit.SECONDS);
}
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)
&& isBqReadClientTerminated) {
return true;
} // else queryTaskExecutor.isShutdown() will be returned outside this try block
} catch (InterruptedException e) {
Expand All @@ -159,7 +167,9 @@ public synchronized boolean close() throws BigQuerySQLException {
e); // Logging InterruptedException instead of throwing the exception back, close method
// will return queryTaskExecutor.isShutdown()
}
return queryTaskExecutor.isShutdown(); // check if the executor has been shutdown

return queryTaskExecutor.isShutdown()
&& isBqReadClientTerminated; // check if the executor has been shutdown
}

/**
Expand Down Expand Up @@ -992,7 +1002,6 @@ BigQueryResult highThroughPutRead(
// DO a regex check using order by and use multiple streams
;
ReadSession readSession = bqReadClient.createReadSession(builder.build());

bufferRow = new LinkedBlockingDeque<>(getBufferSize());
Map<String, Integer> arrowNameToIndex = new HashMap<>();
// deserialize and populate the buffer async, so that the client isn't blocked
Expand Down Expand Up @@ -1050,6 +1059,7 @@ private void processArrowStreamAsync(
"\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
e);
}
bqReadClient.shutdownNow(); // Shutdown the read client
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
};
Expand Down

0 comments on commit 8833c97

Please sign in to comment.