Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking issue with CompletableFuture #54

Merged
merged 8 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Full Documentation: https://mapepire-ibmi.github.io
<dependency>
<groupId>io.github.mapepire-ibmi</groupId>
<artifactId>mapepire-sdk</artifactId>
<version>0.0.3</version>
<version>0.0.4</version> <!-- Use the latest version -->
</dependency>
```

Expand Down Expand Up @@ -58,7 +58,7 @@ public final class App {
job.connect(creds).get();

// Initialize and execute query
Query<Object> query = job.query("SELECT * FROM SAMPLE.DEPARTMENT");
Query query = job.query("SELECT * FROM SAMPLE.DEPARTMENT");
QueryResult<Object> result = query.execute(3).get();

// Convert to JSON string and output
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<suppress files=".*" checks="EqualsAvoidNull" />
<suppress files=".*" checks="NoWhitespaceAfter" />
<suppress files=".*" checks="ImportOrder" />
<suppress files=".*" checks="IllegalCatch" />
<suppress files=".*" checks="HideUtilityClassConstructor" />
<suppress files="Test.java$" checks="LineLength" />
<suppress files="Test.java$" checks="IllegalCatch" />
</suppressions>
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>io.github.mapepire-ibmi</groupId>
<artifactId>mapepire-sdk</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
<packaging>jar</packaging>

<name>Mapepire SDK</name>
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/io/github/mapepire_ibmi/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ public CompletableFuture<SqlJob> addJob(PoolAddOptions options)
}

if (newSqlJob.getStatus() == JobStatus.NotStarted) {
newSqlJob.connect(this.options.getCreds()).get();
return newSqlJob.connect(this.options.getCreds())
.thenApply(v -> {
return newSqlJob;
});
}

return CompletableFuture.completedFuture(newSqlJob);
Expand Down Expand Up @@ -324,7 +327,6 @@ public CompletableFuture<SqlJob> popJob()
/**
* Create a Query object using a job from the pool.
*
* @param <T> The type of data to be returned.
* @param sql The SQL query.
* @return A new Query instance.
* @throws UnknownServerException
Expand All @@ -337,7 +339,7 @@ public CompletableFuture<SqlJob> popJob()
* @throws KeyManagementException
* @throws JsonMappingException
*/
public <T> Query<T> query(String sql)
public Query query(String sql)
throws JsonMappingException, KeyManagementException, JsonProcessingException, NoSuchAlgorithmException,
InterruptedException, ExecutionException, URISyntaxException, SQLException, UnknownServerException {
QueryOptions queryOptions = new QueryOptions();
Expand All @@ -347,7 +349,6 @@ public <T> Query<T> query(String sql)
/**
* Create a Query object using a job from the pool.
*
* @param <T> The type of data to be returned.
* @param sql The SQL query.
* @param opts The options for configuring the query.
* @return A new Query instance.
Expand All @@ -361,7 +362,7 @@ public <T> Query<T> query(String sql)
* @throws KeyManagementException
* @throws JsonMappingException
*/
public <T> Query<T> query(String sql, QueryOptions opts)
public Query query(String sql, QueryOptions opts)
throws JsonMappingException, KeyManagementException, JsonProcessingException, NoSuchAlgorithmException,
InterruptedException, ExecutionException, URISyntaxException, SQLException, UnknownServerException {
SqlJob job = this.getJob();
Expand Down
127 changes: 73 additions & 54 deletions src/main/java/io/github/mapepire_ibmi/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand All @@ -22,11 +23,11 @@
/**
* Represents a SQL query that can be executed and managed within a SQL job.
*/
public class Query<T> {
public class Query {
/**
* A list of all global queries that are currently open.
*/
private static List<Query<?>> globalQueryList = new ArrayList<>();
private static List<Query> globalQueryList = new ArrayList<>();

/**
* The SQL job that this query will be executed in.
Expand Down Expand Up @@ -97,7 +98,7 @@ public Query(SqlJob job, String query, QueryOptions opts) {
* @param id The correlation ID of the query.
* @return The corresponding Query instance or null if not found.
*/
public static Query<?> byId(String id) {
public static Query byId(String id) {
if (id == null || id.equals("")) {
return null;
} else {
Expand Down Expand Up @@ -140,7 +141,7 @@ public static List<String> getOpenIds(SqlJob forJob) {
* @throws ExecutionException
* @throws InterruptedException
*/
public void cleanup() throws InterruptedException, ExecutionException {
public CompletableFuture<Void> cleanup() throws InterruptedException, ExecutionException {
List<CompletableFuture<Void>> futures = globalQueryList.stream()
.filter(query -> query.getState() == QueryState.RUN_DONE || query.getState() == QueryState.ERROR)
.map(query -> CompletableFuture.runAsync(() -> {
Expand All @@ -152,11 +153,12 @@ public void cleanup() throws InterruptedException, ExecutionException {
}))
.collect(Collectors.toList());

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

globalQueryList = globalQueryList.stream()
.filter(q -> q.getState() != QueryState.RUN_DONE)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
globalQueryList = globalQueryList.stream()
.filter(q -> q.getState() != QueryState.RUN_DONE)
.collect(Collectors.toList());
});
}

/**
Expand Down Expand Up @@ -224,37 +226,45 @@ public <T> CompletableFuture<QueryResult<T>> execute(int rowsToFetch) throws Cli

this.rowsToFetch = rowsToFetch;

String result = job.send(objectMapper.writeValueAsString(executeRequest)).get();
QueryResult<T> queryResult = objectMapper.readValue(result, QueryResult.class);

this.state = queryResult.getIsDone() ? QueryState.RUN_DONE
: QueryState.RUN_MORE_DATA_AVAILABLE;

if (!queryResult.getSuccess() && !this.isCLCommand) {
this.state = QueryState.ERROR;

List<String> errorList = new ArrayList<>();
String error = queryResult.getError();
if (error != null) {
errorList.add(error);
}
String sqlState = queryResult.getSqlState();
if (sqlState != null) {
errorList.add(sqlState);
}
String sqlRc = String.valueOf(queryResult.getSqlRc());
if (sqlRc != null) {
errorList.add(sqlRc);
}
if (errorList.isEmpty()) {
errorList.add("Failed to run query");
}
return job.send(objectMapper.writeValueAsString(executeRequest))
.thenApply(result -> {
QueryResult<T> queryResult;
try {
queryResult = objectMapper.readValue(result, QueryResult.class);
} catch (Exception e) {
throw new CompletionException(e);
}

throw new SQLException(String.join(", ", errorList), queryResult.getSqlState());
}
this.state = queryResult.getIsDone() ? QueryState.RUN_DONE
: QueryState.RUN_MORE_DATA_AVAILABLE;

if (!queryResult.getSuccess() && !this.isCLCommand) {
this.state = QueryState.ERROR;

List<String> errorList = new ArrayList<>();
String error = queryResult.getError();
if (error != null) {
errorList.add(error);
}
String sqlState = queryResult.getSqlState();
if (sqlState != null) {
errorList.add(sqlState);
}
String sqlRc = String.valueOf(queryResult.getSqlRc());
if (sqlRc != null) {
errorList.add(sqlRc);
}
if (errorList.isEmpty()) {
errorList.add("Failed to run query");
}

throw new CompletionException(
new SQLException(String.join(", ", errorList), queryResult.getSqlState()));
}

this.correlationId = queryResult.getId();
return CompletableFuture.completedFuture(queryResult);
this.correlationId = queryResult.getId();
return queryResult;
});
}

/**
Expand All @@ -269,7 +279,7 @@ public <T> CompletableFuture<QueryResult<T>> execute(int rowsToFetch) throws Cli
* @throws JsonMappingException
* @throws ClientException
*/
public CompletableFuture<QueryResult<T>> fetchMore() throws JsonMappingException, JsonProcessingException,
public <T> CompletableFuture<QueryResult<T>> fetchMore() throws JsonMappingException, JsonProcessingException,
UnknownServerException, InterruptedException, ExecutionException, SQLException, ClientException {
return this.fetchMore(this.rowsToFetch);
}
Expand All @@ -287,7 +297,8 @@ public CompletableFuture<QueryResult<T>> fetchMore() throws JsonMappingException
* @throws SQLException
* @throws UnknownServerException
*/
public CompletableFuture<QueryResult<T>> fetchMore(int rowsToFetch) throws ClientException, JsonMappingException,
public <T> CompletableFuture<QueryResult<T>> fetchMore(int rowsToFetch)
throws ClientException, JsonMappingException,
JsonProcessingException, InterruptedException, ExecutionException, SQLException, UnknownServerException {
if (rowsToFetch <= 0) {
throw new ClientException("Rows to fetch must be greater than 0");
Expand All @@ -311,24 +322,32 @@ public CompletableFuture<QueryResult<T>> fetchMore(int rowsToFetch) throws Clien

this.rowsToFetch = rowsToFetch;

String result = job.send(objectMapper.writeValueAsString(fetchMoreRequest)).get();
QueryResult<T> queryResult = objectMapper.readValue(result, QueryResult.class);
return job.send(objectMapper.writeValueAsString(fetchMoreRequest))
.thenApply(result -> {
QueryResult<T> queryResult;
try {
queryResult = objectMapper.readValue(result, QueryResult.class);
} catch (Exception e) {
throw new CompletionException(e);
}

this.state = queryResult.getIsDone() ? QueryState.RUN_DONE
: QueryState.RUN_MORE_DATA_AVAILABLE;
this.state = queryResult.getIsDone() ? QueryState.RUN_DONE
: QueryState.RUN_MORE_DATA_AVAILABLE;

if (!queryResult.getSuccess()) {
this.state = QueryState.ERROR;
if (!queryResult.getSuccess()) {
this.state = QueryState.ERROR;

String error = queryResult.getError();
if (error != null) {
throw new SQLException(error.toString(), queryResult.getSqlState());
} else {
throw new UnknownServerException("Failed to fetch more");
}
}
String error = queryResult.getError();
if (error != null) {
throw new CompletionException(
new SQLException(error.toString(), queryResult.getSqlState()));
} else {
throw new CompletionException(new UnknownServerException("Failed to fetch more"));
}
}

return CompletableFuture.completedFuture(queryResult);
return queryResult;
});
}

/**
Expand Down
Loading
Loading