Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Солнышко Ксения, ИТМО ФИТиП, Cluster range requests #232

Open
wants to merge 48 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
5f3d344
Add dao
persehoney Feb 18, 2024
3bc8c9c
Add server
persehoney Feb 18, 2024
559d352
Fix failed tests
persehoney Feb 18, 2024
4369129
Fix failed codeclimate
persehoney Feb 18, 2024
4221fd3
Fix failed codeclimate
persehoney Feb 18, 2024
7e5f1ae
Remove string template
persehoney Feb 18, 2024
d22f04b
Resolve some problems
persehoney Feb 21, 2024
37ccd74
Add scripts, report and attachments
persehoney Feb 21, 2024
e134dd1
Merge branch 'polis-vk:main' into stage1
persehoney Feb 22, 2024
18a1acb
Add async server
persehoney Feb 25, 2024
be75498
Merge branch 'polis-vk:main' into stage2
persehoney Feb 25, 2024
8ae3281
Add scripts, report and attachments
persehoney Feb 25, 2024
fba972e
Merge remote-tracking branch 'origin/stage2' into stage2
persehoney Feb 25, 2024
ece08a4
Move attachments from stage1
persehoney Feb 25, 2024
ceea88f
Merge branch 'polis-vk:main' into stage2
persehoney Feb 25, 2024
b83677a
Fix codestyle
persehoney Feb 25, 2024
66fc5c9
Fix codestyle
persehoney Feb 25, 2024
a6fe53f
Add new testing and attachments
persehoney Feb 26, 2024
6518ffd
Complete report and add attachments
persehoney Feb 29, 2024
76e1177
Update stage
persehoney Feb 29, 2024
5113afa
Merge branch 'main' into stage2
persehoney Mar 3, 2024
ed22aca
Fix server after incorrect conflict resolve
persehoney Mar 3, 2024
5dca825
Fix codestyle
persehoney Mar 3, 2024
8442129
Merge branch 'polis-vk:main' into stage2
persehoney Mar 7, 2024
0aebd8d
Merge branch 'main' into stage2
daniil-ushkov Mar 17, 2024
3069620
Merge branch 'polis-vk:main' into stage2
persehoney Mar 27, 2024
dcc8b1a
Add sharding
persehoney Mar 27, 2024
0a9ae7e
Add timestamp to DAO
persehoney Apr 2, 2024
bca399f
Merge branch 'polis-vk:main' into stage4
persehoney Apr 2, 2024
c2d638b
Add replication
persehoney Apr 8, 2024
d170acd
Merge branch 'polis-vk:main' into stage4
persehoney Apr 8, 2024
aa139c3
Fix replication
persehoney Apr 8, 2024
5b258a5
Add async server
persehoney Apr 11, 2024
9276a7e
Fix some codestyle mistakes
persehoney Apr 11, 2024
5fda6c4
Fix codestyle mistakes
persehoney Apr 11, 2024
c214a61
Merge branch 'polis-vk:main' into stage5
persehoney Apr 17, 2024
53bf1e3
Add attachments to report
persehoney Apr 17, 2024
98ba8d0
Add report and attachments
persehoney Apr 17, 2024
f5a376c
Merge branch 'polis-vk:main' into stage5
persehoney Apr 20, 2024
a0faade
Change report attachments structure
persehoney Apr 24, 2024
71b2b40
Add range-requests support
persehoney Apr 24, 2024
5256a03
codeclimate <3
persehoney Apr 24, 2024
630c8b9
Merge branch 'polis-vk:main' into stage6
persehoney Apr 25, 2024
e962cb5
Merge branch 'main' into stage6
incubos Apr 28, 2024
3548972
Add cluster range requests
persehoney Jun 26, 2024
4cf9bf9
Codeclimate
persehoney Jun 26, 2024
99e1b7a
Codeclimate
persehoney Jun 26, 2024
0e1938e
Merge branch 'main' into cluster-range-requests
incubos Jun 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package ru.vk.itmo.test.solnyshkoksenia;

import one.nio.http.HttpServer;
import one.nio.http.HttpSession;
import one.nio.http.Response;
import one.nio.net.Socket;
import ru.vk.itmo.dao.Entry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class CustomHttpSession extends HttpSession {
private static final byte[] CRLF = "\r\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] LF = "\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] EMPTY_CHUNK = "0\r\n\r\n".getBytes(StandardCharsets.UTF_8);
Iterator<Entry<MemorySegment>> iterator;

public CustomHttpSession(Socket socket, HttpServer server) {
super(socket, server);
}

@Override
protected void processWrite() throws Exception {
super.processWrite();
nextChunk();
}

public void stream(Iterator<Entry<MemorySegment>> iterator) throws IOException {
this.iterator = iterator;
Response response = new Response(Response.OK);
response.addHeader("Transfer-Encoding: chunked");
writeResponse(response, false);

nextChunk();
}

private void nextChunk() throws IOException {
while (iterator.hasNext() && queueHead == null) {
Entry<MemorySegment> next = iterator.next();
ByteBuffer key = next.key().asByteBuffer();
ByteBuffer value = next.value().asByteBuffer();
int payloadSize = key.remaining() + value.remaining() + LF.length;
String payloadSizeStr = Integer.toHexString(payloadSize);
byte[] payloadSizeStrBytes = payloadSizeStr.getBytes(StandardCharsets.UTF_8);
write(payloadSizeStrBytes, 0, payloadSizeStrBytes.length);
write(CRLF, 0, CRLF.length);
write(new ReferenceQueueItem(key));
write(LF, 0, LF.length);
write(new ReferenceQueueItem(value));
write(CRLF, 0, CRLF.length);
}

if (!iterator.hasNext()) {
write(EMPTY_CHUNK, 0, EMPTY_CHUNK.length);

if ((this.handling = pipeline.pollFirst()) != null) {
if (handling == FIN) {
scheduleClose();
} else {
server.handleRequest(handling, this);
}
}
}
}

public void sendError(Throwable e) {
log.error("Exception during handleRequest", e);
try {
sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
} catch (IOException ex) {
log.error("Exception while sending close connection", e);
scheduleClose();
}
}

static class ReferenceQueueItem extends QueueItem {
private final ByteBuffer buffer;

ReferenceQueueItem(ByteBuffer buffer) {
this.buffer = buffer;
}

@Override
public int remaining() {
return buffer.remaining();
}

@Override
public int write(Socket socket) throws IOException {
return socket.write(buffer);
}
}
}
126 changes: 126 additions & 0 deletions src/main/java/ru/vk/itmo/test/solnyshkoksenia/CustomSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package ru.vk.itmo.test.solnyshkoksenia;

import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

public class CustomSubscriber implements BodySubscriber<byte[]> {
private static final byte DELIMITER = '\n';
volatile CompletableFuture<byte[]> bodyCF;
Flow.Subscription subscription;
List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();

@Override
public CompletionStage<byte[]> getBody() {
while (bodyCF == null) {
Thread.onSpinWait();
}
return bodyCF;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<ByteBuffer> buffers) {
buffers.forEach(ByteBuffer::rewind);
responseData.addAll(buffers);
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
bodyCF = CompletableFuture.failedFuture(throwable);
}

@Override
public void onComplete() {
bodyCF = CompletableFuture.completedFuture(toBytes(responseData));
}

private List<byte[]> toArrays(List<ByteBuffer> buffers) {
List<byte[]> chunks = new ArrayList<>();
for (ByteBuffer buffer : buffers) {
int remaining = buffer.remaining();
byte[] cur = new byte[remaining];
buffer.get(cur, 0, remaining);
chunks.add(cur);
}
return chunks;
}

private boolean containsDelim(byte[] bytes) {
for (int element : bytes) {
if (element == DELIMITER) {
return true;
}
}
return false;
}

private boolean startsWithDelim(byte[] bytes) {
return DELIMITER == bytes[0];
}

private byte[] merge(byte[] src1, byte[] src2) {
byte[] dst = new byte[src1.length + src2.length];
System.arraycopy(src1, 0, dst, 0, src1.length);
System.arraycopy(src2, 0, dst, src1.length, src2.length);
return dst;
}

private byte[] toArray(List<byte[]> bytes) {
var size = bytes.stream().mapToInt(b -> b.length).sum() + bytes.size();
byte[] dst = new byte[size];
int offset = 0;
for (byte[] src : bytes) {
System.arraycopy(src, 0, dst, offset, src.length);
offset += src.length;
dst[offset] = DELIMITER;
offset++;
}
return dst;
}

private byte[] toBytes(List<ByteBuffer> buffers) {
List<byte[]> chunks = toArrays(buffers);
List<byte[]> bytes = new ArrayList<>();

boolean predEndsWithDelim = false;
boolean predContainsDelim = true;

byte[] pred = null;
for (int i = 0; i < chunks.size(); i++) {
byte[] cur = chunks.get(i);
byte[] next = i + 1 == chunks.size() ? new byte[0] : chunks.get(i + 1);
if (startsWithDelim(cur) && !predContainsDelim) {
cur = merge(pred, cur);
} else if (!containsDelim(cur) && predEndsWithDelim) {
if (startsWithDelim(next)) {
bytes.add(pred);
} else {
cur = merge(pred, cur);
}
} else if (pred != null) {
bytes.add(pred);
}

predEndsWithDelim = DELIMITER == cur[cur.length - 1];
predContainsDelim = containsDelim(cur);
pred = cur;
}

if (pred != null) {
bytes.add(pred);
}
return toArray(bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ru.vk.itmo.test.solnyshkoksenia;

import one.nio.http.Response;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.solnyshkoksenia.dao.MemorySegmentComparator;
import ru.vk.itmo.test.solnyshkoksenia.dao.MergeIterator;

import java.lang.foreign.MemorySegment;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class MergeRangeResult {
private static final Comparator<MemorySegment> comparator = new MemorySegmentComparator();

private MergeRangeResult() {
}

public static Iterator<Entry<MemorySegment>> range(Iterator<Entry<MemorySegment>> firstIterator,
List<Response> responses) {
List<Iterator<Entry<MemorySegment>>> iterators = new ArrayList<>(responses.size() + 1);
for (var response : responses) {
iterators.add(iterator(response));
}
iterators.add(firstIterator);

return new MergeIterator<>(iterators, (e1, e2) -> comparator.compare(e1.key(), e2.key())) {
@Override
protected boolean skip(Entry<MemorySegment> memorySegmentEntry) {
return memorySegmentEntry.value() == null;
}
};
}

private static Iterator<Entry<MemorySegment>> iterator(Response response) {
byte[] body = response.getBody();
char separator = '\n';
return new Iterator<>() {
int offset;

@Override
public boolean hasNext() {
return offset < body.length;
}

@Override
public Entry<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
MemorySegment key = getMS();
MemorySegment value = getMS();
return new BaseEntry<>(key, value);
}

private MemorySegment getMS() {
int startOffset = offset;

byte b = body[offset];
while (b != separator) {
b = body[++offset];
}

byte[] tmp = new byte[offset - startOffset];
System.arraycopy(body, startOffset, tmp, 0, offset - startOffset);

offset++;
return MemorySegment.ofArray(tmp);
}
};
}
}
Loading
Loading