-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Плясов Климентий, ИТМО DWS, stage 5 #196
Changes from all commits
caa3b3b
3af5bd4
daef6f8
c0481f0
36d6d2a
1cda826
42a949a
9653834
25ff852
9b5fe6b
810edff
beeb6c9
e4db067
abbe08d
bcdf9fb
5f7f9b0
60e8135
c4f5480
517208e
451452c
df94762
ef914b3
5568cf8
292deb2
644ca83
45db18d
20a777e
e4f4c07
741bab9
231b1df
60e7694
ab49f24
990fc13
8e0f8ed
f2a3422
dd7b3ed
3532b14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package ru.vk.itmo.test.klimplyasov; | ||
|
||
import one.nio.http.HttpSession; | ||
import ru.vk.itmo.dao.Entry; | ||
|
||
import java.io.IOException; | ||
import java.lang.foreign.MemorySegment; | ||
import java.lang.foreign.ValueLayout; | ||
import java.nio.charset.StandardCharsets; | ||
|
||
public final class ChunkGenerator { | ||
private static final byte[] HTTP_RESPONSE_LINE = "HTTP/1.1 200 OK".getBytes(StandardCharsets.UTF_8); | ||
private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8); | ||
private static final byte[] CONTENT_TYPE_HEADER = "Content-Type: text/plain".getBytes(StandardCharsets.UTF_8); | ||
private static final byte[] TRANSFER_ENCODING_HEADER = | ||
"Transfer-Encoding: chunked".getBytes(StandardCharsets.UTF_8); | ||
private static final byte[] CONNECTION_HEADER = "Connection: keep-alive".getBytes(StandardCharsets.UTF_8); | ||
private static final byte[] LF = "\n".getBytes(StandardCharsets.UTF_8); | ||
|
||
private ChunkGenerator() { | ||
//:) | ||
} | ||
|
||
public static void writeResponseHeaders(HttpSession session) throws IOException { | ||
session.write(HTTP_RESPONSE_LINE, 0, HTTP_RESPONSE_LINE.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
session.write(CONTENT_TYPE_HEADER, 0, CONTENT_TYPE_HEADER.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
session.write(TRANSFER_ENCODING_HEADER, 0, TRANSFER_ENCODING_HEADER.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
session.write(CONNECTION_HEADER, 0, CONNECTION_HEADER.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
} | ||
|
||
public static void writeDataChunk(HttpSession session, Entry<MemorySegment> entry) throws IOException { | ||
byte[] key = entry.key().toArray(ValueLayout.JAVA_BYTE); | ||
byte[] value = entry.value().toArray(ValueLayout.JAVA_BYTE); | ||
byte[] lengthBytes = | ||
Integer | ||
.toHexString(key.length + value.length + LF.length) | ||
.getBytes(StandardCharsets.UTF_8); | ||
session.write(lengthBytes, 0, lengthBytes.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
session.write(key, 0, key.length); | ||
session.write(LF, 0, LF.length); | ||
session.write(value, 0, value.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
} | ||
|
||
public static void writeEmptyChunk(HttpSession session) throws IOException { | ||
byte[] dataSizeBytes = Integer.toHexString(0).getBytes(StandardCharsets.UTF_8); | ||
session.write(dataSizeBytes, 0, dataSizeBytes.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
session.write(CRLF, 0, CRLF.length); | ||
} | ||
} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package ru.vk.itmo.test.klimplyasov; | ||
|
||
public record HandleResult(int status, byte[] data, long timestamp) { | ||
|
||
public HandleResult(int status, byte[] data, long timestamp) { | ||
this.status = status; | ||
this.data = data; | ||
this.timestamp = timestamp; | ||
} | ||
|
||
public HandleResult(int status, byte[] data) { | ||
this(status, data, 0); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package ru.vk.itmo.test.klimplyasov; | ||
|
||
import one.nio.http.HttpSession; | ||
import one.nio.http.Response; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.net.HttpURLConnection; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
public class MergeHandleResult { | ||
private static final Logger log = LoggerFactory.getLogger(MergeHandleResult.class); | ||
private final HandleResult[] handleResults; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. здесь могут быть проблемы в многопоточной среде, чтение/запись в массив идет из разных потоков. В таком случае просто final ссылка не поможет. Нужны какие-то примитивы синхронизации. Подойдет как какая-нибудь потокобезопасная коллекция, так и AtomicReferenceArray |
||
private final AtomicInteger totalCount; | ||
private final AtomicInteger validCount; | ||
private final int ackThreshold; | ||
private final int totalExpected; | ||
private final HttpSession currentSession; | ||
|
||
public MergeHandleResult(HttpSession session, int size, int ackThreshold) { | ||
this.currentSession = session; | ||
this.handleResults = new HandleResult[size]; | ||
this.totalCount = new AtomicInteger(); | ||
this.validCount = new AtomicInteger(); | ||
this.ackThreshold = ackThreshold; | ||
this.totalExpected = size; | ||
} | ||
|
||
public boolean add(int index, HandleResult handleResult) { | ||
handleResults[index] = handleResult; | ||
int valid = handleResult.status() == HttpURLConnection.HTTP_OK | ||
|| handleResult.status() == HttpURLConnection.HTTP_CREATED | ||
|| handleResult.status() == HttpURLConnection.HTTP_ACCEPTED | ||
|| handleResult.status() == HttpURLConnection.HTTP_NOT_FOUND | ||
? validCount.incrementAndGet() : validCount.get(); | ||
if (valid >= ackThreshold || totalCount.incrementAndGet() == totalExpected) { | ||
sendResult(); | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
private void sendResult() { | ||
HandleResult mergedResult = new HandleResult(HttpURLConnection.HTTP_GATEWAY_TIMEOUT, null); | ||
int localValidCount = 0; | ||
for (HandleResult handleResult : handleResults) { | ||
if (handleResult.status() == HttpURLConnection.HTTP_OK | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. а тут NPE потенциальный, массив может заполняться не по порядку There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Понял, исправлю |
||
|| handleResult.status() == HttpURLConnection.HTTP_CREATED | ||
|| handleResult.status() == HttpURLConnection.HTTP_ACCEPTED | ||
|| handleResult.status() == HttpURLConnection.HTTP_NOT_FOUND) { | ||
localValidCount++; | ||
if (mergedResult.timestamp() <= handleResult.timestamp()) { | ||
mergedResult = handleResult; | ||
} | ||
} | ||
} | ||
try { | ||
if (localValidCount < ackThreshold) { | ||
currentSession.sendResponse(new Response(Response.GATEWAY_TIMEOUT, Response.EMPTY)); | ||
} else { | ||
currentSession.sendResponse(new Response(String.valueOf(mergedResult.status()), mergedResult.data())); | ||
} | ||
} catch (Exception e) { | ||
log.error("Exception during handleRequest", e); | ||
try { | ||
currentSession.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY)); | ||
} catch (IOException ex) { | ||
log.error("Exception while sending close connection", e); | ||
currentSession.scheduleClose(); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ага, тут сразу в перемешку с 6ым стейджом, хорошо бы конечно разделить эти два PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Извините, случайно закоммитил в старый ПР