Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Супрядкина Дарья ИТМО DWS stage 6 #218

Merged
merged 79 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
2f2080a
HW1: add realization
SuDarina Feb 18, 2024
a18dc95
HW1: fix code style
SuDarina Feb 19, 2024
528366b
HW1: change exception constructor
SuDarina Feb 19, 2024
990d153
HW1: add report
SuDarina Feb 21, 2024
3587983
HW1: add flame graphs description
SuDarina Feb 21, 2024
75db05e
HW1: add flame graphs description
SuDarina Feb 21, 2024
3c9655b
HW1: fix code style
SuDarina Feb 21, 2024
ad686bc
HW1: fixes according to review
SuDarina Feb 27, 2024
1d5c62c
HW2: first version of HW2
SuDarina Feb 28, 2024
825f223
HW2: fix codestyle
SuDarina Feb 29, 2024
8fe320f
Merge branch 'main' into hw2/async-server
incubos Feb 29, 2024
c5a6710
Merge remote-tracking branch 'upstream/main' into hw2/async-server
SuDarina Mar 4, 2024
e6673e6
HW2: fixes according to review
SuDarina Mar 5, 2024
474cc38
HW2: change string template to typical string
SuDarina Mar 5, 2024
c601bb6
HW2: change parameters
SuDarina Mar 6, 2024
fe0ffa5
HW2: fix
SuDarina Mar 6, 2024
f1da741
HW2: just finish
SuDarina Mar 7, 2024
55458dc
Merge branch 'main' into hw2/async-server
incubos Mar 10, 2024
7b834f5
HW2: fixes according to review
SuDarina Mar 12, 2024
022c4d6
HW2: explaining absence of flush
SuDarina Mar 12, 2024
7de6005
HW3: add realization
SuDarina Mar 14, 2024
889bdd7
HW3: fix style
SuDarina Mar 14, 2024
c00be65
HW3: add final
SuDarina Mar 14, 2024
69c9b6d
HW3: fix finals
SuDarina Mar 14, 2024
c852e2f
HW3: change to async thread
SuDarina Mar 14, 2024
6e9031e
HW3: add shutdown threads
SuDarina Mar 14, 2024
7529b7b
HW3: fix style
SuDarina Mar 14, 2024
91ba923
Merge remote-tracking branch 'upstream/main' into hw3/sharding
SuDarina Mar 19, 2024
a9c58df
HW3: add first part of report
SuDarina Mar 20, 2024
141a869
HW3: add full report
SuDarina Mar 20, 2024
0f118ed
HW3: fix style
SuDarina Mar 20, 2024
afd4a3f
HW3: add explanations about hashCode()
SuDarina Mar 20, 2024
e9259b8
Merge remote-tracking branch 'upstream/main' into hw4/replication
SuDarina Mar 26, 2024
b4fae77
HW4: add implementation
SuDarina Mar 28, 2024
9ffd458
HW4: fix style
SuDarina Mar 28, 2024
6d47409
HW4: add report
SuDarina Apr 3, 2024
4f529d4
Merge remote-tracking branch 'upstream/main' into hw4/replication
SuDarina Apr 3, 2024
ab42efe
HW4: fix code style
SuDarina Apr 3, 2024
6625f89
HW4: add note
SuDarina Apr 3, 2024
a13db87
Merge branch 'main' into hw4/replication
Apr 6, 2024
0ed611c
HW5: add realization
SuDarina Apr 11, 2024
6da7828
HW5: add completed future
SuDarina Apr 11, 2024
80a406b
HW5: fix tolerance
SuDarina Apr 11, 2024
9196595
Merge remote-tracking branch 'upstream/main' into hw5/async-interaction
SuDarina Apr 11, 2024
dc30ab3
HW5: fix codestyle
SuDarina Apr 11, 2024
6fe0449
HW5: fix codestyle
SuDarina Apr 11, 2024
629090b
HW5: close sessions
SuDarina Apr 11, 2024
33f5c3a
HW5: rewrite to method sendResponseAndCloseSession
SuDarina Apr 11, 2024
7523858
HW5: remove redundant parameter
SuDarina Apr 11, 2024
d0bd0e9
HW5: set close sessions
SuDarina Apr 11, 2024
2ee3bd4
HW5: change
SuDarina Apr 11, 2024
1db3665
HW5: change
SuDarina Apr 11, 2024
dca0f2d
Merge branch 'main' into hw5/async-interaction
incubos Apr 13, 2024
9d65379
HW5: fix multithreading problem for better profiling
SuDarina Apr 14, 2024
1d65f01
HW5: fix codestyle
SuDarina Apr 14, 2024
6be3d39
HW5: add report
SuDarina Apr 17, 2024
b14ee15
Merge remote-tracking branch 'upstream/main' into hw6/range-requests
SuDarina Apr 24, 2024
9910f2c
HW6: add range requests
SuDarina Apr 24, 2024
0a1e84c
HW6: fix checkstyle
SuDarina Apr 25, 2024
c4b5a78
HW6: move methods
SuDarina Apr 25, 2024
dedc72c
Merge branch 'main' into hw6/range-requests
incubos Apr 28, 2024
3a569d0
Merge branch 'main' into hw5/async-interaction
incubos Apr 28, 2024
f991781
HW6: add profiling results
SuDarina Apr 30, 2024
c680ff6
HW6: add first part of report
SuDarina May 1, 2024
71b103d
Merge branch 'main' into hw5/async-interaction
incubos May 1, 2024
2fb2d03
HW6: add report
SuDarina May 1, 2024
f7648f6
HW6: remarks
SuDarina May 1, 2024
c59f446
Merge branch 'main' into hw6/range-requests
incubos May 1, 2024
595df0a
HW5: fixes according to review
SuDarina May 5, 2024
2049803
HW5: fix header
SuDarina May 5, 2024
3788b33
Merge branch 'main' into hw5/async-interaction
incubos May 10, 2024
818a350
HW5: fixes according to review
SuDarina May 10, 2024
e9e93eb
Merge remote-tracking branch 'origin/hw5/async-interaction' into hw5/…
SuDarina May 10, 2024
b05df36
HW5: complete todo
SuDarina May 10, 2024
8785bb5
Merge remote-tracking branch 'origin/hw5/async-interaction' into hw6/…
SuDarina May 14, 2024
3f50aad
HW6: make not greedy realization
SuDarina May 15, 2024
4559a65
Merge remote-tracking branch 'upstream/main' into hw6/range-requests
SuDarina May 15, 2024
f85f7b3
HW6: reduce unnecessary allocations
SuDarina May 15, 2024
76d12f2
Merge branch 'main' into hw6/range-requests
incubos May 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ru.vk.itmo.test.dariasupriadkina;

import one.nio.http.HttpServer;
import one.nio.http.HttpSession;
import one.nio.net.Socket;
import one.nio.util.ByteArrayBuilder;
import ru.vk.itmo.test.dariasupriadkina.dao.ExtendedEntry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.util.Iterator;

public class CustomHttpSession extends HttpSession {

private Iterator<ExtendedEntry<MemorySegment>> iterator;
private ByteArrayBuilder builder;

public CustomHttpSession(Socket socket, HttpServer server) {
super(socket, server);
}

@Override
protected void processWrite() throws Exception {
super.processWrite();

sendNextChunkScope(builder == null ? new ByteArrayBuilder() : builder);
}

public void streaming(Iterator<ExtendedEntry<MemorySegment>> iterator) throws IOException {
this.iterator = iterator;
this.builder = new ByteArrayBuilder();

write(EntryChunkUtils.HEADER_BYTES, 0, EntryChunkUtils.HEADER_BYTES.length);
sendNextChunkScope(builder);
}

private void sendNextChunkScope(ByteArrayBuilder builder) throws IOException {
while (iterator.hasNext() && queueHead == null) {
ExtendedEntry<MemorySegment> ee = iterator.next();
EntryChunkUtils.getEntryByteChunk(ee, builder);
write(builder.buffer(), 0, builder.length());
builder.setLength(0);
}
if (!iterator.hasNext()) {
write(EntryChunkUtils.LAST_BYTES, 0, EntryChunkUtils.LAST_BYTES.length);
}
scheduleClose();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мы могли не всё отправить, но всегда инициируем закрытие сессии, верно?

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ru.vk.itmo.test.dariasupriadkina;

import one.nio.util.ByteArrayBuilder;
import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;

public final class EntryChunkUtils {

private EntryChunkUtils() {
}

static final byte[] HEADER_BYTES =
"""
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Transfer-Encoding: chunked\r
Connection: keep-alive\r
\r
""".getBytes(StandardCharsets.UTF_8);
static final byte[] LAST_BYTES = "0\r\n\r\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] DELIMITER_BYTES = "\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] CLRF_BYTES = "\r\n".getBytes(StandardCharsets.UTF_8);

public static void getEntryByteChunk(Entry<MemorySegment> ee, ByteArrayBuilder bb) {
byte[] key = getEntryKeyChunk(ee);
byte[] value = getEntryValueChunk(ee);
byte[] kvLength = getKVLengthChunk(ee);

bb.append(kvLength, 0, kvLength.length);
bb.append(CLRF_BYTES, 0, CLRF_BYTES.length);
bb.append(key, 0, key.length);
bb.append(DELIMITER_BYTES, 0, DELIMITER_BYTES.length);
bb.append(value, 0, value.length);
bb.append(CLRF_BYTES, 0, CLRF_BYTES.length);
}

public static byte[] getEntryKeyChunk(Entry<MemorySegment> entry) {
return entry.key().toArray(ValueLayout.JAVA_BYTE);
}

public static byte[] getEntryValueChunk(Entry<MemorySegment> entry) {
return entry.value().toArray(ValueLayout.JAVA_BYTE);
}

public static byte[] getKVLengthChunk(Entry<MemorySegment> entry) {
return Long.toHexString((entry.value().byteSize()
+ entry.key().byteSize() + DELIMITER_BYTES.length)).getBytes(StandardCharsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package ru.vk.itmo.test.dariasupriadkina;

import one.nio.http.HttpSession;
import one.nio.http.Request;
import one.nio.http.Response;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.test.dariasupriadkina.dao.ExtendedBaseEntry;
import ru.vk.itmo.test.dariasupriadkina.dao.ExtendedEntry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

public class SelfRequestHandler {
Expand Down Expand Up @@ -95,4 +99,26 @@ public Response delete(String id) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

public void handleRange(Request request, HttpSession session) throws IOException {
String start = request.getParameter("start=");
String end = request.getParameter("end=");

if (start == null
|| request.getMethod() != Request.METHOD_GET
|| start.isEmpty()
|| (end != null && end.isEmpty())) {
session.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
return;
}

Iterator<ExtendedEntry<MemorySegment>> it = dao.get(
utils.convertByteArrToMemorySegment(start.getBytes(StandardCharsets.UTF_8)),
end == null ? null :
utils.convertByteArrToMemorySegment(end.getBytes(StandardCharsets.UTF_8))
);

((CustomHttpSession) session).streaming(it);
}

}
21 changes: 17 additions & 4 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import one.nio.http.HttpSession;
import one.nio.http.Request;
import one.nio.http.Response;
import one.nio.net.Socket;
import one.nio.server.AcceptorConfig;
import one.nio.server.RejectedSessionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.vk.itmo.ServiceConfig;
Expand All @@ -22,6 +24,7 @@
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -65,14 +68,11 @@ public Server(ServiceConfig config, Dao<MemorySegment, ExtendedEntry<MemorySegme

private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConfig) {
HttpServerConfig httpServerConfig = new HttpServerConfig();

AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

httpServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
httpServerConfig.closeSessions = true;

return httpServerConfig;
}

Expand All @@ -92,6 +92,10 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
try {
workerExecutor.execute(() -> {
try {
if (request.getURI().startsWith(Utils.LOCAL_STREAM_ENTRY_PREFIX)) {
selfHandler.handleRange(request, session);
return;
}
Map<String, Integer> ackFrom = getFromAndAck(request);
int from = ackFrom.get("from");
int ack = ackFrom.get("ack");
Expand Down Expand Up @@ -122,6 +126,11 @@ public void handleRequest(Request request, HttpSession session) throws IOExcepti
}
}

@Override
public HttpSession createSession(Socket socket) throws RejectedSessionException {
return new CustomHttpSession(socket, this);
}

private void solveUnexpectedError(Exception e, HttpSession session) {
logger.error("Unexpected error", e);
try {
Expand Down Expand Up @@ -188,7 +197,6 @@ private void collectResponsesCallback(List<CompletableFuture<Response>> futureRe

}, workerExecutor).exceptionally(exception -> {
logger.error("Error happened while collecting responses from nodes", exception);
sendAsyncResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY), session);
return null;
});
}
Expand Down Expand Up @@ -223,6 +231,11 @@ public CompletableFuture<Response> handleProxy(String redirectedUrl, Request req
Response response1 = new Response(String.valueOf(httpResponse.statusCode()), httpResponse.body());
if (httpResponse.headers().map().get(TIMESTAMP_MILLIS_HEADER_NORMAL) == null) {
response1.addHeader(TIMESTAMP_MILLIS_HEADER + "0");
} else {
response1.addHeader(TIMESTAMP_MILLIS_HEADER
+ httpResponse.headers().map().get(
TIMESTAMP_MILLIS_HEADER_NORMAL.toLowerCase(Locale.ROOT)).getFirst()
);
}
return response1;
}, workerExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import java.nio.file.Path;

@ServiceFactory(stage = 5)
@ServiceFactory(stage = 6)
public class ServiceImlFactory implements ServiceFactory.Factory {

public static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public final class TestServer {

private static final String LOCALHOST_PREFIX = "http://localhost:";
private static final int NODE_AMOUNT = 3;
private static final int NODE_AMOUNT = 1;

private TestServer() {
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

public class Utils {
public static final String ENTRY_PREFIX = "/v0/entity";
public static final String LOCAL_STREAM_ENTRY_PREFIX = "/v0/entities";
public static final String ENTRY_PREFIX_WITH_ID_PARAM = ENTRY_PREFIX + "?id=";
private final Dao<MemorySegment, ExtendedEntry<MemorySegment>> dao;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,14 @@ public ExtendedEntry<MemorySegment> next() {
// Read value
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
timestamp = getLength(offset);
offset += Long.BYTES;
// Tombstone encountered
return new ExtendedBaseEntry<>(key, null, timestamp);
} else {
final MemorySegment value = data.asSlice(offset, valueLength);
offset += valueLength;
timestamp = getLength(offset);
offset += Long.BYTES;
return new ExtendedBaseEntry<>(key, value, timestamp);
}
}
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/report/hw6/REPORT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Range-запросы

На начало проведения экспериментов база была заполнена на 777Мб, количество записей - ≈8_990_000

Снимем профили аллокации, блокировок и cpu, нагрузив систему командой curl,
с помощью которой запросим все записи базы данных, начиная с 1ой:

```
curl -v http://localhost:8080/v0/entities\?start=1
```

## Результаты профилирования

[range-alloc.html](data%2Frange-alloc.html)

[range-cpu.html](data%2Frange-cpu.html)

[range-lock.html](data%2Frange-lock.html)


На профиле блокировок видно, что практически 100% локов уходит на `Session.write`

Если посмотреть на профиль cpu, то разделение сеймплов примерно такое:

- `Session.write` - 48.7%
- `getEntryByteChunk` (метод, отвечающий за формирование байтового чанка с данными) - 14.7%, что выглядит весьма солидными
затратами для обычного преобразования `MemorySegment`'ов в массив байт
- `MergingEntryIterator` (работа dao) - 28.7%
- Оставшаяся часть - работа `SelectorThread`

На профиле аллокаций дела обстоят несколько иначе:

- `MergingEntryIterator.next` (работа dao) - 10.6%
- `Session.write` - всего около 9% несмотря на то, что занимает больше всего процессорного времени
- `getEntryByteChunk` - ≈45%. В рамках этого метода у нас по отдельности преобразуется в байтовый массив каждая из частей
- `ByteArrayBuilder.<init>` - ≈25% - инициализация ByteArrayBuilder

`getEntryByteChunk` забирает на себя практически половину всех аллокаций, что выглядит узким местом, которое нуждается
в оптимизации

`ByteArrayBuilder.<init>` также занимает весьма солидный процент аллокаций - целую четверть. Пичина возникновения этих алллокаций весьма очевидна,
в коде мы аллоцируем новый ByteArrayBuilder каждый раз, когда формируем новый чанк key-value с данными

```
while (it.hasNext()) {
ByteArrayBuilder bb = new ByteArrayBuilder();
ExtendedEntry<MemorySegment> ee = it.next();
EntryChunkUtils.getEntryByteChunk(ee, bb);
session.write(bb.toBytes(), 0, bb.length());
}
```

Это решение было принято, чтобы отправлять через `session.write` весь чанк с данными, а не каждую его часть
(length, crlf, <key>, \n, value) по отдельность. В качестве альтернативы можно как раз отправлять каждую часть через session.write,
тогда аллокацию ByteArrayBuilder можно избежать, но, так как на профиле локов, фигурирует в основном только метод Session.write,
вероятно, лишнее использование Session.write может привести к затратам на синхронизацию, отчего в конце концов пострадает latency
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

У нас в сессию пишет один поток, поэтому никакого contention нет и затраты на синхронизацию не изменятся.

и сама производительность системы.

Тем не менее имеет смысл судя по полученным результатам профилирования, имеет смысл попытаться оптимизировать процесс формирования
массивов байт.

Заметно, что наибольшие затраты в рамках `getEntryByteChunk` происходят в методе расчета длины чанка - `getKVLengthChank`
(27.7%).
В первоначальной реализации дл того, чтобы получить длину ключа и значения, `MemorySegment` приводился сначала к массиву
байт, а затем бралась длина. С целью попытки оптимизации этого момента, вся цепочка преобразований была заменена на метод
получения длины `MemorySegment`'а в байтах - `byteSize()`, также расчет длины символа `\n` была заменена с `"\n".length()`
на `DELIMITER_BYTES.length`, где DELIMITER_BYTES - константа. Таким образом мы избегаем инициализации новой строчки.

Даже такие небольшие изменения позволили уменьшить процент аллокаций в методе `getEntryByteChunk` с 45% до 33%, а в самом
методе `getKVLengthChank` процент уменьшился с 27.7% до 11.17%.

Новые результаты профилирования аллокаций:

[range-alloc-2.html](data%2Frange-alloc-2.html)

Как видно на профиле, при снижении процента аллокаций на работу с преобразованием `MemorySegment`'ов в массивы байт
гораздо заметнее стали ресурсы, затрачиваемые на инициализацию `ByteArrayBuilder`, который инициализируется при формировании каждого
чанка. В идеале подобных инициализаций в цикле нам явно хотелось бы избежать. Несильно уверена в следующем предположении,
но, обратив внимание на принципы работы
`ByteArrayBuilder`, предполагая, что вместо инициализации `ByteArrayBuilder` можно использовать метод `.setLength(0)`.

В таком случае вышеупомянутый кусок кода можно заменить на:

```
while (it.hasNext()) {
ExtendedEntry<MemorySegment> ee = it.next();
EntryChunkUtils.getEntryByteChunk(ee, bb);
session.write(bb.toBytes(), 0, bb.length());
bb.setLength(0);
}
```

P.S. По крайней мере после таких изменений тесты по-прежнему проходят и curl работает точно так же, как и до этого изменения.

Результаты профилирования последней версии:

[range-alloc-3.html](data%2Frange-alloc-3.html)

[range-cpu-3.html](data%2Frange-cpu-3.html)

Если опять же обратиться к профилю аллокаций, то можно увидеть, что `ByteArrayBuilder.<init>` пропал совсем.
Loading
Loading