Skip to content

Commit a0c122a

Browse files
committed
Stage 5 - basic implementation
1 parent cfe4755 commit a0c122a

File tree

3 files changed

+112
-38
lines changed

3 files changed

+112
-38
lines changed

src/main/java/ru/vk/itmo/test/alenkovayulya/ServerImpl.java

+84-28
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
import java.lang.foreign.MemorySegment;
1818
import java.lang.foreign.ValueLayout;
1919
import java.nio.charset.StandardCharsets;
20-
import java.util.ArrayList;
2120
import java.util.Arrays;
2221
import java.util.List;
22+
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CopyOnWriteArrayList;
2325
import java.util.concurrent.ExecutorService;
2426
import java.util.concurrent.RejectedExecutionException;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
2529

26-
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.REDIRECT_HEADER;
27-
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.TIMESTAMP_HEADER;
28-
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.redirectRequest;
30+
import static ru.vk.itmo.test.alenkovayulya.ShardRouter.*;
2931

3032
public class ServerImpl extends HttpServer {
3133

@@ -34,6 +36,11 @@ public class ServerImpl extends HttpServer {
3436
private final ExecutorService executorService;
3537
private final String url;
3638
private final ShardSelector shardSelector;
39+
private static final Set<Integer> ALLOWED_METHODS = Set.of(
40+
Request.METHOD_GET,
41+
Request.METHOD_PUT,
42+
Request.METHOD_DELETE
43+
);
3744
private static final int[] AVAILABLE_GOOD_RESPONSE_CODES = new int[] {200, 201, 202, 404};
3845

3946
public ServerImpl(ServiceConfig serviceConfig,
@@ -59,6 +66,11 @@ private static HttpServerConfig createServerConfig(ServiceConfig serviceConfig)
5966

6067
@Override
6168
public void handleRequest(Request request, HttpSession session) throws IOException {
69+
if (!ALLOWED_METHODS.contains(request.getMethod())) {
70+
sendEmptyResponse(Response.METHOD_NOT_ALLOWED, session);
71+
return;
72+
}
73+
6274
String id = request.getParameter("id=");
6375
if (isEmptyId(id)) {
6476
sendEmptyResponse(Response.BAD_REQUEST, session);
@@ -104,62 +116,107 @@ private void handleAsLeader(Request request, HttpSession session, String id) {
104116
LOGGER.error("Request rejected by policy", e);
105117
sendEmptyResponse(Response.SERVICE_UNAVAILABLE, session);
106118
}
107-
108119
}
109120

110121
private void collectResponses(Request request,
111122
HttpSession session,
112123
String id,
113124
int from,
114125
int ack
115-
) throws IOException {
116-
List<Response> responses = new ArrayList<>();
126+
) {
127+
List<CompletableFuture<Response>> asyncResponses = new CopyOnWriteArrayList<>();
117128
long timestamp = System.currentTimeMillis();
118129
int firstOwnerShardIndex = shardSelector.getOwnerShardIndex(id);
119130

120131
for (int i = 0; i < from; i++) {
132+
CompletableFuture<Response> asyncResponse;
121133
int shardIndex = (firstOwnerShardIndex + i) % shardSelector.getClusterSize();
122134

123135
if (isRedirectNeeded(shardIndex)) {
124-
handleRedirect(request, timestamp, shardIndex, responses);
136+
asyncResponse = handleRedirect(request, timestamp, shardIndex);
125137
} else {
126-
Response response = handleInternalRequest(request, id, timestamp);
127-
responses.add(response);
138+
asyncResponse = handleInternalRequestAsync(request, id, timestamp);
128139
}
129140

141+
asyncResponses.add(asyncResponse);
142+
130143
}
131144

132-
checkReplicasResponsesNumber(request, session, responses, ack);
145+
handleAsyncResponses(session, ack, from, request, asyncResponses);
146+
147+
}
148+
149+
private void handleAsyncResponses(
150+
HttpSession session, int ack, int from, Request request,
151+
List<CompletableFuture<Response>> completableFutureResponses
152+
) {
153+
List<Response> validResponses = new CopyOnWriteArrayList<>();
154+
AtomicBoolean isEnoughValidResponses = new AtomicBoolean();
155+
AtomicInteger allResponsesCounter = new AtomicInteger();
156+
157+
for (CompletableFuture<Response> completableFuture : completableFutureResponses) {
158+
completableFuture.whenCompleteAsync((response, throwable) -> {
159+
if (isEnoughValidResponses.get()) {
160+
return;
161+
}
162+
allResponsesCounter.incrementAndGet();
163+
164+
if (throwable != null) {
165+
response = new Response(Response.INTERNAL_ERROR);
166+
}
167+
168+
if (isValidResponse(response)) {
169+
validResponses.add(response);
170+
}
171+
172+
sendResponseIfEnoughReplicasResponsesNumber(request, isEnoughValidResponses, session, validResponses, ack);
173+
174+
if (allResponsesCounter.get() == from && validResponses.size() < ack) {
175+
sendEmptyResponse("504 Not Enough Replicas", session);
176+
}
177+
}, executorService).exceptionally((th) -> new Response(Response.INTERNAL_ERROR));
178+
}
133179
}
134180

135-
private void checkReplicasResponsesNumber(
181+
182+
private void sendResponseIfEnoughReplicasResponsesNumber(
136183
Request request,
184+
AtomicBoolean isEnoughValidResponses,
137185
HttpSession session,
138186
List<Response> responses,
139187
int ack
140-
) throws IOException {
141-
if (responses.size() >= ack) {
142-
if (request.getMethod() == Request.METHOD_GET) {
143-
session.sendResponse(getResponseWithMaxTimestamp(responses));
144-
} else {
145-
session.sendResponse(responses.getFirst());
188+
) {
189+
try {
190+
if (responses.size() >= ack) {
191+
isEnoughValidResponses.set(true);
192+
if (request.getMethod() == Request.METHOD_GET) {
193+
session.sendResponse(getResponseWithMaxTimestamp(responses));
194+
} else {
195+
session.sendResponse(responses.getFirst());
196+
}
146197
}
147-
} else {
148-
sendEmptyResponse("504 Not Enough Replicas", session);
198+
} catch (IOException e) {
199+
LOGGER.error("Exception during send win response: ", e);
200+
sendEmptyResponse(Response.INTERNAL_ERROR, session);
201+
session.close();
149202
}
150203
}
151204

152-
private void handleRedirect(Request request, long timestamp, int nodeIndex, List<Response> responses) {
153-
Response response = redirectRequest(request.getMethodName(),
205+
private boolean isValidResponse(Response response){
206+
return Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES)
207+
.anyMatch(code -> code == response.getStatus());
208+
}
209+
210+
private CompletableFuture<Response> handleRedirect(Request request, long timestamp, int nodeIndex) {
211+
return redirectRequest(request.getMethodName(),
154212
request.getParameter("id="),
155213
shardSelector.getShardUrlByIndex(nodeIndex),
156214
request.getBody() == null
157215
? new byte[0] : request.getBody(), timestamp);
158-
boolean correctRes = Arrays.stream(AVAILABLE_GOOD_RESPONSE_CODES)
159-
.anyMatch(code -> code == response.getStatus());
160-
if (correctRes) {
161-
responses.add(response);
162-
}
216+
}
217+
218+
private CompletableFuture<Response> handleInternalRequestAsync(Request request, String id, long timestamp) {
219+
return CompletableFuture.supplyAsync(() -> handleInternalRequest(request, id, timestamp), ShardRouter.proxyExecutor);
163220
}
164221

165222
private Response handleInternalRequest(Request request, String id, long timestamp) {
@@ -233,7 +290,6 @@ private void sendEmptyResponse(String response, HttpSession session) {
233290
session.sendResponse(emptyRes);
234291
} catch (IOException e) {
235292
LOGGER.info("Exception during sending the empty response: ", e);
236-
session.close();
237293
}
238294
}
239295

src/main/java/ru/vk/itmo/test/alenkovayulya/ServiceImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ private void shutdownDao() {
7272
}
7373
}
7474

75-
@ServiceFactory(stage = 4)
75+
@ServiceFactory(stage = 5)
7676
public static class Factory implements ServiceFactory.Factory {
7777
@Override
7878
public Service create(ServiceConfig config) {
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package ru.vk.itmo.test.alenkovayulya;
22

3+
import one.nio.async.CustomThreadFactory;
34
import one.nio.http.Response;
45
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
@@ -8,6 +9,10 @@
89
import java.net.http.HttpClient;
910
import java.net.http.HttpRequest;
1011
import java.net.http.HttpResponse;
12+
import java.util.concurrent.ArrayBlockingQueue;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.ThreadPoolExecutor;
15+
import java.util.concurrent.TimeUnit;
1116

1217
public final class ShardRouter {
1318

@@ -17,10 +22,18 @@ public final class ShardRouter {
1722
public static final String REDIRECT_HEADER = "Redirect";
1823
private static final HttpClient client = HttpClient.newHttpClient();
1924

25+
public static ThreadPoolExecutor proxyExecutor = new ThreadPoolExecutor(
26+
8,
27+
8,
28+
0L,
29+
TimeUnit.MILLISECONDS,
30+
new ArrayBlockingQueue<>(128),
31+
new CustomThreadFactory("ShardRouter"));
32+
2033
private ShardRouter() {
2134
}
2235

23-
public static Response redirectRequest(String method,
36+
public static CompletableFuture<Response> redirectRequest(String method,
2437
String id,
2538
String ownerShardUrl,
2639
byte[] body,
@@ -33,26 +46,31 @@ public static Response redirectRequest(String method,
3346
.method(method, HttpRequest.BodyPublishers.ofByteArray(body))
3447
.build();
3548
try {
36-
HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
37-
Response shardResponse = new Response(getHttpResponseByCode(response.statusCode()), response.body());
38-
shardResponse.addHeader(response.headers().firstValue(TIMESTAMP_HEADER).orElse(""));
39-
return shardResponse;
49+
CompletableFuture<HttpResponse<byte[]>> response = client.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray());
50+
return response.thenApplyAsync(ShardRouter::getHttpResponseByCode);
4051
} catch (Exception e) {
4152
LOGGER.error("Error during sending request by router", e);
4253
Thread.currentThread().interrupt();
43-
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
54+
return CompletableFuture.completedFuture(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
4455
}
4556
}
4657

47-
private static String getHttpResponseByCode(int code) {
48-
return switch (code) {
58+
private static Response getHttpResponseByCode(HttpResponse<byte[]> response) {
59+
String responseCode = switch (response.statusCode()) {
4960
case 200 -> Response.OK;
5061
case 201 -> Response.CREATED;
5162
case 202 -> Response.ACCEPTED;
5263
case 400 -> Response.BAD_REQUEST;
5364
case 404 -> Response.NOT_FOUND;
5465
case 500 -> Response.INTERNAL_ERROR;
55-
default -> throw new IllegalStateException("Not available status code: " + code);
66+
default -> throw new IllegalStateException("Not available status code: " + response.statusCode());
5667
};
68+
69+
Response shardResponse = new Response(responseCode, response.body());
70+
shardResponse.addHeader(response.headers().firstValue(TIMESTAMP_HEADER).orElse(""));
71+
72+
return shardResponse;
73+
74+
5775
}
5876
}

0 commit comments

Comments
 (0)