Skip to content

Commit

Permalink
Merge pull request #3 from pranavrd/postgres-example
Browse files Browse the repository at this point in the history
Postgres example added
  • Loading branch information
kailash authored Mar 1, 2022
2 parents befb6dd + 9e3aa74 commit f2578bd
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package iudx.rs.proxy.database.example.postgres;

public class Constants {
// configs
public static final String DATABASE_IP = "databaseIp";
public static final String DATABASE_PORT = "databasePort";
public static final String DATABASE_NAME = "databaseName";
public static final String DATABASE_USERNAME = "databaseUserName";
public static final String DATABASE_PASSWORD = "databasePassword";
public static final String POOL_SIZE = "poolSize";

// json
public static final String ID = "id";
public static final String TIME_REL = "timerel";
public static final String TIME = "time";
public static final String END_TIME = "endTime";
public static final String ATTRS = "attrs";
public static final String BEFORE = "before";
public static final String AFTER = "after";

// SQL
public static String PSQL_TABLE_EXISTS_QUERY =
"SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='public' AND tablename='$1');";
public static String PSQL_SELECT_QUERY = "SELECT $1 FROM $$ WHERE time BETWEEN '$2' and '$3'";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package iudx.rs.proxy.database.example.postgres;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.serviceproxy.ServiceException;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Row;
import iudx.rs.proxy.database.DatabaseService;
import org.apache.http.HttpStatus;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static iudx.rs.proxy.database.example.postgres.Constants.*;

public class PostgresServiceImpl implements DatabaseService {

private final PgPool pgClient;
private PgConnectOptions connectOptions;
private PoolOptions poolOptions;

private String databaseIP;
private int databasePort;
private String databaseName;
private String databaseUserName;
private String databasePassword;
private int poolSize;

private boolean exists;

public PostgresServiceImpl(Vertx vertx, JsonObject config) {

databaseIP = config.getString(DATABASE_IP);
databasePort = config.getInteger(DATABASE_PORT);
databaseName = config.getString(DATABASE_NAME);
databaseUserName = config.getString(DATABASE_USERNAME);
databasePassword = config.getString(DATABASE_PASSWORD);
poolSize = config.getInteger(POOL_SIZE);

this.connectOptions =
new PgConnectOptions()
.setPort(databasePort)
.setHost(databaseIP)
.setDatabase(databaseName)
.setUser(databaseUserName)
.setPassword(databasePassword);

this.poolOptions = new PoolOptions().setMaxSize(poolSize);
this.pgClient = PgPool.pool(vertx, connectOptions, poolOptions);
}

@Override
public DatabaseService searchQuery(JsonObject request, Handler<AsyncResult<JsonObject>> handler)
throws ServiceException {
String tableID = request.getString(ID);

if (!tableExists(tableID)) {
throw new ServiceException(HttpStatus.SC_NOT_FOUND, "message for failure");
}

String query = queryBuilder(request, false);

Collector<Row, ?, List<JsonObject>> rowCollector =
Collectors.mapping(row -> row.toJson(), Collectors.toList());

pgClient
.withConnection(
connection ->
connection.query(query).collecting(rowCollector).execute().map(row -> row.value()))
.onSuccess(
successHandler -> {
long totalHits = successHandler.size();
JsonArray response = new JsonArray(successHandler);
handler.handle(
Future.succeededFuture(
new JsonObject().put("totalHits", totalHits).put("result", response)));
})
.onFailure(
failureHandler -> {
throw new ServiceException(HttpStatus.SC_NOT_FOUND, "message for failure");
});
return this;
}

@Override
public DatabaseService countQuery(JsonObject request, Handler<AsyncResult<JsonObject>> handler)
throws ServiceException {
String tableID = request.getString(ID);

if (!tableExists(tableID)) {
throw new ServiceException(HttpStatus.SC_NOT_FOUND, "message for failure");
}

String query = queryBuilder(request, true);

pgClient
.withConnection(
sqlConnection ->
sqlConnection
.query(query)
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
.onSuccess(
count -> {
handler.handle(Future.succeededFuture(new JsonObject().put("totalHits", count)));
})
.onFailure(
failureHandler -> {
throw new ServiceException(HttpStatus.SC_NOT_FOUND, "message for failure");
});
return this;
}

private String queryBuilder(JsonObject request, boolean isCount) {
StringBuilder query;
String selection;
String tableID = request.getString(ID);
String timerel = request.getString(TIME_REL);
String[] attrs = (String[]) request.getValue(ATTRS);
LocalDateTime time, endTime;
time = LocalDateTime.parse(request.getString(TIME));
endTime = LocalDateTime.parse(request.getString(END_TIME));

if (timerel.equalsIgnoreCase(BEFORE)) {
endTime = time;
time = time.minusDays(10);
} else if (timerel.equalsIgnoreCase(AFTER)) {
endTime = time.plusDays(10);
}

if (attrs == null || attrs.length == 0) {
if (isCount) {
selection = PSQL_SELECT_QUERY.replace("$1", "count(*)");
} else {
selection = PSQL_SELECT_QUERY.replace("$1", "*");
}
} else {
selection = PSQL_SELECT_QUERY.replace("$1", String.join(",", attrs));
}

query =
new StringBuilder(
selection
.replace("$$", tableID)
.replace("$2", time.toString())
.replace("$3", endTime.toString()));

return query.toString();
}

private boolean tableExists(String tableID) {
StringBuilder query = new StringBuilder(PSQL_TABLE_EXISTS_QUERY.replace("$1", tableID));

exists = false;
pgClient
.query(query.toString())
.execute(
existsHandler -> {
if (existsHandler.succeeded()) {
RowSet<Row> rowSet = existsHandler.result();
rowSet.forEach(
row -> {
if (row.getBoolean("exists")) {
exists = true;
}
});
} else {
throw new ServiceException(HttpStatus.SC_NOT_FOUND, "message for failure");
}
});

return exists;
}
}

0 comments on commit f2578bd

Please sign in to comment.