Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/CU-86c060qh5_Documentation-Add-user-guide-for-UI' into CU-86c060qh5_Documentation-Add-user-guide-for-UI

Browse files Browse the repository at this point in the history
  • Loading branch information
NyashaMuusha committed Sep 5, 2024
2 parents f1ec19a + ba6561a commit 37926e8
Show file tree
Hide file tree
Showing 29 changed files with 966 additions and 164 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/entry-on-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- id: get-docker-push-tag
run: echo "docker-push-tag=$(git rev-parse --abbrev-ref HEAD)-$(git log -1 --pretty=format:%h)" >> $GITHUB_OUTPUT
- uses: ./.github/workflows/actions/build-deploy-images
with:
with:
docker-push-tag: ${{ steps.get-docker-push-tag.outputs.docker-push-tag }}
docker-host: "docker.io"
docker-username: ${{ secrets.DOCKER_HUB_USER_NAME }}
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ my-notes
.settings
.classpath
.project
.metals
.metals
devops/benchmarking/k6
36 changes: 25 additions & 11 deletions JeMPI_Apps/JeMPI_API/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
akka.http {
session {
enabled = true
akka {
http {
session {
enabled = true
}
host-connection-pool {
max-connections = 1024
max-open-requests = 1024
}
server {
idle-timeout = 60 s
request-timeout = 60 s
linger-timeout = 60 s
parsing {
max-to-strict-bytes = 128m
max-to-strict-bytes = ${?JEMPI_FILE_IMPORT_MAX_SIZE_BYTE}
}
}
}
server {
idle-timeout = 10 s
request-timeout = 5 s
linger-timeout = 5 s
parsing {
max-to-strict-bytes = 128m
max-to-strict-bytes = ${?JEMPI_FILE_IMPORT_MAX_SIZE_BYTE}
actor {
default-dispatcher {
fork-join-executor {
parallelism-min = 8
parallelism-max = 64
}
}
}
}
Expand Down Expand Up @@ -71,7 +85,7 @@ my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 512
fixed-pool-size = 1024
}
throughput = 1
}
36 changes: 25 additions & 11 deletions JeMPI_Apps/JeMPI_API_KC/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
akka.http {
session {
enabled = true
akka {
http {
session {
enabled = true
}
host-connection-pool {
max-connections = 1024
max-open-requests = 1024
}
server {
idle-timeout = 60 s
request-timeout = 60 s
linger-timeout = 60 s
parsing {
max-to-strict-bytes = 128m
max-to-strict-bytes = ${?JEMPI_FILE_IMPORT_MAX_SIZE_BYTE}
}
}
}
server {
idle-timeout = 10 s
request-timeout = 5 s
linger-timeout = 5 s
parsing {
max-to-strict-bytes = 128m
max-to-strict-bytes = ${?JEMPI_FILE_IMPORT_MAX_SIZE_BYTE}
actor {
default-dispatcher {
fork-join-executor {
parallelism-min = 8
parallelism-max = 64
}
}
}
}
Expand Down Expand Up @@ -98,7 +112,7 @@ my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 512
fixed-pool-size = 1024
}
throughput = 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void apacheReadCSV(
if (givenName.isEmpty() && familyName.isEmpty()) {
partitionKey += "Unknown";
}
LOGGER.info("Kafka topic/partition for patient: " + partitionKey);
LOGGER.info("Using Kafka topic/partition: " + partitionKey);

final var interactionEnvelop = new InteractionEnvelop(InteractionEnvelop.ContentType.BATCH_INTERACTION,
tag,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
akka.http {
server {
idle-timeout = 10 s
request-timeout = 5 s
linger-timeout = 5 s
idle-timeout = 60 s
request-timeout = 60 s
linger-timeout = 60 s
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ akka.http {
enabled = true
}
server {
idle-timeout = 10 s
request-timeout = 5 s
linger-timeout = 5 s
idle-timeout = 60 s
request-timeout = 60 s
linger-timeout = 60 s
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
akka.http {
server {
idle-timeout = 10 s
request-timeout = 5 s
linger-timeout = 5 s
idle-timeout = 60 s
request-timeout = 60 s
linger-timeout = 60 s
}
}

Expand Down
6 changes: 3 additions & 3 deletions JeMPI_Apps/JeMPI_ETL/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
akka.http {
server {
idle-timeout = 10 s
request-timeout = 5 s
linger-timeout = 5 s
idle-timeout = 60 s
request-timeout = 60 s
linger-timeout = 60 s
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ static Route proxyPostDashboardData(
responseBody = Unmarshaller
.entityToString()
.unmarshal(dashboardDataResponse.entity(), actorSystem)
.toCompletableFuture().get(10, TimeUnit.SECONDS);
.toCompletableFuture().get(GlobalConstants.TIMEOUT_GENERAL_SECS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Error getting dashboard data ", e);
return complete(StatusCodes.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.dgraph.DgraphGrpc;
import io.dgraph.DgraphProto;
import io.dgraph.TxnConflictException;
// import io.dgraph.TxnConflictException;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -13,6 +13,7 @@
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class DgraphClient {

Expand Down Expand Up @@ -85,94 +86,63 @@ void alter(final DgraphProto.Operation op) {
dgraphClient.alter(op);
}

private <T> T runWithRetries(final Supplier<T> supplier, final int retries) {
int remainingRetries = retries;
while (remainingRetries > 0) {
try {
return supplier.get();
} catch (RuntimeException ex) {
LOGGER.warn("Retrying due to exception: {}", ex.getMessage());
LOGGER.warn("Cause: {}", ex.getCause());
remainingRetries--;
if (remainingRetries == 0) {
LOGGER.warn("Failed after retries.");
throw ex;
}
sleep(); // Take a break and try again
}
}
throw new RuntimeException("Retries exhausted");
}

String executeReadOnlyTransaction(
final String query,
final Map<String, String> vars) {
String json = null;
int retry = GlobalConstants.TIMEOUT_DGRAPH_RECONNECT_RETRIES;
boolean done;
do {
return runWithRetries(() -> {
final var txn = dgraphClient.newReadOnlyTransaction();
done = true;
try {
io.dgraph.DgraphProto.Response response;
if (AppUtils.isNullOrEmpty(vars)) {
response = txn.query(query);
} else {
response = txn.queryWithVars(query, vars);
}
json = response.getJson().toStringUtf8();
} catch (TxnConflictException ex) {
txn.discard();
if (--retry == 0) {
LOGGER.error(ex.getLocalizedMessage(), ex);
return null;
} else {
LOGGER.warn("{}", ex.getLocalizedMessage());
disconnect();
sleep();
connect();
done = false;
}
} catch (RuntimeException ex) {
txn.discard();
if (--retry == 0) {
LOGGER.warn("{}", vars);
LOGGER.warn("{}", query);
LOGGER.error(ex.getLocalizedMessage(), ex);
disconnect();
sleep();
connect();
return null;
} else {
LOGGER.warn("{}", ex.getLocalizedMessage(), ex);
done = false;
disconnect();
sleep();
connect();
}
return response.getJson().toStringUtf8();
} finally {
txn.discard();
}
} while (!done);
return json;
}, GlobalConstants.TIMEOUT_DGRAPH_RECONNECT_RETRIES);
}

String doMutateTransaction(final DgraphProto.Mutation mutation) {
String uid = null;
boolean done;
int retry = GlobalConstants.TIMEOUT_DGRAPH_RECONNECT_RETRIES;
do {
done = true;
return runWithRetries(() -> {
final var txn = dgraphClient.newTransaction();
if (txn == null) {
LOGGER.error("NO TRANSACTION");
return null;
}
try {
final var request = DgraphProto.Request.newBuilder().setCommitNow(true).addMutations(mutation).build();
final var response = txn.doRequest(request, 10, TimeUnit.SECONDS);
uid = response.getUidsMap().values().stream().findFirst().orElse(StringUtils.EMPTY);
final var response = txn.doRequest(request, GlobalConstants.TIMEOUT_DGRAPH_QUERY_SECS, TimeUnit.SECONDS);
return response.getUidsMap().values().stream().findFirst().orElse(StringUtils.EMPTY);
} catch (RuntimeException ex) {
if (--retry == 0) {
LOGGER.error(ex.getLocalizedMessage(), ex);
disconnect();
sleep();
connect();
return null;
} else {
LOGGER.error(ex.getLocalizedMessage(), ex);
LOGGER.debug("{}", mutation.toByteString());
done = false;
disconnect();
sleep();
connect();
}
LOGGER.error("Error during mutation transaction: {}", ex.getMessage(), ex);
LOGGER.error("Mutation details: {}", mutation);
throw ex;
} finally {
txn.discard();
}
} while (!done);
return uid;
}, GlobalConstants.TIMEOUT_DGRAPH_RECONNECT_RETRIES);
}

record AlphaHost(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

Expand Down Expand Up @@ -36,7 +39,19 @@ public MyKafkaConsumerByTopic(
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWaitMSConfig);
try {
consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(topic));
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
// Commit offsets before rebalance
LOGGER.info("Committing offsets before rebalance");
consumer.commitSync();
}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
// Handle partition assignment if needed
}
});
} catch (Exception ex) {
LOGGER.error(ex.getLocalizedMessage(), ex);
consumer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public final class GlobalConstants {
public static final String SEGMENT_POST_GOLDEN_RECORD_RESTORE = "restoreGoldenRecord";

// TIMEOUTS
public static final int TIMEOUT_DGRAPH_RECONNECT_RETRIES = 20;
public static final int TIMEOUT_DGRAPH_RECONNECT_RETRIES = 5;
public static final int TIMEOUT_DGRAPH_RECONNECT_SLEEP_SECS = 2;
public static final int TIMEOUT_DGRAPH_QUERY_SECS = (TIMEOUT_DGRAPH_RECONNECT_SLEEP_SECS
* TIMEOUT_DGRAPH_RECONNECT_RETRIES);
public static final int TIMEOUT_DGRAPH_QUERY_SECS = 60;
public static final int TIMEOUT_DGRAPH_AKKA_SECS = (TIMEOUT_DGRAPH_RECONNECT_RETRIES * TIMEOUT_DGRAPH_QUERY_SECS) + (TIMEOUT_DGRAPH_RECONNECT_RETRIES * TIMEOUT_DGRAPH_RECONNECT_SLEEP_SECS);
public static final int TIMEOUT_GENERAL_SECS = 60;
public static final int TIMEOUT_TEA_TIME_SECS = 5;
public static final int TIMEOUT_TEA_TIME_SECS = 30;

private GlobalConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static CompletionStage<BackEnd.AsyncLinkInteractionResponse> linkInteraction(
final InteractionEnvelop batchInteraction) {
return AskPattern.ask(backEnd,
replyTo -> new BackEnd.AsyncLinkInteractionRequest(replyTo, key, batchInteraction),
java.time.Duration.ofSeconds(GlobalConstants.TIMEOUT_DGRAPH_QUERY_SECS),
java.time.Duration.ofSeconds(GlobalConstants.TIMEOUT_DGRAPH_AKKA_SECS),
actorSystem.scheduler());
}

Expand Down
Loading

0 comments on commit 37926e8

Please sign in to comment.