Skip to content

Commit 50f2acc

Browse files
axothyincubos
andauthored
HW1 Чеботин Александр (ITMO DWS) (polis-vk#2)
* stage1 and results * fix codestyle * fix codestyle * fix codestyle * Fix bug in my DAO * fix codestyle * fix codestyle * fix codestyle * fix codestyle * final codestyle fix? * final codestyle fix? --------- Co-authored-by: Vadim Tsesko <incubos@users.noreply.github.com>
1 parent 09cea29 commit 50f2acc

35 files changed

+5808
-0
lines changed

.codeclimate.yml

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ plugins:
4646
enabled: false
4747
PrematureDeclaration:
4848
enabled: false
49+
MissingBreakInSwitch:
50+
enabled: false
4951
sonar-java:
5052
enabled: true
5153
config:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package ru.vk.itmo.test.chebotinalexandr;
2+
3+
import ru.vk.itmo.ServiceConfig;
4+
import ru.vk.itmo.dao.BaseEntry;
5+
import ru.vk.itmo.dao.Config;
6+
import ru.vk.itmo.dao.Dao;
7+
import ru.vk.itmo.dao.Entry;
8+
import ru.vk.itmo.test.chebotinalexandr.dao.NotOnlyInMemoryDao;
9+
10+
import java.io.IOException;
11+
import java.lang.foreign.MemorySegment;
12+
import java.nio.charset.StandardCharsets;
13+
import java.nio.file.Files;
14+
import java.util.ArrayList;
15+
import java.util.Collections;
16+
import java.util.List;
17+
18+
public final class Server {
19+
private static final int ENTRIES_IN_DB = 500_000;
20+
21+
private Server() {
22+
23+
}
24+
25+
public static void main(String[] args) throws IOException {
26+
ServiceConfig config = new ServiceConfig(
27+
8080,
28+
"http://localhost",
29+
Collections.singletonList("http://localhost"),
30+
Files.createTempDirectory(".")
31+
);
32+
33+
Dao<MemorySegment, Entry<MemorySegment>> dao =
34+
new NotOnlyInMemoryDao(new Config(config.workingDir(), 4_194_304L));
35+
36+
StorageServer server = new StorageServer(config, dao);
37+
server.start();
38+
39+
fillFlush(dao);
40+
fillManyFlushes(dao);
41+
}
42+
43+
private static List<Integer> getRandomArray() {
44+
ArrayList<Integer> entries = new ArrayList<>(ENTRIES_IN_DB);
45+
for (int i = 0; i < ENTRIES_IN_DB; i++) {
46+
entries.add(i);
47+
}
48+
49+
Collections.shuffle(entries);
50+
return entries;
51+
}
52+
53+
/**
54+
* Just fills memtable without flushing.
55+
*/
56+
private static void fillMemtable(Dao<MemorySegment, Entry<MemorySegment>> dao) {
57+
List<Integer> entries = getRandomArray();
58+
for (Integer entry : entries) {
59+
dao.upsert(entry(keyAt(entry), valueAt(entry)));
60+
}
61+
}
62+
63+
/**
64+
* Fills memtable with one flush.
65+
*/
66+
private static void fillFlush(Dao<MemorySegment, Entry<MemorySegment>> dao) throws IOException {
67+
fillMemtable(dao);
68+
dao.flush();
69+
}
70+
71+
/**
72+
* Fills dao with multiple sstables.
73+
*/
74+
private static void fillManyFlushes(Dao<MemorySegment, Entry<MemorySegment>> dao) throws IOException {
75+
final int sstables = 100; //how many sstables dao must create
76+
final int flushEntries = ENTRIES_IN_DB / sstables; //how many entries in one sstable
77+
List<Integer> entries = getRandomArray();
78+
79+
//many flushes
80+
for (Integer entry : entries) {
81+
dao.upsert(entry(keyAt(entry), valueAt(entry)));
82+
if (entry % flushEntries == 0) {
83+
dao.flush();
84+
}
85+
}
86+
}
87+
88+
private static MemorySegment keyAt(int index) {
89+
return MemorySegment.ofArray(("k" + index).getBytes(StandardCharsets.UTF_8));
90+
}
91+
92+
private static MemorySegment valueAt(int index) {
93+
return MemorySegment.ofArray(("v" + index).getBytes(StandardCharsets.UTF_8));
94+
}
95+
96+
private static Entry<MemorySegment> entry(MemorySegment key, MemorySegment value) {
97+
return new BaseEntry<>(key, value);
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package ru.vk.itmo.test.chebotinalexandr;
2+
3+
import one.nio.http.HttpServer;
4+
import one.nio.http.HttpServerConfig;
5+
import one.nio.http.HttpSession;
6+
import one.nio.http.Param;
7+
import one.nio.http.Path;
8+
import one.nio.http.Request;
9+
import one.nio.http.RequestMethod;
10+
import one.nio.http.Response;
11+
import one.nio.server.AcceptorConfig;
12+
import ru.vk.itmo.ServiceConfig;
13+
import ru.vk.itmo.dao.BaseEntry;
14+
import ru.vk.itmo.dao.Dao;
15+
import ru.vk.itmo.dao.Entry;
16+
17+
import java.io.IOException;
18+
import java.lang.foreign.MemorySegment;
19+
import java.lang.foreign.ValueLayout;
20+
import java.nio.charset.StandardCharsets;
21+
22+
import static one.nio.http.Request.METHOD_DELETE;
23+
import static one.nio.http.Request.METHOD_GET;
24+
import static one.nio.http.Request.METHOD_PUT;
25+
26+
public class StorageServer extends HttpServer {
27+
private static final String PATH = "/v0/entity";
28+
private final Dao<MemorySegment, Entry<MemorySegment>> dao;
29+
30+
public StorageServer(ServiceConfig config, Dao<MemorySegment, Entry<MemorySegment>> dao) throws IOException {
31+
super(createConfig(config));
32+
this.dao = dao;
33+
}
34+
35+
private static HttpServerConfig createConfig(ServiceConfig config) {
36+
HttpServerConfig httpServerConfig = new HttpServerConfig();
37+
AcceptorConfig acceptorConfig = new AcceptorConfig();
38+
acceptorConfig.reusePort = true;
39+
acceptorConfig.port = config.selfPort();
40+
41+
httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig};
42+
httpServerConfig.closeSessions = true;
43+
return httpServerConfig;
44+
}
45+
46+
@Override
47+
public void handleDefault(Request request, HttpSession session) throws IOException {
48+
Response response = new Response(Response.BAD_REQUEST, Response.EMPTY);
49+
session.sendResponse(response);
50+
}
51+
52+
@Path("/hello")
53+
public Response hello() {
54+
return Response.ok("Hello, cruel world!".getBytes(StandardCharsets.UTF_8));
55+
}
56+
57+
@Path(PATH)
58+
@RequestMethod(METHOD_GET)
59+
public Response get(@Param("id") String id) {
60+
if (id == null || id.isEmpty()) {
61+
return new Response(Response.BAD_REQUEST, Response.EMPTY);
62+
}
63+
64+
Entry<MemorySegment> entry = dao.get(fromString(id));
65+
if (entry == null) {
66+
return new Response(Response.NOT_FOUND, Response.EMPTY);
67+
} else {
68+
return Response.ok(toBytes(entry.value()));
69+
}
70+
}
71+
72+
@Path(PATH)
73+
@RequestMethod(METHOD_PUT)
74+
public Response upsert(@Param("id") String id, Request request) {
75+
if (id == null || id.isEmpty()) {
76+
return new Response(Response.BAD_REQUEST, Response.EMPTY);
77+
}
78+
79+
Entry<MemorySegment> entry = new BaseEntry<>(
80+
fromString(id),
81+
fromBytes(request.getBody())
82+
);
83+
dao.upsert(entry);
84+
85+
return new Response(Response.CREATED, Response.EMPTY);
86+
}
87+
88+
@Path(PATH)
89+
@RequestMethod(METHOD_DELETE)
90+
public Response delete(@Param("id") String id) {
91+
if (id == null || id.isEmpty()) {
92+
return new Response(Response.BAD_REQUEST, Response.EMPTY);
93+
}
94+
95+
Entry<MemorySegment> entry = new BaseEntry<>(
96+
fromString(id),
97+
null
98+
);
99+
dao.upsert(entry);
100+
101+
return new Response(Response.ACCEPTED, Response.EMPTY);
102+
}
103+
104+
@Path(PATH)
105+
public Response post(@Param("id") String id) {
106+
//Post (in-place updating in LSM) is not supported
107+
return new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);
108+
}
109+
110+
private static MemorySegment fromString(String data) {
111+
return MemorySegment.ofArray(data.getBytes(StandardCharsets.UTF_8));
112+
}
113+
114+
private static MemorySegment fromBytes(byte[] bytes) {
115+
return MemorySegment.ofArray(bytes);
116+
}
117+
118+
private static byte[] toBytes(MemorySegment segment) {
119+
return segment.toArray(ValueLayout.JAVA_BYTE);
120+
}
121+
}
122+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package ru.vk.itmo.test.chebotinalexandr;
2+
3+
import ru.vk.itmo.Service;
4+
import ru.vk.itmo.ServiceConfig;
5+
import ru.vk.itmo.dao.Config;
6+
import ru.vk.itmo.dao.Dao;
7+
import ru.vk.itmo.dao.Entry;
8+
import ru.vk.itmo.test.ServiceFactory;
9+
import ru.vk.itmo.test.chebotinalexandr.dao.NotOnlyInMemoryDao;
10+
11+
import java.io.IOException;
12+
import java.lang.foreign.MemorySegment;
13+
import java.util.concurrent.CompletableFuture;
14+
15+
public class StorageService implements Service {
16+
private Dao<MemorySegment, Entry<MemorySegment>> dao;
17+
private static final long FLUSH_THRESHOLD_BYTES = 4_194_304L;
18+
private StorageServer server;
19+
private final ServiceConfig config;
20+
21+
public StorageService(ServiceConfig config) {
22+
this.config = config;
23+
}
24+
25+
@Override
26+
public CompletableFuture<Void> start() throws IOException {
27+
//Dao opens here in order to make it able to reopen
28+
dao = new NotOnlyInMemoryDao(new Config(config.workingDir(), FLUSH_THRESHOLD_BYTES));
29+
30+
this.server = new StorageServer(config, dao);
31+
server.start();
32+
33+
return CompletableFuture.completedFuture(null);
34+
}
35+
36+
@Override
37+
public CompletableFuture<Void> stop() throws IOException {
38+
server.stop();
39+
dao.close();
40+
return CompletableFuture.completedFuture(null);
41+
}
42+
43+
@ServiceFactory(stage = 1)
44+
public static class Factory implements ServiceFactory.Factory {
45+
46+
@Override
47+
public Service create(ServiceConfig config) {
48+
return new StorageService(config);
49+
}
50+
}
51+
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package ru.vk.itmo.test.chebotinalexandr.dao;
2+
3+
import java.lang.foreign.MemorySegment;
4+
import java.lang.foreign.ValueLayout;
5+
6+
import static ru.vk.itmo.test.chebotinalexandr.dao.SSTableUtils.BLOOM_FILTER_HASH_FUNCTIONS_OFFSET;
7+
import static ru.vk.itmo.test.chebotinalexandr.dao.SSTableUtils.BLOOM_FILTER_LENGTH_OFFSET;
8+
9+
public final class BloomFilter {
10+
private static final int LONG_ADDRESSABLE_BITS = 6;
11+
12+
private BloomFilter() {
13+
14+
}
15+
16+
public static long bloomFilterLength(long entriesCount, double falsePositiveRate) {
17+
long n = divide(entriesCount);
18+
return (long) (-n * Math.log(falsePositiveRate) / (Math.log(2) * Math.log(2)));
19+
}
20+
21+
/** Divides {@code entriesCount} by {@code 64} with ceiling rounding.
22+
*/
23+
private static long divide(long entriesCount) {
24+
long div = entriesCount / (long) Long.SIZE;
25+
26+
//no rounding required
27+
if (entriesCount % (long) Long.SIZE == 0) {
28+
return div;
29+
}
30+
31+
return div + 1;
32+
}
33+
34+
public static void addToSstable(MemorySegment key, MemorySegment sstable, int hashFunctionsNum, long bitSize) {
35+
long[] indexes = MurmurHash.hash64(key, 0, (int) key.byteSize());
36+
37+
long base = indexes[0];
38+
long inc = indexes[1];
39+
40+
long combinedHash = base;
41+
for (int i = 0; i < hashFunctionsNum; i++) {
42+
long bitIndex = (combinedHash & Long.MAX_VALUE) % bitSize;
43+
set(bitIndex, sstable);
44+
combinedHash += inc;
45+
}
46+
}
47+
48+
private static void set(long bitIndex, MemorySegment sstable) {
49+
long bitOffset = offsetForIndex(bitIndex);
50+
51+
long mask = 1L << bitIndex;
52+
53+
long oldValue = sstable.get(ValueLayout.JAVA_LONG_UNALIGNED, bitOffset);
54+
sstable.set(ValueLayout.JAVA_LONG_UNALIGNED, bitOffset, oldValue | mask);
55+
}
56+
57+
public static boolean sstableMayContain(MemorySegment key, MemorySegment sstable) {
58+
long[] indexes = MurmurHash.hash64(key, 0, (int) key.byteSize(), MurmurHash.DEFAULT_SEED);
59+
60+
long base = indexes[0];
61+
long inc = indexes[1];
62+
63+
long bitSize = sstable.get(ValueLayout.JAVA_LONG_UNALIGNED, BLOOM_FILTER_LENGTH_OFFSET) * Long.SIZE;
64+
long hashFunctions = sstable.get(ValueLayout.JAVA_LONG_UNALIGNED, BLOOM_FILTER_HASH_FUNCTIONS_OFFSET);
65+
66+
long combinedHash = base;
67+
for (int i = 0; i < hashFunctions; i++) {
68+
if (!getFromSstable((combinedHash & Long.MAX_VALUE) % bitSize, sstable)) {
69+
return false;
70+
}
71+
combinedHash += inc;
72+
}
73+
74+
return true;
75+
}
76+
77+
private static boolean getFromSstable(long bitIndex, MemorySegment sstable) {
78+
long bitOffset = offsetForIndex(bitIndex);
79+
80+
long hashFromSstable = sstable.get(ValueLayout.JAVA_LONG_UNALIGNED, bitOffset);
81+
return (hashFromSstable & (1L << bitIndex)) != 0;
82+
}
83+
84+
/** Get sstable offset for bloom filter array index.
85+
*/
86+
private static long offsetForIndex(long bitIndex) {
87+
long longIndex = bitIndex >>> LONG_ADDRESSABLE_BITS;
88+
return 3L * Long.BYTES + longIndex * Long.BYTES;
89+
}
90+
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package ru.vk.itmo.test.chebotinalexandr.dao;
2+
3+
/** Binary search in SSTable result information.
4+
*/
5+
public record FindResult(boolean found, long index) { }

0 commit comments

Comments
 (0)