Skip to content

Commit

Permalink
Merge pull request #429 from hampelratte/parameterbinding
Browse files Browse the repository at this point in the history
Parameter binding in InfluxQL (#274)
  • Loading branch information
majst01 authored Mar 21, 2018
2 parents 90f02ea + 2fc928e commit aca3701
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 2.10 [unreleased]

### Features

- Support for parameter binding in queries ("prepared statements") [PR #429](https://github.com/influxdata/influxdb-java/pull/429)

## 2.9 [2018-02-27]

### Features
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ this.influxDB.query(new Query("SELECT idle FROM cpu", dbName), queryResult -> {
});
```

#### Query using parameter binding ("prepared statements", version 2.10+ required)

If your Query is based on user input, it is good practice to use parameter binding to avoid [injection attacks](https://en.wikipedia.org/wiki/SQL_injection).
You can create queries with parameter binding with the help of the QueryBuilder:

```java
Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE idle > $idle AND system > $system")
.forDatabase(dbName)
.bind("idle", 90)
.bind("system", 5)
.create();
QueryResult results = influxDB.query(query);
```

The values of the bind() calls are bound to the placeholders in the query ($idle, $system).

#### Batch flush interval jittering (version 2.9+ required)

When using large number of influxdb-java clients against a single server it may happen that all the clients
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/org/influxdb/dto/BoundParameterQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.influxdb.dto;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.influxdb.InfluxDBIOException;

import okio.Buffer;

public final class BoundParameterQuery extends Query {

private final Map<String, Object> params = new HashMap<>();

private BoundParameterQuery(final String command, final String database) {
super(command, database, true);
}

public String getParameterJsonWithUrlEncoded() {
try {
String jsonParameterObject = createJsonObject(params);
String urlEncodedJsonParameterObject = encode(jsonParameterObject);
return urlEncodedJsonParameterObject;
} catch (IOException e) {
throw new InfluxDBIOException(e);
}
}

private String createJsonObject(final Map<String, Object> parameterMap) throws IOException {
Buffer b = new Buffer();
JsonWriter writer = JsonWriter.of(b);
writer.beginObject();
for (Entry<String, Object> pair : parameterMap.entrySet()) {
String name = pair.getKey();
Object value = pair.getValue();
if (value instanceof Number) {
Number number = (Number) value;
writer.name(name).value(number);
} else if (value instanceof String) {
writer.name(name).value((String) value);
} else if (value instanceof Boolean) {
writer.name(name).value((Boolean) value);
} else {
writer.name(name).value(String.valueOf(value));
}
}
writer.endObject();
return b.readString(Charset.forName("utf-8"));
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + params.hashCode();
return result;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
BoundParameterQuery other = (BoundParameterQuery) obj;
if (!params.equals(other.params)) {
return false;
}
return true;
}

public static class QueryBuilder {
private BoundParameterQuery query;
private String influxQL;

public static QueryBuilder newQuery(final String influxQL) {
QueryBuilder instance = new QueryBuilder();
instance.influxQL = influxQL;
return instance;
}

public QueryBuilder forDatabase(final String database) {
query = new BoundParameterQuery(influxQL, database);
return this;
}

public QueryBuilder bind(final String placeholder, final Object value) {
query.params.put(placeholder, value);
return this;
}

public BoundParameterQuery create() {
return query;
}
}
}
43 changes: 34 additions & 9 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBIOException;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
Expand Down Expand Up @@ -454,8 +455,16 @@ public void query(final Query query, final int chunkSize, final Consumer<QueryRe
throw new UnsupportedOperationException("chunking not supported");
}

Call<ResponseBody> call = this.influxDBService.query(this.username, this.password,
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
Call<ResponseBody> call = null;
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.query(this.username, this.password,
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize,
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username, this.password,
query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
}

call.enqueue(new Callback<ResponseBody>() {
@Override
Expand Down Expand Up @@ -496,8 +505,17 @@ public void onFailure(final Call<ResponseBody> call, final Throwable t) {
*/
@Override
public QueryResult query(final Query query, final TimeUnit timeUnit) {
return execute(this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded()));
Call<QueryResult> call = null;
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(),
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
}
return execute(call);
}

/**
Expand Down Expand Up @@ -560,12 +578,19 @@ public boolean databaseExists(final String name) {
*/
private Call<QueryResult> callQuery(final Query query) {
Call<QueryResult> call;
if (query.requiresPost()) {
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
if (query instanceof BoundParameterQuery) {
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded(),
boundParameterQuery.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
if (query.requiresPost()) {
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
}
}
return call;
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface InfluxDBService {
public static final String Q = "q";
public static final String DB = "db";
public static final String RP = "rp";
public static final String PARAMS = "params";
public static final String PRECISION = "precision";
public static final String CONSISTENCY = "consistency";
public static final String EPOCH = "epoch";
Expand Down Expand Up @@ -47,6 +48,11 @@ public Call<ResponseBody> writePoints(@Query(U) String username,
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query);

@POST("/query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(EPOCH) String epoch, @Query(value = Q, encoded = true) String query,
@Query(value = PARAMS, encoded = true) String params);

@GET("/query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query);
Expand All @@ -55,6 +61,10 @@ public Call<QueryResult> query(@Query(U) String username, @Query(P) String passw
public Call<QueryResult> postQuery(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query);

@POST("/query")
public Call<QueryResult> postQuery(@Query(U) String username, @Query(P) String password, @Query(DB) String db,
@Query(value = Q, encoded = true) String query, @Query(value = PARAMS, encoded = true) String params);

@GET("/query")
public Call<QueryResult> query(@Query(U) String username, @Query(P) String password,
@Query(value = Q, encoded = true) String query);
Expand All @@ -68,4 +78,10 @@ public Call<QueryResult> postQuery(@Query(U) String username,
public Call<ResponseBody> query(@Query(U) String username,
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("/query?chunked=true")
public Call<ResponseBody> query(@Query(U) String username,
@Query(P) String password, @Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
}
45 changes: 45 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import org.influxdb.impl.InfluxDBImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -89,6 +91,49 @@ public void testQuery() {
this.influxDB.query(new Query("DROP DATABASE mydb2", "mydb"));
}

@Test
public void testBoundParameterQuery() throws InterruptedException {
// set up
Point point = Point
.measurement("cpu")
.tag("atag", "test")
.addField("idle", 90L)
.addField("usertime", 9L)
.addField("system", 1L)
.build();
this.influxDB.setDatabase(UDP_DATABASE);
this.influxDB.write(point);

// test
Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag")
.forDatabase(UDP_DATABASE)
.bind("atag", "test")
.create();
QueryResult result = this.influxDB.query(query);
Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1);
Series series = result.getResults().get(0).getSeries().get(0);
Assertions.assertTrue(series.getValues().size() == 1);

result = this.influxDB.query(query, TimeUnit.SECONDS);
Assertions.assertTrue(result.getResults().get(0).getSeries().size() == 1);
series = result.getResults().get(0).getSeries().get(0);
Assertions.assertTrue(series.getValues().size() == 1);

Object waitForTestresults = new Object();
Consumer<QueryResult> check = (queryResult) -> {
Assertions.assertTrue(queryResult.getResults().get(0).getSeries().size() == 1);
Series s = queryResult.getResults().get(0).getSeries().get(0);
Assertions.assertTrue(s.getValues().size() == 1);
synchronized (waitForTestresults) {
waitForTestresults.notifyAll();
}
};
this.influxDB.query(query, 10, check);
synchronized (waitForTestresults) {
waitForTestresults.wait(2000);
}
}

/**
* Tests for callback query.
*/
Expand Down
Loading

0 comments on commit aca3701

Please sign in to comment.