Skip to content

Commit

Permalink
fix: add stream name to every request when connection is created duri…
Browse files Browse the repository at this point in the history
…ng multiplexing (#2699)

* Add profiler for request execution details. The usage of the new API
will be added in the next PR

* Add profiler for request execution details. The usage of the new API
will be added in the next PR

* fix: add stream name to every append request when connection is created
during multiplexing
  • Loading branch information
GaoleMeng authored Oct 9, 2024
1 parent b27268d commit c53a77c
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ class ConnectionWorker implements AutoCloseable {
private final RequestProfiler.RequestProfilerHook requestProfilerHook;
private final TelemetryMetrics telemetryMetrics;

/** Indicate whether this connection is created during multiplexing mode. */
private final Boolean isMultiplexing;

private static String projectMatching = "projects/[^/]+/";
private static Pattern streamPatternProject = Pattern.compile(projectMatching);

Expand Down Expand Up @@ -327,7 +330,8 @@ public ConnectionWorker(
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings,
boolean enableRequestProfiler,
boolean enableOpenTelemetry)
boolean enableOpenTelemetry,
boolean isMultiplexing)
throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
Expand All @@ -353,6 +357,7 @@ public ConnectionWorker(
this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
this.telemetryMetrics =
new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, traceId);
this.isMultiplexing = isMultiplexing;

// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
Expand Down Expand Up @@ -744,8 +749,6 @@ private void appendLoop() {
// Indicate whether we are at the first request after switching destination.
// True means the schema and other metadata are needed.
boolean firstRequestForTableOrSchemaSwitch = true;
// Represent whether we have entered multiplexing.
boolean isMultiplexing = false;

while (!waitingQueueDrained()) {
this.lock.lock();
Expand Down Expand Up @@ -848,7 +851,6 @@ private void appendLoop() {
streamName = originalRequest.getWriteStream();
telemetryMetrics.refreshOpenTelemetryTableNameAttributes(getTableName());
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForTableOrSchemaSwitch = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ private ConnectionWorker createConnectionWorker(
clientSettings,
retrySettings,
enableRequestProfiler,
enableOpenTelemetry);
enableOpenTelemetry,
/*isMultiplexing=*/ true);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ private StreamWriter(Builder builder) throws IOException {
clientSettings,
builder.retrySettings,
builder.enableRequestProfiler,
builder.enableOpenTelemetry));
builder.enableOpenTelemetry,
/*isMultiplexing=*/ false));
} else {
if (!isDefaultStream(streamName)) {
log.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void setUp() throws Exception {

@Test
public void testMultiplexedAppendSuccess() throws Exception {
try (ConnectionWorker connectionWorker = createConnectionWorker()) {
try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
long appendCount = 20;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
Expand Down Expand Up @@ -150,7 +150,7 @@ public void testMultiplexedAppendSuccess() throws Exception {

// We will get the request as the pattern of:
// (writer_stream: t1, schema: t1)
// (writer_stream: _, schema: _)
// (writer_stream: t1, schema: _)
// (writer_stream: t2, schema: t2) -> multiplexing entered.
// (writer_stream: t2, schema: _)
// (writer_stream: t1, schema: t1)
Expand All @@ -164,11 +164,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
break;
case 1:
// The write stream is empty until we enter multiplexing.
if (i == 1) {
assertThat(serverRequest.getWriteStream()).isEmpty();
} else {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
Expand Down Expand Up @@ -198,7 +194,7 @@ public void testMultiplexedAppendSuccess() throws Exception {

@Test
public void testAppendInSameStream_switchSchema() throws Exception {
try (ConnectionWorker connectionWorker = createConnectionWorker()) {
try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
long appendCount = 20;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
Expand Down Expand Up @@ -279,26 +275,20 @@ public void testAppendInSameStream_switchSchema() throws Exception {

// We will get the request as the pattern of:
// (writer_stream: t1, schema: schema1)
// (writer_stream: _, schema: _)
// (writer_stream: t1, schema: _)
// (writer_stream: t1, schema: schema3)
// (writer_stream: t1, schema: _)
// (writer_stream: t1, schema: schema1)
// (writer_stream: t1, schema: _)
switch (i % 4) {
case 0:
if (i == 0) {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("foo");
break;
case 1:
if (i == 1) {
assertThat(serverRequest.getWriteStream()).isEmpty();
} else {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
Expand Down Expand Up @@ -346,7 +336,8 @@ public void testAppendButInflightQueueFull() throws Exception {
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing=*/ false);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

Expand Down Expand Up @@ -405,7 +396,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing=*/ true);
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

Expand Down Expand Up @@ -476,7 +468,8 @@ public void testLocationMismatch() throws Exception {
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing=*/ true);
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
Expand Down Expand Up @@ -510,7 +503,8 @@ public void testStreamNameMismatch() throws Exception {
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing=*/ true);
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
Expand Down Expand Up @@ -539,13 +533,13 @@ private AppendRowsResponse createAppendResponse(long offset) {
.build();
}

private ConnectionWorker createConnectionWorker() throws IOException {
private ConnectionWorker createMultiplexedConnectionWorker() throws IOException {
// By default use only the first table as table reference.
return createConnectionWorker(
return createMultiplexedConnectionWorker(
TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
}

private ConnectionWorker createConnectionWorker(
private ConnectionWorker createMultiplexedConnectionWorker(
String streamName,
String traceId,
long maxRequests,
Expand All @@ -565,7 +559,8 @@ private ConnectionWorker createConnectionWorker(
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing=*/ true);
}

private ProtoSchema createProtoSchema(String protoName) {
Expand Down Expand Up @@ -663,7 +658,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing*/ false);
org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2);
testBigQueryWrite.setResponseSleep(durationSleep);

Expand Down Expand Up @@ -740,7 +736,8 @@ public void testLongTimeIdleWontFail() throws Exception {
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ false);
/*enableOpenTelemetry=*/ false,
/*isMultiplexing*/ false);

long appendCount = 10;
for (int i = 0; i < appendCount * 2; i++) {
Expand Down Expand Up @@ -787,7 +784,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ true);
/*enableOpenTelemetry=*/ true,
/*isMultiplexing*/ false);

Attributes attributes = connectionWorker.getTelemetryAttributes();
String attributesTableId = attributes.get(TelemetryMetrics.telemetryKeyTableId);
Expand Down Expand Up @@ -829,7 +827,8 @@ void exerciseOpenTelemetryAttributesWithTraceId(
client.getSettings(),
retrySettings,
/*enableRequestProfiler=*/ false,
/*enableOpenTelemetry=*/ true);
/*enableOpenTelemetry=*/ true,
/*isMultiplexing*/ false);

Attributes attributes = connectionWorker.getTelemetryAttributes();
checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,11 +949,7 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception {
assertEquals(
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
// Before entering multiplexing (i == 1) case, the write stream won't be populated.
if (i == 1) {
assertEquals(appendRowsRequest.getWriteStream(), "");
} else {
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
}
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
} else if (i % 4 == 2) {
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema2);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
Expand Down

0 comments on commit c53a77c

Please sign in to comment.