
Commit

yirutang committed Nov 10, 2023
1 parent fc1eae9 commit 2df610d
Showing 5 changed files with 20 additions and 46 deletions.
@@ -235,8 +235,6 @@ class ConnectionWorker implements AutoCloseable {
private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
private long testOnlyAppendLoopSleepTime = 0;

- private boolean enableLargerRequest = false;
-
/*
* Tracks the number of responses to ignore in the case of exclusive stream retry
*/
@@ -262,10 +260,7 @@ public static Boolean isDefaultStreamName(String streamName) {

/** The maximum size of one request. Defined by the API. */
public long getApiMaxRequestBytes() {
- if (enableLargerRequest) {
- return 20L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
- }
- return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
+ return 20L * 1000L * 1000L; // 20 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

static String extractProjectName(String streamName) {
@@ -294,8 +289,7 @@ public ConnectionWorker(
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings,
- RetrySettings retrySettings,
- boolean enableLargerRequest)
+ RetrySettings retrySettings)
throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
@@ -318,7 +312,6 @@ public ConnectionWorker(
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
- this.enableLargerRequest = enableLargerRequest;
// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
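For context, a minimal sketch of how the new limit is typically consumed (hypothetical helper, not code from this commit): outgoing append requests are sized against getApiMaxRequestBytes(), which after this change reports an unconditional 20 MB.

import com.google.protobuf.MessageLite;

// Hypothetical illustration only; the real check lives inside the client library.
final class RequestSizeGuard {
  // Mirrors ConnectionWorker#getApiMaxRequestBytes() after this commit: 20 MB, no flag.
  private static final long API_MAX_REQUEST_BYTES = 20L * 1000L * 1000L;

  /** Rejects an AppendRows request whose serialized size exceeds the advertised limit. */
  static void checkSize(MessageLite appendRowsRequest) {
    long actual = appendRowsRequest.getSerializedSize();
    if (actual > API_MAX_REQUEST_BYTES) {
      throw new IllegalArgumentException(
          "Request too large: " + actual + " bytes, limit " + API_MAX_REQUEST_BYTES + " bytes");
    }
  }
}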
@@ -301,10 +301,7 @@ private ConnectionWorker createOrReuseConnectionWorker(
if (connectionWorkerPool.size() < currentMaxConnectionCount) {
// Always create a new connection if we haven't reached current maximum.
return createConnectionWorker(
- streamWriter.getStreamName(),
- streamWriter.getLocation(),
- streamWriter.getProtoSchema(),
- streamWriter.getEnableLargerRequest());
+ streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema());
} else {
ConnectionWorker existingBestConnection =
pickBestLoadConnection(
@@ -323,8 +320,7 @@
return createConnectionWorker(
streamWriter.getStreamName(),
streamWriter.getLocation(),
- streamWriter.getProtoSchema(),
- streamWriter.getEnableLargerRequest());
+ streamWriter.getProtoSchema());
} else {
// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
@@ -380,8 +376,7 @@ static ConnectionWorker pickBestLoadConnection(
* computeIfAbsent(...) which is at most once per key.
*/
private ConnectionWorker createConnectionWorker(
- String streamName, String location, ProtoSchema writeSchema, boolean enableLargeRequest)
- throws IOException {
+ String streamName, String location, ProtoSchema writeSchema) throws IOException {
if (enableTesting) {
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
@@ -398,8 +393,7 @@ private ConnectionWorker createConnectionWorker(
traceId,
compressorName,
clientSettings,
- retrySettings,
- enableLargeRequest);
+ retrySettings);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
@@ -82,11 +82,6 @@ public class StreamWriter implements AutoCloseable {
*/
private final String location;

- /*
- * If larger request is enabled.
- */
- private final Boolean enableLargerRequest;
-
/*
* If user has closed the StreamWriter.
*/
@@ -216,7 +211,6 @@ private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
- this.enableLargerRequest = builder.enableLargerRequest;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
if (!builder.enableConnectionPool) {
this.location = builder.location;
@@ -233,8 +227,7 @@
builder.traceId,
builder.compressorName,
clientSettings,
- builder.retrySettings,
- builder.enableLargerRequest));
+ builder.retrySettings));
} else {
if (!isDefaultStream(streamName)) {
log.warning(
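For reference, a minimal usage sketch (hypothetical project, dataset, and table names) mirroring the builder calls shown in the integration test further down: after this change the writer is built without any larger-request toggle, and the 20 MB ceiling applies to every connection it opens.

import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;

public class DefaultStreamWriterExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical default-stream path, for illustration only.
    String streamName = "projects/my-project/datasets/my_dataset/tables/my_table/_default";

    // Single STRING column "col", matching the shape used by the tests in this diff.
    ProtoSchema writerSchema =
        ProtoSchema.newBuilder()
            .setProtoDescriptor(
                DescriptorProto.newBuilder()
                    .setName("TestRow")
                    .addField(
                        FieldDescriptorProto.newBuilder()
                            .setName("col")
                            .setNumber(1)
                            .setType(FieldDescriptorProto.Type.TYPE_STRING)
                            .build())
                    .build())
            .build();

    // No larger-request flag on the builder: the 20 MB limit is now the default.
    try (StreamWriter writer =
        StreamWriter.newBuilder(streamName).setWriterSchema(writerSchema).build()) {
      // writer.append(...) calls go here; each request may now be up to 20 MB.
    }
  }
}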
@@ -343,8 +343,7 @@ public void testAppendButInflightQueueFull() throws Exception {
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

@@ -401,8 +400,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

@@ -471,8 +469,7 @@ public void testLocationMismatch() throws Exception {
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
@@ -504,8 +501,7 @@ public void testStreamNameMismatch() throws Exception {
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
@@ -558,8 +554,7 @@ private ConnectionWorker createConnectionWorker(
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);
}

private ProtoSchema createProtoSchema(String protoName) {
@@ -654,8 +649,7 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

long appendCount = 10;
@@ -717,8 +711,7 @@ public void testLongTimeIdleWontFail() throws Exception {
TEST_TRACE_ID,
null,
client.getSettings(),
- retrySettings,
- false);
+ retrySettings);

long appendCount = 10;
for (int i = 0; i < appendCount * 2; i++) {
@@ -1614,8 +1614,7 @@ public void testDefaultRequestLimit()
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
- TableName parent =
- TableName.of(ServiceOptions.getDefaultProjectId(), datasetId.getDataset(), tableName);
+ TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
try (StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(CreateProtoSchemaWithColField())
@@ -1634,9 +1633,11 @@
// This verifies that the Beam connector can consume this custom exception's grpc
// StatusCode
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
- assertEquals(
- "MessageSize is too large. Max allow: 10000000 Actual: 19922986",
- actualError.getStatus().getDescription());
+ assertThat(
+ actualError
+ .getStatus()
+ .getDescription()
+ .contains("AppendRows request too large: 19923131 limit 10485760"));
}
}
} finally {
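One note on the new assertion above: assertThat(...) appears to receive the boolean result of String.contains(...) with no terminal check such as isTrue(), so it asserts nothing about the value. A sketch of an equivalent string-subject assertion, assuming the test already uses Google Truth (as the assertThat call suggests) and without pinning exact byte counts, which are server-reported:

import static com.google.common.truth.Truth.assertThat;

import io.grpc.StatusRuntimeException;

final class RequestTooLargeAssertions {
  // Assert on the description itself so a mismatch actually fails the test.
  static void assertRequestTooLarge(StatusRuntimeException actualError) {
    assertThat(actualError.getStatus().getDescription()).contains("AppendRows request too large");
  }
}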
