forked from polis-vk/2024-highload-dht
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMergeHandleResult.java
73 lines (65 loc) · 2.79 KB
/
MergeHandleResult.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package ru.vk.itmo.test.khadyrovalmasgali.replication;
import one.nio.http.HttpSession;
import one.nio.http.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.vk.itmo.test.khadyrovalmasgali.util.HttpUtil;
import java.net.HttpURLConnection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Aggregates the results of up to {@code from} asynchronous handlers for one client request and
 * writes a single response to the client's {@link HttpSession}.
 *
 * <p>A result counts as a success when its status is 200, 201, 202 or 404. As soon as {@code ack}
 * successes have been collected, the success with the greatest {@code timestamp()} seen so far is
 * sent. If all {@code from} futures have completed and fewer than {@code ack} succeeded, a
 * {@code 504 Gateway Timeout} is sent instead.
 *
 * <p>Completion callbacks may run concurrently, so all shared state is held in atomics and the
 * response is guarded so that it is written at most once per instance.
 */
public class MergeHandleResult {
    private static final Logger log = LoggerFactory.getLogger(MergeHandleResult.class);
    /** Number of completed futures whose result passed {@link #validateHandleResult}. */
    private final AtomicInteger successCount;
    /** Number of completed futures, successful or not. */
    private final AtomicInteger count;
    private final int from;
    private final int ack;
    private final HttpSession session;
    /** Flips to {@code true} when the first (and only) response has been claimed by a callback. */
    private final AtomicBoolean responded = new AtomicBoolean();
    /** Freshest successful result so far; starts as an empty 504 placeholder. */
    private final AtomicReference<HandleResult> mergedResult = new AtomicReference<>(
            new HandleResult(HttpURLConnection.HTTP_GATEWAY_TIMEOUT, Response.EMPTY));

    /**
     * @param from    total number of futures that will be passed to {@link #add}
     * @param ack     number of successful results required before answering the client
     * @param session session the final response is written to
     */
    public MergeHandleResult(int from, int ack, HttpSession session) {
        this.count = new AtomicInteger();
        this.successCount = new AtomicInteger();
        this.ack = ack;
        this.from = from;
        this.session = session;
    }

    /**
     * Registers one pending handler result. When the future completes, its result is merged and,
     * if this completion decides the outcome, the response is sent to the session.
     *
     * @param futureHandleResult future producing the handler's result; may complete exceptionally
     */
    @SuppressWarnings("FutureReturnValueIgnored")
    public void add(CompletableFuture<HandleResult> futureHandleResult) {
        futureHandleResult.whenComplete((handleResult, t) -> {
            checkThrowable(t);
            if (validateHandleResult(handleResult)) {
                // Merge atomically: a plain read-compare-write could drop a newer result
                // when two callbacks race.
                mergedResult.accumulateAndGet(handleResult, MergeHandleResult::freshest);
                // Merge happens before the increment, so whoever observes `ack` also
                // observes at least `ack` merged results.
                if (successCount.incrementAndGet() == ack) {
                    HandleResult winner = mergedResult.get();
                    respondOnce(new Response(String.valueOf(winner.status()), winner.data()));
                }
            }
            if (count.incrementAndGet() == from && successCount.get() < ack) {
                respondOnce(new Response(Response.GATEWAY_TIMEOUT, Response.EMPTY));
            }
        });
    }

    /**
     * Picks the result with the greater timestamp; on a tie the incoming result wins.
     * Must stay side-effect free because the atomic update may retry it.
     */
    private static HandleResult freshest(HandleResult current, HandleResult incoming) {
        return current.timestamp() <= incoming.timestamp() ? incoming : current;
    }

    /**
     * Sends {@code response} to the session unless some callback has already claimed the response.
     */
    private void respondOnce(Response response) {
        if (responded.compareAndSet(false, true)) {
            HttpUtil.sessionSendSafe(session, response, log);
        }
    }

    /**
     * Handles the failure cause of a completed future: {@link Exception}s are only logged,
     * any other throwable is reported to the client as {@code 500 Internal Error}.
     */
    private void checkThrowable(Throwable t) {
        if (t == null) {
            return;
        }
        if (t instanceof Exception) {
            log.info("Exception in mergeHandleResult", t);
        } else {
            // Goes through the same guard so later callbacks cannot send a second response.
            respondOnce(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
        }
    }

    /**
     * @return {@code true} if {@code handleResult} is non-null and its status is
     *         200, 201, 202 or 404
     */
    private static boolean validateHandleResult(HandleResult handleResult) {
        if (handleResult == null) {
            return false;
        }
        return handleResult.status() == HttpURLConnection.HTTP_OK
                || handleResult.status() == HttpURLConnection.HTTP_CREATED
                || handleResult.status() == HttpURLConnection.HTTP_ACCEPTED
                || handleResult.status() == HttpURLConnection.HTTP_NOT_FOUND;
    }
}