Skip to content

Commit abc9729

Browse files
committedMar 4, 2024
feat: custom headers are also supported for the query (gRPC request)
1 parent 5861cf4 commit abc9729

File tree

3 files changed

+212
-25
lines changed

3 files changed

+212
-25
lines changed
 

‎src/main/java/com/influxdb/v3/client/config/ClientConfig.java

+24-7
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@
5858
* </li>
5959
* <li><code>proxy</code> - HTTP proxy selector</li>
6060
* <li><code>authenticator</code> - HTTP proxy authenticator</li>
61-
* <li><code>headers</code> - set of HTTP headers to be added to requests</li>
61+
* <li><code>headers</code> - headers to be added to requests</li>
6262
* </ul>
6363
* <p>
6464
* If you want to create a client with custom configuration, you can use following code:
6565
* <pre>
66-
* ClientConfig config = new Config.Builder()
66+
* ClientConfig config = new ClientConfig.Builder()
6767
* .host("https://us-east-1-1.aws.cloud2.influxdata.com")
6868
* .token("my-token".toCharArray())
6969
* .database("my-database")
@@ -217,9 +217,9 @@ public Authenticator getAuthenticator() {
217217
}
218218

219219
/**
220-
* Gets custom HTTP headers.
220+
* Gets custom headers for requests.
221221
*
222-
* @return the HTTP headers
222+
* @return the headers
223223
*/
224224
@Nullable
225225
public Map<String, String> getHeaders() {
@@ -465,9 +465,26 @@ public Builder authenticator(@Nullable final Authenticator authenticator) {
465465
}
466466

467467
/**
468-
* Sets the custom HTTP headers that will be included in requests.
468+
* Sets the custom headers that will be added to requests. This is useful for adding custom headers to requests,
469+
* such as tracing headers. To add custom headers use following code:
470+
* <pre>
471+
* ClientConfig config = new ClientConfig.Builder()
472+
* .host("https://us-east-1-1.aws.cloud2.influxdata.com")
473+
* .token("my-token".toCharArray())
474+
* .database("my-database")
475+
* .headers(Map.of("X-Tracing-Id", "123"))
476+
* .build();
469477
*
470-
* @param headers Set of HTTP headers.
478+
* try (InfluxDBClient client = InfluxDBClient.getInstance(config)) {
479+
* //
480+
* // your code here
481+
* //
482+
* } catch (Exception e) {
483+
* throw new RuntimeException(e);
484+
* }
485+
* </pre>
486+
*
487+
* @param headers the headers to be added to requests
471488
* @return this
472489
*/
473490
@Nonnull
@@ -526,7 +543,7 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
526543
/**
527544
* Build an instance of {@code ClientConfig} from environment variables and/or system properties.
528545
*
529-
* @param env environment variables
546+
* @param env environment variables
530547
* @param properties system properties
531548
* @return the configuration for an {@code InfluxDBClient}.
532549
*/

‎src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

+40-18
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.stream.Stream;
3636
import java.util.stream.StreamSupport;
3737
import javax.annotation.Nonnull;
38+
import javax.annotation.Nullable;
3839

3940
import com.fasterxml.jackson.core.JsonProcessingException;
4041
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -60,32 +61,30 @@ final class FlightSqlClient implements AutoCloseable {
6061
private final ObjectMapper objectMapper = new ObjectMapper();
6162

6263
FlightSqlClient(@Nonnull final ClientConfig config) {
64+
this(config, null);
65+
}
66+
67+
/**
68+
* Constructor for testing purposes.
69+
*
70+
* @param config the client configuration
71+
* @param client the flight client, if null a new client will be created
72+
*/
73+
FlightSqlClient(@Nonnull final ClientConfig config, @Nullable final FlightClient client) {
6374
Arguments.checkNotNull(config, "config");
6475

6576
MetadataAdapter metadata = new MetadataAdapter(new Metadata());
6677
if (config.getToken() != null && config.getToken().length > 0) {
6778
metadata.insert("Authorization", "Bearer " + new String(config.getToken()));
6879
}
69-
70-
this.headers = new HeaderCallOption(metadata);
71-
72-
Location location;
73-
try {
74-
URI uri = new URI(config.getHost());
75-
if ("https".equals(uri.getScheme())) {
76-
location = Location.forGrpcTls(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 443);
77-
} else {
78-
location = Location.forGrpcInsecure(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 80);
80+
if (config.getHeaders() != null) {
81+
for (Map.Entry<String, String> entry : config.getHeaders().entrySet()) {
82+
metadata.insert(entry.getKey(), entry.getValue());
7983
}
80-
} catch (URISyntaxException e) {
81-
throw new RuntimeException(e);
8284
}
8385

84-
client = FlightClient.builder()
85-
.location(location)
86-
.allocator(new RootAllocator(Long.MAX_VALUE))
87-
.verifyServer(!config.getDisableServerCertificateValidation())
88-
.build();
86+
this.headers = new HeaderCallOption(metadata);
87+
this.client = (client != null) ? client : createFlightClient(config);
8988
}
9089

9190
@Nonnull
@@ -100,7 +99,7 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
10099
put("query_type", queryType.name().toLowerCase());
101100
}};
102101

103-
if (queryParameters.size() > 0) {
102+
if (!queryParameters.isEmpty()) {
104103
ticketData.put("params", queryParameters);
105104
}
106105

@@ -124,6 +123,29 @@ public void close() throws Exception {
124123
client.close();
125124
}
126125

126+
@Nonnull
127+
private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
128+
final FlightClient client;
129+
Location location;
130+
try {
131+
URI uri = new URI(config.getHost());
132+
if ("https".equals(uri.getScheme())) {
133+
location = Location.forGrpcTls(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 443);
134+
} else {
135+
location = Location.forGrpcInsecure(uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 80);
136+
}
137+
} catch (URISyntaxException e) {
138+
throw new RuntimeException(e);
139+
}
140+
141+
client = FlightClient.builder()
142+
.location(location)
143+
.allocator(new RootAllocator(Long.MAX_VALUE))
144+
.verifyServer(!config.getDisableServerCertificateValidation())
145+
.build();
146+
return client;
147+
}
148+
127149
private static final class FlightSqlIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
128150

129151
private final List<AutoCloseable> autoCloseable = new ArrayList<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3.client.internal;
23+
24+
import java.util.Map;
25+
26+
import io.grpc.internal.GrpcUtil;
27+
import org.apache.arrow.flight.CallHeaders;
28+
import org.apache.arrow.flight.CallInfo;
29+
import org.apache.arrow.flight.CallStatus;
30+
import org.apache.arrow.flight.FlightClient;
31+
import org.apache.arrow.flight.FlightClientMiddleware;
32+
import org.apache.arrow.flight.FlightServer;
33+
import org.apache.arrow.flight.Location;
34+
import org.apache.arrow.flight.NoOpFlightProducer;
35+
import org.apache.arrow.memory.RootAllocator;
36+
import org.assertj.core.api.Assertions;
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
41+
import com.influxdb.v3.client.config.ClientConfig;
42+
import com.influxdb.v3.client.query.QueryType;
43+
44+
public class FlightSqlClientTest {
45+
46+
private static final String LOCALHOST = "localhost";
47+
private final Location grpcLocation = Location.forGrpcInsecure(LOCALHOST, 0);
48+
private final String serverLocation = String.format("http://%s:%d", LOCALHOST, grpcLocation.getUri().getPort());
49+
50+
private final CallHeadersMiddleware callHeadersMiddleware = new CallHeadersMiddleware();
51+
52+
private RootAllocator allocator;
53+
private FlightServer server;
54+
private FlightClient client;
55+
56+
@BeforeEach
57+
void reset() {
58+
callHeadersMiddleware.headers = null;
59+
}
60+
61+
@BeforeEach
62+
void setUp() throws Exception {
63+
allocator = new RootAllocator(Long.MAX_VALUE);
64+
server = FlightServer.builder(allocator, grpcLocation, new NoOpFlightProducer()).build().start();
65+
client = FlightClient.builder(allocator, server.getLocation()).intercept(callHeadersMiddleware).build();
66+
callHeadersMiddleware.headers = null;
67+
}
68+
69+
@AfterEach
70+
void tearDown() throws Exception {
71+
if (client != null) {
72+
client.close();
73+
}
74+
if (server != null) {
75+
server.shutdown();
76+
server.awaitTermination();
77+
}
78+
if (allocator != null) {
79+
allocator.close();
80+
}
81+
}
82+
83+
@Test
84+
public void callHeaders() throws Exception {
85+
ClientConfig clientConfig = new ClientConfig.Builder()
86+
.host(serverLocation)
87+
.token("my-token".toCharArray())
88+
.build();
89+
90+
try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) {
91+
92+
flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of());
93+
94+
final CallHeaders incomingHeaders = callHeadersMiddleware.headers;
95+
96+
Assertions.assertThat(incomingHeaders.keys()).containsOnly(
97+
"authorization",
98+
GrpcUtil.MESSAGE_ACCEPT_ENCODING
99+
);
100+
Assertions.assertThat(incomingHeaders.get("authorization")).isEqualTo("Bearer my-token");
101+
}
102+
}
103+
104+
@Test
105+
public void callHeadersCustomHeader() throws Exception {
106+
ClientConfig clientConfig = new ClientConfig.Builder()
107+
.host(serverLocation)
108+
.token("my-token".toCharArray())
109+
.headers(Map.of("X-Tracing-Id", "123"))
110+
.build();
111+
112+
try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) {
113+
114+
flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of());
115+
116+
final CallHeaders incomingHeaders = callHeadersMiddleware.headers;
117+
118+
Assertions.assertThat(incomingHeaders.keys()).containsOnly(
119+
"authorization",
120+
"x-tracing-id",
121+
GrpcUtil.MESSAGE_ACCEPT_ENCODING
122+
);
123+
Assertions.assertThat(incomingHeaders.get("X-Tracing-Id")).isEqualTo("123");
124+
}
125+
}
126+
127+
static class CallHeadersMiddleware implements FlightClientMiddleware.Factory {
128+
CallHeaders headers;
129+
130+
@Override
131+
public FlightClientMiddleware onCallStarted(final CallInfo info) {
132+
return new FlightClientMiddleware() {
133+
@Override
134+
public void onBeforeSendingHeaders(final CallHeaders outgoingHeaders) {
135+
headers = outgoingHeaders;
136+
}
137+
138+
@Override
139+
public void onHeadersReceived(final CallHeaders incomingHeaders) {
140+
}
141+
142+
@Override
143+
public void onCallCompleted(final CallStatus status) {
144+
}
145+
};
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)