Skip to content

Commit 074c278

Browse files
HKTRpkislovdaAlexeyShik
authored andcommitted
Кислов Данил ИТМО ФИТиП HW1 (polis-vk#3)
* Stage 1 * linter fixes * linter fixes * Правки в отчёте после ревью --------- Co-authored-by: kislovda <kislovda@mos-team.ru> Co-authored-by: Alexey Shik <58121508+AlexeyShik@users.noreply.github.com>
1 parent 7c1d6b9 commit 074c278

24 files changed

+1274
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package ru.vk.itmo.test.kislovdanil;
2+
3+
import ru.vk.itmo.Service;
4+
import ru.vk.itmo.ServiceConfig;
5+
import ru.vk.itmo.test.kislovdanil.service.DatabaseServiceFactory;
6+
7+
import java.io.IOException;
8+
import java.nio.file.Path;
9+
import java.util.List;
10+
import java.util.concurrent.ExecutionException;
11+
12+
public final class Main {
13+
private Main() {
14+
15+
}
16+
17+
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
18+
DatabaseServiceFactory factory = new DatabaseServiceFactory();
19+
ServiceConfig config = new ServiceConfig(8080, "localhost", List.of(),
20+
Path.of("/home/burprop/Study/2024-highload-dht"));
21+
Service service = factory.create(config);
22+
service.start().get();
23+
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package ru.vk.itmo.test.kislovdanil.dao;
2+
3+
import ru.vk.itmo.dao.Entry;
4+
5+
import java.lang.foreign.MemorySegment;
6+
import java.util.Comparator;
7+
import java.util.concurrent.ConcurrentSkipListMap;
8+
import java.util.concurrent.atomic.AtomicLong;
9+
10+
/* Basically, ConcurrentSkipList with bytes threshold.
11+
*/
12+
public class MemTable {
13+
private final ConcurrentSkipListMap<MemorySegment, Entry<MemorySegment>> storage;
14+
private final long threshold;
15+
private final AtomicLong size = new AtomicLong(0);
16+
17+
private static long getEntrySize(Entry<MemorySegment> entry) {
18+
long valueSize = entry.value() == null ? 0 : entry.value().byteSize();
19+
return valueSize + entry.key().byteSize();
20+
}
21+
22+
public MemTable(Comparator<MemorySegment> comparator, long threshold) {
23+
this.storage = new ConcurrentSkipListMap<>(comparator);
24+
this.threshold = threshold;
25+
}
26+
27+
public boolean put(Entry<MemorySegment> entry) {
28+
long entrySize = getEntrySize(entry);
29+
if (size.addAndGet(entrySize) - entrySize > threshold) {
30+
return false;
31+
}
32+
storage.put(entry.key(), entry);
33+
return true;
34+
}
35+
36+
public ConcurrentSkipListMap<MemorySegment, Entry<MemorySegment>> getStorage() {
37+
return storage;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package ru.vk.itmo.test.kislovdanil.dao;
2+
3+
import ru.vk.itmo.dao.Config;
4+
import ru.vk.itmo.dao.Dao;
5+
import ru.vk.itmo.dao.Entry;
6+
import ru.vk.itmo.test.kislovdanil.dao.exceptions.DBException;
7+
import ru.vk.itmo.test.kislovdanil.dao.exceptions.OverloadException;
8+
import ru.vk.itmo.test.kislovdanil.dao.iterators.DatabaseIterator;
9+
import ru.vk.itmo.test.kislovdanil.dao.iterators.MemTableIterator;
10+
import ru.vk.itmo.test.kislovdanil.dao.iterators.MergeIterator;
11+
import ru.vk.itmo.test.kislovdanil.dao.sstable.SSTable;
12+
13+
import java.io.File;
14+
import java.io.IOException;
15+
import java.lang.foreign.Arena;
16+
import java.lang.foreign.MemorySegment;
17+
import java.lang.foreign.ValueLayout;
18+
import java.util.ArrayList;
19+
import java.util.Comparator;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.Future;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.concurrent.locks.Lock;
29+
import java.util.concurrent.locks.ReadWriteLock;
30+
import java.util.concurrent.locks.ReentrantLock;
31+
import java.util.concurrent.locks.ReentrantReadWriteLock;
32+
33+
public class PersistentDao implements Dao<MemorySegment, Entry<MemorySegment>>, Iterable<Entry<MemorySegment>> {
34+
35+
public static final MemorySegment DELETED_VALUE = null;
36+
private final Config config;
37+
private volatile List<SSTable> tables = new ArrayList<>();
38+
private final Comparator<MemorySegment> comparator = new MemSegComparator();
39+
private volatile MemTable memTable;
40+
// Temporary storage in case of main storage flushing (Read only)
41+
private volatile MemTable additionalStorage;
42+
// In case of additional table overload while main table is flushing
43+
private final AtomicLong nextId = new AtomicLong();
44+
private final ExecutorService commonExecutorService = Executors.newFixedThreadPool(2);
45+
// To prevent parallel flushing
46+
private volatile Future<?> compcatFuture;
47+
// To make sure that flushing in close() will be started
48+
private volatile Future<?> flushFuture;
49+
// Have to take before any tables modification
50+
private final Lock compactionLock = new ReentrantLock();
51+
// Have to take read while upsert and write while flushing (to prevent data loss)
52+
private final ReadWriteLock upsertLock = new ReentrantReadWriteLock();
53+
private final Arena filesArena = Arena.ofShared();
54+
55+
private long getMaxTablesId(Iterable<SSTable> tableIterable) {
56+
long curMaxId = -1;
57+
for (SSTable table : tableIterable) {
58+
curMaxId = Math.max(curMaxId, table.getTableId());
59+
}
60+
return curMaxId;
61+
}
62+
63+
public PersistentDao(Config config) throws IOException {
64+
this.config = config;
65+
this.memTable = new MemTable(comparator, config.flushThresholdBytes());
66+
File basePathDirectory = new File(config.basePath().toString());
67+
String[] ssTablesIds = basePathDirectory.list();
68+
if (ssTablesIds == null) return;
69+
for (String tableID : ssTablesIds) {
70+
// SSTable constructor without entries iterator reads table data from disk if it exists
71+
tables.add(new SSTable(config.basePath(), comparator, Long.parseLong(tableID), filesArena));
72+
}
73+
nextId.set(getMaxTablesId(tables) + 1);
74+
tables.sort(SSTable::compareTo);
75+
}
76+
77+
@Override
78+
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
79+
List<DatabaseIterator> iterators = new ArrayList<>(tables.size() + 2);
80+
for (SSTable table : tables) {
81+
iterators.add(table.getRange(from, to));
82+
}
83+
iterators.add(new MemTableIterator(from, to, memTable, Long.MAX_VALUE));
84+
if (additionalStorage != null) {
85+
iterators.add(new MemTableIterator(from, to, additionalStorage, Long.MAX_VALUE - 1));
86+
}
87+
return new MergeIterator(iterators, comparator);
88+
}
89+
90+
private static Entry<MemorySegment> wrapEntryIfDeleted(Entry<MemorySegment> entry) {
91+
if (entry.value() == DELETED_VALUE) return null;
92+
return entry;
93+
}
94+
95+
private long getNextId() {
96+
return nextId.getAndIncrement();
97+
}
98+
99+
// Return null if it doesn't find
100+
@Override
101+
public Entry<MemorySegment> get(MemorySegment key) {
102+
Entry<MemorySegment> ans = memTable.getStorage().get(key);
103+
if (ans != null) return wrapEntryIfDeleted(ans);
104+
if (additionalStorage != null) {
105+
ans = additionalStorage.getStorage().get(key);
106+
if (ans != null) return wrapEntryIfDeleted(ans);
107+
}
108+
try {
109+
for (SSTable table : tables.reversed()) {
110+
ans = table.find(key);
111+
if (ans != null) {
112+
return wrapEntryIfDeleted(ans);
113+
}
114+
}
115+
} catch (IOException e) {
116+
throw new DBException(e);
117+
}
118+
return null;
119+
}
120+
121+
@Override
122+
public void upsert(Entry<MemorySegment> entry) {
123+
upsertLock.readLock().lock();
124+
try {
125+
if (memTable.put(entry)) {
126+
return;
127+
}
128+
} finally {
129+
upsertLock.readLock().unlock();
130+
}
131+
flush();
132+
upsertLock.readLock().lock();
133+
try {
134+
if (!memTable.put(entry)) {
135+
throw new OverloadException(entry);
136+
}
137+
} finally {
138+
upsertLock.readLock().unlock();
139+
}
140+
}
141+
142+
private void makeFlush() throws IOException {
143+
compactionLock.lock();
144+
try {
145+
if (additionalStorage == null) return;
146+
// SSTable constructor with entries iterator writes MemTable data on disk deleting old data if it exists
147+
tables.add(new SSTable(config.basePath(), comparator,
148+
getNextId(), additionalStorage.getStorage().values().iterator(), filesArena));
149+
additionalStorage = null;
150+
} finally {
151+
compactionLock.unlock();
152+
}
153+
}
154+
155+
@Override
156+
public void flush() {
157+
upsertLock.writeLock().lock();
158+
try {
159+
if (additionalStorage != null || memTable.getStorage().isEmpty()) {
160+
return;
161+
}
162+
additionalStorage = memTable;
163+
memTable = new MemTable(comparator, config.flushThresholdBytes());
164+
flushFuture = commonExecutorService.submit(
165+
() -> {
166+
try {
167+
makeFlush();
168+
} catch (IOException e) {
169+
throw new DBException(e);
170+
}
171+
});
172+
} finally {
173+
upsertLock.writeLock().unlock();
174+
}
175+
}
176+
177+
private void closeExecutorService(ExecutorService executorService) {
178+
executorService.shutdown();
179+
try {
180+
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
181+
executorService.shutdownNow();
182+
}
183+
} catch (InterruptedException e) {
184+
Thread.currentThread().interrupt();
185+
}
186+
}
187+
188+
@Override
189+
public void close() {
190+
if (!filesArena.scope().isAlive()) {
191+
return;
192+
}
193+
if (flushFuture != null) {
194+
try {
195+
flushFuture.get();
196+
} catch (InterruptedException | ExecutionException e) {
197+
Thread.currentThread().interrupt();
198+
}
199+
}
200+
flush();
201+
closeExecutorService(commonExecutorService);
202+
filesArena.close();
203+
}
204+
205+
private void makeCompaction() throws IOException {
206+
compactionLock.lock();
207+
try {
208+
if (tables.size() <= 1) return;
209+
long compactedTableId = getNextId();
210+
SSTable compactedTable = new SSTable(config.basePath(), comparator, compactedTableId,
211+
new MergeIterator(tables, comparator), filesArena);
212+
List<SSTable> oldTables = tables;
213+
List<SSTable> newTables = new ArrayList<>();
214+
newTables.add(compactedTable);
215+
tables = newTables;
216+
for (SSTable table : oldTables) {
217+
table.deleteFromDisk();
218+
}
219+
} finally {
220+
compactionLock.unlock();
221+
}
222+
}
223+
224+
@Override
225+
public void compact() {
226+
if (compcatFuture != null && !compcatFuture.isDone()) {
227+
compcatFuture.cancel(false);
228+
}
229+
compcatFuture = commonExecutorService.submit(
230+
() -> {
231+
try {
232+
makeCompaction();
233+
} catch (IOException e) {
234+
throw new DBException(e);
235+
}
236+
});
237+
}
238+
239+
@Override
240+
public Iterator<Entry<MemorySegment>> iterator() {
241+
return get(null, null);
242+
}
243+
244+
private static class MemSegComparator implements Comparator<MemorySegment> {
245+
@Override
246+
public int compare(MemorySegment o1, MemorySegment o2) {
247+
long mismatch = o1.mismatch(o2);
248+
if (mismatch == -1) {
249+
return 0;
250+
}
251+
if (mismatch == Math.min(o1.byteSize(), o2.byteSize())) {
252+
return Long.compare(o1.byteSize(), o2.byteSize());
253+
}
254+
return Byte.compare(o1.get(ValueLayout.JAVA_BYTE, mismatch), o2.get(ValueLayout.JAVA_BYTE, mismatch));
255+
}
256+
}
257+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package ru.vk.itmo.test.kislovdanil.dao.exceptions;
2+
3+
public class DBException extends RuntimeException {
4+
public DBException(Exception e) {
5+
super(e);
6+
}
7+
8+
public DBException() {
9+
super();
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package ru.vk.itmo.test.kislovdanil.dao.exceptions;
2+
3+
import ru.vk.itmo.dao.Entry;
4+
5+
import java.lang.foreign.MemorySegment;
6+
7+
public class OverloadException extends DBException {
8+
public final Entry<MemorySegment> entry;
9+
10+
public OverloadException(Entry<MemorySegment> entry) {
11+
super();
12+
this.entry = entry;
13+
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package ru.vk.itmo.test.kislovdanil.dao.iterators;
2+
3+
import ru.vk.itmo.dao.Entry;
4+
5+
import java.lang.foreign.MemorySegment;
6+
import java.util.Iterator;
7+
8+
public interface DatabaseIterator extends Iterator<Entry<MemorySegment>> {
9+
long getPriority();
10+
}

0 commit comments

Comments
 (0)