Skip to content

Commit

Permalink
Merge pull request #76 from ankitmashu/fix/server-down
Browse files Browse the repository at this point in the history
Fix/server down
  • Loading branch information
ananjaykumar2 authored May 13, 2024
2 parents bb18d74 + 854c831 commit 0b4fbf5
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public MessageProcessorImpl(
public Future<JsonObject> processAuditEventMessages(JsonObject message) {
LOGGER.info("message processing starts : ");
JsonObject queries = queryBuilder(message);
LOGGER.debug("message processing {}", queries);
queries.put(DELIVERY_TAG, message.getLong(DELIVERY_TAG));
queries.put(ORIGIN, message.getString(ORIGIN));
Promise<JsonObject> promise = Promise.promise();
Expand All @@ -51,6 +52,7 @@ public Future<JsonObject> processAuditEventMessages(JsonObject message) {
.onComplete(
dbHandler -> {
if (dbHandler.succeeded()) {
LOGGER.info("Inserted successfully for the Origin {}", message.getString(ORIGIN));
promise.complete(dbHandler.result());
} else {
LOGGER.error(dbHandler.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public DmpApdAuditingStrategy(JsonObject config) {
@Override
public String buildPostgresWriteQuery(JsonObject request) {
LOGGER.debug("inside buildPostgresWriteQuery");
LOGGER.debug("request : {}", request.encodePrettily() );
String primaryKey = request.getString(PRIMARY_KEY);
String userId = request.getString(USER_ID);
String api = request.getString(API);
Expand All @@ -47,7 +46,6 @@ public String buildPostgresWriteQuery(JsonObject request) {
@Override
public String buildPostgresDeleteQuery(JsonObject request) {
LOGGER.info("inside buildPostgresDeleteQuery");
LOGGER.debug("request : {}", request.encodePrettily() );
String databaseTableName = config.getString(DMP_APD_PG_TABLE_NAME);
String primaryKey = request.getString(PRIMARY_KEY);
return DELETE_QUERY_FOR_DMP.replace("$0", databaseTableName).replace("$1", primaryKey);
Expand All @@ -56,7 +54,6 @@ public String buildPostgresDeleteQuery(JsonObject request) {
@Override
public String buildImmudbWriteQuery(JsonObject request) {
LOGGER.debug("inside buildImmudbWriteQuery");
LOGGER.debug("request : {}", request.encodePrettily() );
String primaryKey = request.getString(PRIMARY_KEY);
String userId = request.getString(USER_ID);
String api = request.getString(API);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,56 @@ public void start() {
}

private void consume() {
client.start().onSuccess(successHandler -> {
client.basicConsumer(AUDIT_LATEST_QUEUE, options, receiveResultHandler -> {
if (receiveResultHandler.succeeded()) {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
long deliveryTag = message.envelope().getDeliveryTag();
JsonObject request = message.body().toJsonObject().put(DELIVERY_TAG, deliveryTag);
LOGGER.info("message received from {}",request.getString(ORIGIN));
Future<JsonObject> processResult = msgService.processAuditEventMessages(request);
processResult.onComplete(handler -> {
if (handler.succeeded()) {
LOGGER.info("Audit message published in databases.");
client.basicAck(handler.result().getLong(DELIVERY_TAG), true);
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
} else {
LOGGER.error("Error while publishing messages for processing " + handler.cause().getMessage());
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
client
.start()
.onSuccess(
successHandler -> {
client.basicConsumer(
AUDIT_LATEST_QUEUE,
options,
receiveResultHandler -> {
if (receiveResultHandler.succeeded()) {
RabbitMQConsumer mqConsumer = receiveResultHandler.result();
mqConsumer.handler(
message -> {
mqConsumer.pause();
LOGGER.debug("message consumption paused.");
JsonObject request = new JsonObject();
try {
long deliveryTag = message.envelope().getDeliveryTag();
request =
message.body().toJsonObject().put(DELIVERY_TAG, deliveryTag);
LOGGER.info("message received from {}", request.getString(ORIGIN));
Future<JsonObject> processResult =
msgService.processAuditEventMessages(request);
processResult.onComplete(
handler -> {
if (handler.succeeded()) {
LOGGER.info("Audit message published in databases.");
client.basicAck(
handler.result().getLong(DELIVERY_TAG), false);
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
} else {
LOGGER.error(
"Error while publishing messages for processing "
+ handler.cause().getMessage());
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
} catch (Exception e) {
LOGGER.error("Error while decoding the message");
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
}
});
}
});
})
.onFailure(
failureHandler -> {
LOGGER.fatal("Rabbit client startup failed for Latest message Q consumer.");
});
});
}
});
}).onFailure(failureHandler -> {
LOGGER.fatal("Rabbit client startup failed for Latest message Q consumer.");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ private void consume() {
json -> {
Future.future(e -> messagePush((JsonObject) json));
});
client.basicAck(deliveryTag, true);
client.basicAck(deliveryTag, false);
mqConsumer.resume();
} else {
messagePush(new JsonObject(body)).onSuccess(
successResult -> {
LOGGER.info("Latest message published in databases.");
client.basicAck(deliveryTag, true);
client.basicAck(deliveryTag, false);
mqConsumer.resume();
LOGGER.debug("message consumption resumed");
})
Expand Down

0 comments on commit 0b4fbf5

Please sign in to comment.