Skip to content
This repository has been archived by the owner on Mar 31, 2021. It is now read-only.

Cursor integration #76

Merged
merged 15 commits into from
May 7, 2020
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ To setup a connection, the driver requires a JDBC connection URL. The connection
| ------------- |-------------| -----|---------|
| user | Connection username. mandatory if `auth` property selects a authentication scheme that mandates a username value | any string | `null` |
| password | Connection password. mandatory if `auth` property selects a authentication scheme that mandates a password value | any string | `null` |
| fetchSize | Cursor page size | positive integer value. Max value is limited by `index.max_result_window` Elasticsearch setting | `0` (for non-paginated response) |
| logOutput | location where driver logs should be emitted | a valid file path | `null` (logs are disabled) |
| logLevel | severity level for which driver logs should be emitted | in order from highest(least logging) to lowest(most logging): OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL | OFF (logs are disabled) |
| auth | authentication mechanism to use | `NONE` (no auth), `BASIC` (HTTP Basic), `AWS_SIGV4` (AWS SIGV4) | `basic` if username and/or password is specified, `NONE` otherwise |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ConnectionImpl implements ElasticsearchConnection, JdbcWrapper, Log
private String url;
private String user;
private Logger log;
private int fetchSize;
private boolean open = false;
private Transport transport;
private Protocol protocol;
Expand All @@ -74,6 +75,7 @@ public ConnectionImpl(ConnectionConfig connectionConfig, TransportFactory transp
this.log = log;
this.url = connectionConfig.getUrl();
this.user = connectionConfig.getUser();
this.fetchSize = connectionConfig.getFetchSize();

try {
this.transport = transportFactory.getTransport(connectionConfig, log, getUserAgent());
Expand Down Expand Up @@ -101,6 +103,10 @@ public String getUser() {
return user;
}

public int getFetchSize() {
return fetchSize;
}

@Override
public Statement createStatement() throws SQLException {
log.debug(() -> logEntry("createStatement()"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,19 @@ public PreparedStatementImpl(ConnectionImpl connection, String sql, Logger log)
public ResultSet executeQuery() throws SQLException {
log.debug(() -> logEntry("executeQuery()"));
checkOpen();
ResultSet rs = executeQueryX();
ResultSet rs = executeQueryXWithFetchSize(getFetchSize());
log.debug(() -> logExit("executeQuery", rs));
return rs;
}

protected ResultSet executeQueryXWithFetchSize(int fetchSize) throws SQLException {
abbashus marked this conversation as resolved.
Show resolved Hide resolved
checkParamsFilled();
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
jdbcQueryRequest.setParameters(Arrays.asList(parameters));
return executeQueryRequest(jdbcQueryRequest);
}


protected ResultSet executeQueryX() throws SQLException {
checkParamsFilled();
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@
import com.amazon.opendistroforelasticsearch.jdbc.protocol.ColumnDescriptor;
import com.amazon.opendistroforelasticsearch.jdbc.internal.JdbcWrapper;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.QueryResponse;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.InternalServerErrorException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.ResponseException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JdbcCursorQueryRequest;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocol;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocolFactory;
import com.amazon.opendistroforelasticsearch.jdbc.transport.http.HttpTransport;
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverter;
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverters;
import com.amazon.opendistroforelasticsearch.jdbc.types.UnrecognizedElasticsearchTypeException;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
Expand Down Expand Up @@ -71,18 +78,24 @@ public class ResultSetImpl implements ResultSet, JdbcWrapper, LoggingSource {

private StatementImpl statement;
protected Cursor cursor;
private String cursorId;
private boolean open = false;
private boolean wasNull = false;
private boolean afterLast = false;
private boolean beforeFirst = true;
private Logger log;

public ResultSetImpl(StatementImpl statement, QueryResponse queryResponse, Logger log) throws SQLException {
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), log);
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), queryResponse.getCursor(), log);
}

public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
List<List<Object>> dataRows, Logger log) throws SQLException {
this(statement, columnDescriptors, dataRows, null, log);
}

public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
List<List<Object>> dataRows, String cursorId, Logger log) throws SQLException {
this.statement = statement;
this.log = log;

Expand All @@ -93,12 +106,10 @@ public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> c
.map(ColumnMetaData::new)
.collect(Collectors.toList()));

List<Row> rows = dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
List<Row> rows = getRowsFromDataRows(dataRows);

this.cursor = new Cursor(schema, rows);
this.cursorId = cursorId;
this.open = true;

} catch (UnrecognizedElasticsearchTypeException ex) {
Expand All @@ -112,15 +123,55 @@ public boolean next() throws SQLException {
log.debug(() -> logEntry("next()"));
checkOpen();
boolean next = cursor.next();

if (!next && this.cursorId != null) {
abbashus marked this conversation as resolved.
Show resolved Hide resolved
// TODO: add debug logs around here
buildNextPageFromCursorId();
next = cursor.next();
abbashus marked this conversation as resolved.
Show resolved Hide resolved
}

if (next) {
beforeFirst = false;
} else {
afterLast = true;
}
log.debug(() -> logExit("next", next));
boolean finalNext = next;
log.debug(() -> logExit("next", finalNext));
return next;
}


protected void buildNextPageFromCursorId() throws SQLException {
try {
JdbcCursorQueryRequest jdbcCursorQueryRequest = new JdbcCursorQueryRequest(this.cursorId);
JsonCursorHttpProtocolFactory protocolFactory = JsonCursorHttpProtocolFactory.INSTANCE;
ConnectionImpl connection = (ConnectionImpl) statement.getConnection();

JsonCursorHttpProtocol protocol = protocolFactory.getProtocol(null, (HttpTransport) connection.getTransport());
QueryResponse queryResponse = protocol.execute(jdbcCursorQueryRequest);

if (queryResponse.getError() != null) {
throw new InternalServerErrorException(
queryResponse.getError().getReason(),
queryResponse.getError().getType(),
queryResponse.getError().getDetails());
}

cursor = new Cursor(cursor.getSchema(), getRowsFromDataRows(queryResponse.getDatarows()));
cursorId = queryResponse.getCursor();

} catch (ResponseException | IOException ex) {
logAndThrowSQLException(log, new SQLException("Error executing cursor query", ex));
}
}

private List<Row> getRowsFromDataRows(List<List<Object>> dataRows) {
return dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
}

@Override
public void close() throws SQLException {
log.debug(() -> logEntry("close()"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.InternalServerErrorException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.QueryResponse;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.ResponseException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonQueryResponse;

import java.io.IOException;
import java.sql.Connection;
Expand All @@ -38,20 +39,22 @@ public class StatementImpl implements Statement, JdbcWrapper, LoggingSource {

protected ConnectionImpl connection;
protected boolean open = false;
protected int fetchSize;
protected ResultSetImpl resultSet;
protected Logger log;
private boolean closeOnCompletion;

public StatementImpl(ConnectionImpl connection, Logger log) {
this.connection = connection;
this.open = true;
this.fetchSize = connection.getFetchSize();
this.log = log;
}

@Override
public ResultSet executeQuery(String sql) throws SQLException {
log.debug(()-> logEntry("executeQuery (%s)", sql));
ResultSet rs = executeQueryX(sql);
ResultSet rs = executeQueryXWithFetchSize(sql, fetchSize);
log.debug(()-> logExit("executeQuery", rs));
return rs;
}
Expand All @@ -61,6 +64,11 @@ protected ResultSet executeQueryX(String sql) throws SQLException {
return executeQueryRequest(jdbcQueryRequest);
}

protected ResultSet executeQueryXWithFetchSize(String sql, int fetchSize) throws SQLException {
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
return executeQueryRequest(jdbcQueryRequest);
}

protected ResultSet executeQueryRequest(JdbcQueryRequest jdbcQueryRequest) throws SQLException {

// JDBC Spec: A ResultSet object is automatically closed when the Statement
Expand Down Expand Up @@ -205,12 +213,12 @@ public int getFetchDirection() throws SQLException {

@Override
public void setFetchSize(int rows) throws SQLException {

fetchSize = rows;
}

@Override
public int getFetchSize() throws SQLException {
return 0;
return fetchSize;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ConnectionConfig {
private String url;
private String host;
private int port;
private int fetchSize;
private String path;
private boolean useSSL;
private int loginTimeout;
Expand All @@ -60,6 +61,7 @@ private ConnectionConfig(Builder builder) {
this.url = builder.getUrl();
this.host = builder.getHostProperty().getValue();
this.port = builder.getPortProperty().getValue();
this.fetchSize = builder.getFetchSizeProperty().getValue();
this.path = builder.getPathProperty().getValue();
this.useSSL = builder.getUseSSLProperty().getValue();

Expand Down Expand Up @@ -106,6 +108,10 @@ public int getPort() {
return port;
}

public int getFetchSize() {
return fetchSize;
}

public String getPath() {
return path;
}
Expand Down Expand Up @@ -192,6 +198,7 @@ public String toString() {
"url='" + url + '\'' +
", host='" + host + '\'' +
", port=" + port +
", fetchSize=" + fetchSize +
", path='" + path + '\'' +
", useSSL=" + useSSL +
", loginTimeout=" + loginTimeout +
Expand Down Expand Up @@ -223,6 +230,7 @@ public static class Builder {

private HostConnectionProperty hostProperty = new HostConnectionProperty();
private PortConnectionProperty portProperty = new PortConnectionProperty();
private FetchSizeProperty fetchSizeProperty = new FetchSizeProperty();
private LoginTimeoutConnectionProperty loginTimeoutProperty = new LoginTimeoutConnectionProperty();
private UseSSLConnectionProperty useSSLProperty = new UseSSLConnectionProperty();
private PathConnectionProperty pathProperty = new PathConnectionProperty();
Expand Down Expand Up @@ -261,6 +269,7 @@ public static class Builder {
ConnectionProperty[] connectionProperties = new ConnectionProperty[]{
hostProperty,
portProperty,
fetchSizeProperty,
loginTimeoutProperty,
useSSLProperty,
pathProperty,
Expand Down Expand Up @@ -302,6 +311,10 @@ public PortConnectionProperty getPortProperty() {
return portProperty;
}

public FetchSizeProperty getFetchSizeProperty() {
return fetchSizeProperty;
}

public LoginTimeoutConnectionProperty getLoginTimeoutProperty() {
return loginTimeoutProperty;
}
Expand Down Expand Up @@ -519,6 +532,11 @@ private void validateConfig() throws ConnectionPropertyException {
// change the default port to use to 443
portProperty.setRawValue(443);
}

if (fetchSizeProperty.getValue() < 0) {
throw new ConnectionPropertyException(fetchSizeProperty.getKey(),
"Cursor fetch size value should be greater or equal to zero");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.amazon.opendistroforelasticsearch.jdbc.config;

public class FetchSizeProperty extends IntConnectionProperty{

public static final String KEY = "fetchSize";

public FetchSizeProperty() {
super(KEY);
}

public Integer getDefault() {
return 0;
abbashus marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@
public class JdbcQueryRequest implements QueryRequest {

String statement;

int fetchSize;
abbashus marked this conversation as resolved.
Show resolved Hide resolved
List<JdbcQueryParam> parameters;

public JdbcQueryRequest(String sql) {
this.statement = sql;
}

public JdbcQueryRequest(String sql, int fetchSize) {
this.statement = sql;
this.fetchSize = fetchSize;
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -59,13 +65,14 @@ public void setParameters(List<JdbcQueryParam> parameters) {

@Override
public int getFetchSize() {
return 0;
return fetchSize;
}

@Override
public String toString() {
return "JdbcQueryRequest{" +
"statement='" + statement + '\'' +
", fetchSize='" + fetchSize + '\'' +
", parameters=" + parameters +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ public interface QueryResponse {

int getStatus();

String getCursor();

RequestError getError();
}
Loading