Skip to content
This repository has been archived by the owner on Mar 31, 2021. It is now read-only.

Cursor integration #76

Merged
merged 15 commits into from
May 7, 2020
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ To setup a connection, the driver requires a JDBC connection URL. The connection
| ------------- |-------------| -----|---------|
| user | Connection username. mandatory if `auth` property selects a authentication scheme that mandates a username value | any string | `null` |
| password | Connection password. mandatory if `auth` property selects a authentication scheme that mandates a password value | any string | `null` |
| fetchSize | Cursor page size | positive integer value. Max value is limited by `index.max_result_window` Elasticsearch setting | `0` (for non-paginated response) |
| logOutput | location where driver logs should be emitted | a valid file path | `null` (logs are disabled) |
| logLevel | severity level for which driver logs should be emitted | in order from highest(least logging) to lowest(most logging): OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL | OFF (logs are disabled) |
| auth | authentication mechanism to use | `NONE` (no auth), `BASIC` (HTTP Basic), `AWS_SIGV4` (AWS SIGV4) | `basic` if username and/or password is specified, `NONE` otherwise |
Expand Down
22 changes: 22 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,25 @@ signing {
sign publishing.publications.shadow
}

jacoco {
toolVersion = "0.8.3"
}

jacocoTestReport {
reports {
html.enabled true
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 0.4
}
}
}
}

check.dependsOn jacocoTestCoverageVerification
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ConnectionImpl implements ElasticsearchConnection, JdbcWrapper, Log
private String url;
private String user;
private Logger log;
private int fetchSize;
private boolean open = false;
private Transport transport;
private Protocol protocol;
Expand All @@ -74,6 +75,7 @@ public ConnectionImpl(ConnectionConfig connectionConfig, TransportFactory transp
this.log = log;
this.url = connectionConfig.getUrl();
this.user = connectionConfig.getUser();
this.fetchSize = connectionConfig.getFetchSize();

try {
this.transport = transportFactory.getTransport(connectionConfig, log, getUserAgent());
Expand Down Expand Up @@ -101,6 +103,10 @@ public String getUser() {
return user;
}

public int getFetchSize() {
return fetchSize;
}

@Override
public Statement createStatement() throws SQLException {
log.debug(() -> logEntry("createStatement()"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ public PreparedStatementImpl(ConnectionImpl connection, String sql, Logger log)
public ResultSet executeQuery() throws SQLException {
log.debug(() -> logEntry("executeQuery()"));
checkOpen();
ResultSet rs = executeQueryX();
ResultSet rs = executeQueryX(getFetchSize());
log.debug(() -> logExit("executeQuery", rs));
return rs;
}

protected ResultSet executeQueryX() throws SQLException {
protected ResultSet executeQueryX(int fetchSize) throws SQLException {
checkParamsFilled();
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
jdbcQueryRequest.setParameters(Arrays.asList(parameters));
return executeQueryRequest(jdbcQueryRequest);
}
Expand Down Expand Up @@ -293,7 +293,7 @@ private int javaToSqlType(Object x) throws SQLException {
public boolean execute() throws SQLException {
log.debug(() -> logEntry("execute()"));
checkOpen();
executeQueryX();
executeQueryX(getFetchSize());
log.debug(() -> logExit("execute", true));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@
import com.amazon.opendistroforelasticsearch.jdbc.protocol.ColumnDescriptor;
import com.amazon.opendistroforelasticsearch.jdbc.internal.JdbcWrapper;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.QueryResponse;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.InternalServerErrorException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.ResponseException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JdbcCursorQueryRequest;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocol;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocolFactory;
import com.amazon.opendistroforelasticsearch.jdbc.transport.http.HttpTransport;
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverter;
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverters;
import com.amazon.opendistroforelasticsearch.jdbc.types.UnrecognizedElasticsearchTypeException;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
Expand Down Expand Up @@ -71,18 +78,24 @@ public class ResultSetImpl implements ResultSet, JdbcWrapper, LoggingSource {

private StatementImpl statement;
protected Cursor cursor;
private String cursorId;
private boolean open = false;
private boolean wasNull = false;
private boolean afterLast = false;
private boolean beforeFirst = true;
private Logger log;

public ResultSetImpl(StatementImpl statement, QueryResponse queryResponse, Logger log) throws SQLException {
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), log);
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), queryResponse.getCursor(), log);
}

public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
List<List<Object>> dataRows, Logger log) throws SQLException {
this(statement, columnDescriptors, dataRows, null, log);
}

public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
List<List<Object>> dataRows, String cursorId, Logger log) throws SQLException {
this.statement = statement;
this.log = log;

Expand All @@ -93,12 +106,10 @@ public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> c
.map(ColumnMetaData::new)
.collect(Collectors.toList()));

List<Row> rows = dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
List<Row> rows = getRowsFromDataRows(dataRows);

this.cursor = new Cursor(schema, rows);
this.cursorId = cursorId;
this.open = true;

} catch (UnrecognizedElasticsearchTypeException ex) {
Expand All @@ -112,15 +123,63 @@ public boolean next() throws SQLException {
log.debug(() -> logEntry("next()"));
checkOpen();
boolean next = cursor.next();

if (!next && this.cursorId != null) {
abbashus marked this conversation as resolved.
Show resolved Hide resolved
log.debug(() -> logEntry("buildNextPageFromCursorId()"));
buildNextPageFromCursorId();
log.debug(() -> logExit("buildNextPageFromCursorId()"));
next = cursor.next();
abbashus marked this conversation as resolved.
Show resolved Hide resolved
}

if (next) {
beforeFirst = false;
} else {
afterLast = true;
}
log.debug(() -> logExit("next", next));
boolean finalNext = next;
log.debug(() -> logExit("next", finalNext));
return next;
}

/**
* TODO: Refactor as suggested https://github.com/opendistro-for-elasticsearch/sql-jdbc/pull/76#discussion_r421571383
*
* This method has side effects. It creates a new Cursor to hold rows from new pages.
* Ideally fetching next set of rows using cursorId should be delegated to Cursor.
* In addition, the cursor should be final.
*
**/
protected void buildNextPageFromCursorId() throws SQLException {
try {
JdbcCursorQueryRequest jdbcCursorQueryRequest = new JdbcCursorQueryRequest(this.cursorId);
JsonCursorHttpProtocolFactory protocolFactory = JsonCursorHttpProtocolFactory.INSTANCE;
ConnectionImpl connection = (ConnectionImpl) statement.getConnection();

JsonCursorHttpProtocol protocol = protocolFactory.getProtocol(null, (HttpTransport) connection.getTransport());
QueryResponse queryResponse = protocol.execute(jdbcCursorQueryRequest);

if (queryResponse.getError() != null) {
throw new InternalServerErrorException(
queryResponse.getError().getReason(),
queryResponse.getError().getType(),
queryResponse.getError().getDetails());
}

cursor = new Cursor(cursor.getSchema(), getRowsFromDataRows(queryResponse.getDatarows()));
cursorId = queryResponse.getCursor();

} catch (ResponseException | IOException ex) {
logAndThrowSQLException(log, new SQLException("Error executing cursor query", ex));
}
}

private List<Row> getRowsFromDataRows(List<List<Object>> dataRows) {
return dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
}

@Override
public void close() throws SQLException {
log.debug(() -> logEntry("close()"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,28 @@ public class StatementImpl implements Statement, JdbcWrapper, LoggingSource {

protected ConnectionImpl connection;
protected boolean open = false;
protected int fetchSize;
protected ResultSetImpl resultSet;
protected Logger log;
private boolean closeOnCompletion;

public StatementImpl(ConnectionImpl connection, Logger log) {
this.connection = connection;
this.open = true;
this.fetchSize = connection.getFetchSize();
this.log = log;
}

@Override
public ResultSet executeQuery(String sql) throws SQLException {
log.debug(()-> logEntry("executeQuery (%s)", sql));
ResultSet rs = executeQueryX(sql);
ResultSet rs = executeQueryX(sql, fetchSize);
log.debug(()-> logExit("executeQuery", rs));
return rs;
}

protected ResultSet executeQueryX(String sql) throws SQLException {
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
protected ResultSet executeQueryX(String sql, int fetchSize) throws SQLException {
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
return executeQueryRequest(jdbcQueryRequest);
}

Expand Down Expand Up @@ -167,7 +169,7 @@ public void setCursorName(String name) throws SQLException {
public boolean execute(String sql) throws SQLException {
log.debug(()->logEntry("execute (%s)", sql));
checkOpen();
executeQueryX(sql);
executeQueryX(sql, fetchSize);
log.debug(() -> logExit("execute", true));
return true;
}
Expand Down Expand Up @@ -205,12 +207,12 @@ public int getFetchDirection() throws SQLException {

@Override
public void setFetchSize(int rows) throws SQLException {

fetchSize = rows;
}

@Override
public int getFetchSize() throws SQLException {
return 0;
return fetchSize;
}

@Override
Expand Down Expand Up @@ -275,7 +277,7 @@ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
if (autoGeneratedKeys != Statement.NO_GENERATED_KEYS) {
throw new SQLNonTransientException("Auto generated keys are not supported");
}
executeQueryX(sql);
executeQueryX(sql, fetchSize);
log.debug(() -> logExit("execute", true));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ConnectionConfig {
private String url;
private String host;
private int port;
private int fetchSize;
private String path;
private boolean useSSL;
private int loginTimeout;
Expand All @@ -60,6 +61,7 @@ private ConnectionConfig(Builder builder) {
this.url = builder.getUrl();
this.host = builder.getHostProperty().getValue();
this.port = builder.getPortProperty().getValue();
this.fetchSize = builder.getFetchSizeProperty().getValue();
this.path = builder.getPathProperty().getValue();
this.useSSL = builder.getUseSSLProperty().getValue();

Expand Down Expand Up @@ -106,6 +108,10 @@ public int getPort() {
return port;
}

public int getFetchSize() {
return fetchSize;
}

public String getPath() {
return path;
}
Expand Down Expand Up @@ -192,6 +198,7 @@ public String toString() {
"url='" + url + '\'' +
", host='" + host + '\'' +
", port=" + port +
", fetchSize=" + fetchSize +
", path='" + path + '\'' +
", useSSL=" + useSSL +
", loginTimeout=" + loginTimeout +
Expand Down Expand Up @@ -223,6 +230,7 @@ public static class Builder {

private HostConnectionProperty hostProperty = new HostConnectionProperty();
private PortConnectionProperty portProperty = new PortConnectionProperty();
private FetchSizeProperty fetchSizeProperty = new FetchSizeProperty();
private LoginTimeoutConnectionProperty loginTimeoutProperty = new LoginTimeoutConnectionProperty();
private UseSSLConnectionProperty useSSLProperty = new UseSSLConnectionProperty();
private PathConnectionProperty pathProperty = new PathConnectionProperty();
Expand Down Expand Up @@ -261,6 +269,7 @@ public static class Builder {
ConnectionProperty[] connectionProperties = new ConnectionProperty[]{
hostProperty,
portProperty,
fetchSizeProperty,
loginTimeoutProperty,
useSSLProperty,
pathProperty,
Expand Down Expand Up @@ -302,6 +311,10 @@ public PortConnectionProperty getPortProperty() {
return portProperty;
}

public FetchSizeProperty getFetchSizeProperty() {
return fetchSizeProperty;
}

public LoginTimeoutConnectionProperty getLoginTimeoutProperty() {
return loginTimeoutProperty;
}
Expand Down Expand Up @@ -519,6 +532,11 @@ private void validateConfig() throws ConnectionPropertyException {
// change the default port to use to 443
portProperty.setRawValue(443);
}

if (fetchSizeProperty.getValue() < 0) {
throw new ConnectionPropertyException(fetchSizeProperty.getKey(),
"Cursor fetch size value should be greater or equal to zero");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.amazon.opendistroforelasticsearch.jdbc.config;

public class FetchSizeProperty extends IntConnectionProperty {

public static final String KEY = "fetchSize";

public FetchSizeProperty() {
super(KEY);
}
}
Loading