From 848f15d88baba1b2751b2cac994cc5707fa67c5b Mon Sep 17 00:00:00 2001 From: lena Date: Thu, 29 Feb 2024 12:29:09 +0300 Subject: [PATCH] [add-timestamp] add timestamp instead of sequence (UpsertRemoveTest failed) --- .../itmo/khodosovaelena/ByteArraySegment.java | 48 +++ .../vk/itmo/khodosovaelena/DiskStorage.java | 303 ------------------ .../vk/itmo/khodosovaelena/InMemoryDao.java | 119 ------- .../khodosovaelena/LiveFilteringIterator.java | 52 +++ .../ru/vk/itmo/khodosovaelena/MemTable.java | 49 +++ .../MemorySegmentComparator.java | 89 +++++ .../vk/itmo/khodosovaelena/MergeIterator.java | 134 -------- .../khodosovaelena/MergingEntryIterator.java | 68 ++++ .../vk/itmo/khodosovaelena/ReferenceDao.java | 294 +++++++++++++++++ .../ru/vk/itmo/khodosovaelena/SSTable.java | 204 ++++++++++++ .../vk/itmo/khodosovaelena/SSTableWriter.java | 166 ++++++++++ .../ru/vk/itmo/khodosovaelena/SSTables.java | 162 ++++++++++ .../ru/vk/itmo/khodosovaelena/TableSet.java | 194 +++++++++++ .../WeightedPeekingEntryIterator.java | 67 ++++ .../itmo/test/khodosovaelena/MyFactory.java | 6 +- 15 files changed, 1396 insertions(+), 559 deletions(-) create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/ByteArraySegment.java delete mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/DiskStorage.java delete mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/InMemoryDao.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/LiveFilteringIterator.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/MemTable.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/MemorySegmentComparator.java delete mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/MergeIterator.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/MergingEntryIterator.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/ReferenceDao.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/SSTable.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/SSTableWriter.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/SSTables.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/TableSet.java create mode 100644 src/main/java/ru/vk/itmo/khodosovaelena/WeightedPeekingEntryIterator.java diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/ByteArraySegment.java b/src/main/java/ru/vk/itmo/khodosovaelena/ByteArraySegment.java new file mode 100644 index 000000000..63a750f95 --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/ByteArraySegment.java @@ -0,0 +1,48 @@ +package ru.vk.itmo.khodosovaelena; + +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +/** + * Growable buffer with {@link ByteBuffer} and {@link MemorySegment} interface. + * + * @author incubos + */ +final class ByteArraySegment { + private byte[] array; + private MemorySegment segment; + + ByteArraySegment(final int capacity) { + this.array = new byte[capacity]; + this.segment = MemorySegment.ofArray(array); + } + + void withArray(final ArrayConsumer consumer) throws IOException { + consumer.process(array); + } + + MemorySegment segment() { + return segment; + } + + void ensureCapacity(final long size) { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Too big!"); + } + + final int capacity = (int) size; + if (array.length >= capacity) { + return; + } + + // Grow to the nearest bigger power of 2 + final int newSize = Integer.highestOneBit(capacity) << 1; + array = new byte[newSize]; + segment = MemorySegment.ofArray(array); + } + + interface ArrayConsumer { + void process(byte[] array) throws IOException; + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/DiskStorage.java b/src/main/java/ru/vk/itmo/khodosovaelena/DiskStorage.java deleted file mode 100644 index e682ebb93..000000000 --- a/src/main/java/ru/vk/itmo/khodosovaelena/DiskStorage.java +++ /dev/null @@ -1,303 +0,0 @@ -package ru.vk.itmo.khodosovaelena; - -import ru.vk.itmo.BaseEntry; -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.lang.foreign.Arena; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -public class DiskStorage { - private static final String INDEX_IDX = "index.idx"; - private static final String INDEX_TMP = "index.tmp"; - - private final List segmentList; - - public DiskStorage(List segmentList) { - this.segmentList = segmentList; - } - - public Iterator> range( - Iterator> firstIterator, - MemorySegment from, - MemorySegment to) { - List>> iterators = new ArrayList<>(segmentList.size() + 1); - for (MemorySegment memorySegment : segmentList) { - iterators.add(iterator(memorySegment, from, to)); - } - iterators.add(firstIterator); - - return new MergeIterator<>(iterators, Comparator.comparing(Entry::key, InMemoryDao::compare)) { - @Override - protected boolean skip(Entry memorySegmentEntry) { - return memorySegmentEntry.value() == null; - } - }; - } - - public static void save(Path storagePath, Iterable> iterable) - throws IOException { - final Path indexTmp = storagePath.resolve(INDEX_TMP); - final Path indexFile = storagePath.resolve(INDEX_IDX); - - try { - Files.createFile(indexFile); - } catch (FileAlreadyExistsException ignored) { - // it is ok, actually it is normal state - } - List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); - - String newFileName = String.valueOf(existedFiles.size()); - - long dataSize = 0; - long count = 0; - for (Entry entry : iterable) { - dataSize += entry.key().byteSize(); - MemorySegment value = entry.value(); - if (value != null) { - dataSize += value.byteSize(); - } - count++; - } - long indexSize = count * 2 * Long.BYTES; - - try ( - FileChannel fileChannel = FileChannel.open( - storagePath.resolve(newFileName), - StandardOpenOption.WRITE, - StandardOpenOption.READ, - StandardOpenOption.CREATE - ); - Arena writeArena = Arena.ofConfined() - ) { - MemorySegment fileSegment = fileChannel.map( - FileChannel.MapMode.READ_WRITE, - 0, - indexSize + dataSize, - writeArena - ); - - // index: - // |key0_Start|value0_Start|key1_Start|value1_Start|key2_Start|value2_Start|... - // key0_Start = data start = end of index - long dataOffset = indexSize; - int indexOffset = 0; - for (Entry entry : iterable) { - fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); - dataOffset += entry.key().byteSize(); - indexOffset += Long.BYTES; - - MemorySegment value = entry.value(); - if (value == null) { - fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, tombstone(dataOffset)); - } else { - fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); - dataOffset += value.byteSize(); - } - indexOffset += Long.BYTES; - } - - // data: - // |key0|value0|key1|value1|... - dataOffset = indexSize; - for (Entry entry : iterable) { - MemorySegment key = entry.key(); - MemorySegment.copy(key, 0, fileSegment, dataOffset, key.byteSize()); - dataOffset += key.byteSize(); - - MemorySegment value = entry.value(); - if (value != null) { - MemorySegment.copy(value, 0, fileSegment, dataOffset, value.byteSize()); - dataOffset += value.byteSize(); - } - } - } - - Files.move(indexFile, indexTmp, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - - List list = new ArrayList<>(existedFiles.size() + 1); - list.addAll(existedFiles); - list.add(newFileName); - Files.write( - indexFile, - list, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING - ); - - Files.delete(indexTmp); - } - - public static List loadOrRecover(Path storagePath, Arena arena) throws IOException { - Path indexTmp = storagePath.resolve(INDEX_TMP); - Path indexFile = storagePath.resolve(INDEX_IDX); - - if (Files.exists(indexTmp)) { - Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - } else { - try { - Files.createFile(indexFile); - } catch (FileAlreadyExistsException ignored) { - // it is ok, actually it is normal state - } - } - - List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); - List result = new ArrayList<>(existedFiles.size()); - for (String fileName : existedFiles) { - Path file = storagePath.resolve(fileName); - try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - MemorySegment fileSegment = fileChannel.map( - FileChannel.MapMode.READ_WRITE, - 0, - Files.size(file), - arena - ); - result.add(fileSegment); - } - } - - return result; - } - - private static long indexOf(MemorySegment segment, MemorySegment key) { - long recordsCount = recordsCount(segment); - - long left = 0; - long right = recordsCount - 1; - while (left <= right) { - long mid = (left + right) >>> 1; - - long startOfKey = startOfKey(segment, mid); - long endOfKey = endOfKey(segment, mid); - long mismatch = MemorySegment.mismatch(segment, startOfKey, endOfKey, key, 0, key.byteSize()); - if (mismatch == -1) { - return mid; - } - - if (mismatch == key.byteSize()) { - right = mid - 1; - continue; - } - - if (mismatch == endOfKey - startOfKey) { - left = mid + 1; - continue; - } - - int b1 = Byte.toUnsignedInt(segment.get(ValueLayout.JAVA_BYTE, startOfKey + mismatch)); - int b2 = Byte.toUnsignedInt(key.get(ValueLayout.JAVA_BYTE, mismatch)); - if (b1 > b2) { - right = mid - 1; - } else { - left = mid + 1; - } - } - - return tombstone(left); - } - - private static long recordsCount(MemorySegment segment) { - long indexSize = indexSize(segment); - return indexSize / Long.BYTES / 2; - } - - private static long indexSize(MemorySegment segment) { - return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, 0); - } - - private static Iterator> iterator(MemorySegment page, MemorySegment from, MemorySegment to) { - long recordIndexFrom = from == null ? 0 : normalize(indexOf(page, from)); - long recordIndexTo = to == null ? recordsCount(page) : normalize(indexOf(page, to)); - long recordsCount = recordsCount(page); - - return new Iterator<>() { - long index = recordIndexFrom; - - @Override - public boolean hasNext() { - return index < recordIndexTo; - } - - @Override - public Entry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - MemorySegment key = slice(page, startOfKey(page, index), endOfKey(page, index)); - long startOfValue = startOfValue(page, index); - MemorySegment value = - startOfValue < 0 - ? null - : slice(page, startOfValue, endOfValue(page, index, recordsCount)); - index++; - return new BaseEntry<>(key, value); - } - }; - } - - public void compact(Path basePath, Iterable> iterableStorage) throws IOException { - clearFiles(basePath); - save(basePath, iterableStorage); - } - - private void clearFiles(Path path) throws IOException { - final Path indexFile = path.resolve(INDEX_IDX); - final List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); - for (String fileName : existedFiles) { - Files.deleteIfExists(path.resolve(fileName)); - } - Files.delete(indexFile); - Files.deleteIfExists(path.resolve(INDEX_TMP)); - } - - private static MemorySegment slice(MemorySegment page, long start, long end) { - return page.asSlice(start, end - start); - } - - private static long startOfKey(MemorySegment segment, long recordIndex) { - return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, recordIndex * 2 * Long.BYTES); - } - - private static long endOfKey(MemorySegment segment, long recordIndex) { - return normalizedStartOfValue(segment, recordIndex); - } - - private static long normalizedStartOfValue(MemorySegment segment, long recordIndex) { - return normalize(startOfValue(segment, recordIndex)); - } - - private static long startOfValue(MemorySegment segment, long recordIndex) { - return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, recordIndex * 2 * Long.BYTES + Long.BYTES); - } - - private static long endOfValue(MemorySegment segment, long recordIndex, long recordsCount) { - if (recordIndex < recordsCount - 1) { - return startOfKey(segment, recordIndex + 1); - } - return segment.byteSize(); - } - - private static long tombstone(long offset) { - return 1L << 63 | offset; - } - - private static long normalize(long value) { - return value & ~(1L << 63); - } - -} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/InMemoryDao.java b/src/main/java/ru/vk/itmo/khodosovaelena/InMemoryDao.java deleted file mode 100644 index 330f4f1d0..000000000 --- a/src/main/java/ru/vk/itmo/khodosovaelena/InMemoryDao.java +++ /dev/null @@ -1,119 +0,0 @@ -package ru.vk.itmo.khodosovaelena; - -import ru.vk.itmo.Config; -import ru.vk.itmo.Dao; -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.lang.foreign.Arena; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.NavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; - -public class InMemoryDao implements Dao> { - - private final Comparator comparator = InMemoryDao::compare; - private final NavigableMap> storage = new ConcurrentSkipListMap<>(comparator); - private final Arena arena; - private final DiskStorage diskStorage; - private final Path path; - - public InMemoryDao(Config config) throws IOException { - this.path = config.basePath().resolve("data"); - Files.createDirectories(path); - - arena = Arena.ofShared(); - - this.diskStorage = new DiskStorage(DiskStorage.loadOrRecover(path, arena)); - } - - static int compare(MemorySegment memorySegment1, MemorySegment memorySegment2) { - long mismatch = memorySegment1.mismatch(memorySegment2); - if (mismatch == -1) { - return 0; - } - - if (mismatch == memorySegment1.byteSize()) { - return -1; - } - - if (mismatch == memorySegment2.byteSize()) { - return 1; - } - byte b1 = memorySegment1.get(ValueLayout.JAVA_BYTE, mismatch); - byte b2 = memorySegment2.get(ValueLayout.JAVA_BYTE, mismatch); - return Byte.compare(b1, b2); - } - - @Override - public Iterator> get(MemorySegment from, MemorySegment to) { - return diskStorage.range(getInMemory(from, to), from, to); - } - - private Iterator> getInMemory(MemorySegment from, MemorySegment to) { - if (from == null && to == null) { - return storage.values().iterator(); - } - if (from == null) { - return storage.headMap(to).values().iterator(); - } - if (to == null) { - return storage.tailMap(from).values().iterator(); - } - return storage.subMap(from, to).values().iterator(); - } - - @Override - public void upsert(Entry entry) { - storage.put(entry.key(), entry); - } - - @Override - public Entry get(MemorySegment key) { - Entry entry = storage.get(key); - if (entry != null) { - if (entry.value() == null) { - return null; - } - return entry; - } - - Iterator> iterator = diskStorage.range(Collections.emptyIterator(), key, null); - - if (!iterator.hasNext()) { - return null; - } - Entry next = iterator.next(); - if (compare(next.key(), key) == 0) { - return next; - } - return null; - } - - @Override - public void close() throws IOException { - if (!arena.scope().isAlive()) { - return; - } - - arena.close(); - - if (!storage.isEmpty()) { - DiskStorage.save(path, storage.values()); - } - } - - @Override - public void compact() throws IOException { - final Iterable> iterableStorage = () -> get(null, null); - if (iterableStorage.iterator().hasNext()) { - diskStorage.compact(path, iterableStorage); - } - } -} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/LiveFilteringIterator.java b/src/main/java/ru/vk/itmo/khodosovaelena/LiveFilteringIterator.java new file mode 100644 index 000000000..7e42fb3b5 --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/LiveFilteringIterator.java @@ -0,0 +1,52 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Filters non tombstone {@link Entry}s. + * + * @author incubos + */ +final class LiveFilteringIterator implements Iterator> { + private final Iterator> delegate; + private Entry next; + + LiveFilteringIterator(final Iterator> delegate) { + this.delegate = delegate; + skipTombstones(); + } + + private void skipTombstones() { + while (delegate.hasNext()) { + final Entry entry = delegate.next(); + if (entry.value() != null) { + this.next = entry; + break; + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // Consume + final Entry result = next; + next = null; + + skipTombstones(); + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/MemTable.java b/src/main/java/ru/vk/itmo/khodosovaelena/MemTable.java new file mode 100644 index 000000000..e298f43c1 --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/MemTable.java @@ -0,0 +1,49 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * Memory table. + * + * @author incubos + */ +final class MemTable { + private final NavigableMap> map = + new ConcurrentSkipListMap<>( + MemorySegmentComparator.INSTANCE); + + boolean isEmpty() { + return map.isEmpty(); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + if (from == null && to == null) { + // All + return map.values().iterator(); + } else if (from == null) { + // Head + return map.headMap(to).values().iterator(); + } else if (to == null) { + // Tail + return map.tailMap(from).values().iterator(); + } else { + // Slice + return map.subMap(from, to).values().iterator(); + } + } + + Entry get(final MemorySegment key) { + return map.get(key); + } + + Entry upsert(final Entry entry) { + return map.put(entry.key(), entry); + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/MemorySegmentComparator.java b/src/main/java/ru/vk/itmo/khodosovaelena/MemorySegmentComparator.java new file mode 100644 index 000000000..dd7cd6e4e --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/MemorySegmentComparator.java @@ -0,0 +1,89 @@ +package ru.vk.itmo.khodosovaelena; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Comparator; + +/** + * Compares {@link MemorySegment}s. + * + * @author incubos + */ +final class MemorySegmentComparator implements Comparator { + static final Comparator INSTANCE = + new MemorySegmentComparator(); + + private MemorySegmentComparator() { + // Singleton + } + + @Override + public int compare( + final MemorySegment left, + final MemorySegment right) { + final long mismatch = left.mismatch(right); + if (mismatch == -1L) { + // No mismatch + return 0; + } + + if (mismatch == left.byteSize()) { + // left is prefix of right, so left is smaller + return -1; + } + + if (mismatch == right.byteSize()) { + // right is prefix of left, so left is greater + return 1; + } + + // Compare mismatched bytes as unsigned + return Byte.compareUnsigned( + left.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + mismatch), + right.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + mismatch)); + } + + static int compare( + final MemorySegment srcSegment, + final long srcFromOffset, + final long srcLength, + final MemorySegment dstSegment, + final long dstFromOffset, + final long dstLength) { + final long mismatch = + MemorySegment.mismatch( + srcSegment, + srcFromOffset, + srcFromOffset + srcLength, + dstSegment, + dstFromOffset, + dstFromOffset + dstLength); + if (mismatch == -1L) { + // No mismatch + return 0; + } + + if (mismatch == srcLength) { + // left is prefix of right, so left is smaller + return -1; + } + + if (mismatch == dstLength) { + // right is prefix of left, so left is greater + return 1; + } + + // Compare mismatched bytes as unsigned + return Byte.compareUnsigned( + srcSegment.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + srcFromOffset + mismatch), + dstSegment.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + dstFromOffset + mismatch)); + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/MergeIterator.java b/src/main/java/ru/vk/itmo/khodosovaelena/MergeIterator.java deleted file mode 100644 index 2f38a8a75..000000000 --- a/src/main/java/ru/vk/itmo/khodosovaelena/MergeIterator.java +++ /dev/null @@ -1,134 +0,0 @@ -package ru.vk.itmo.khodosovaelena; - -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.PriorityQueue; - -public class MergeIterator implements Iterator { - - private final PriorityQueue> priorityQueue; - private final Comparator comparator; - PeekIterator peek; - - private static class PeekIterator implements Iterator { - - public final int id; - private final Iterator delegate; - private T peekValue; - - private PeekIterator(int id, Iterator delegate) { - this.id = id; - this.delegate = delegate; - } - - @Override - public boolean hasNext() { - if (peekValue == null) { - return delegate.hasNext(); - } - return true; - } - - @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - T tmpValue = peek(); - this.peekValue = null; - return tmpValue; - } - - private T peek() { - if (peekValue == null) { - if (!delegate.hasNext()) { - return null; - } - peekValue = delegate.next(); - } - return peekValue; - } - } - - public MergeIterator(Collection> iterators, Comparator comparator) { - this.comparator = comparator; - Comparator> peekComp = (o1, o2) -> comparator.compare(o1.peek(), o2.peek()); - priorityQueue = new PriorityQueue<>( - iterators.size(), - peekComp.thenComparing(o -> -o.id) - ); - - int id = 0; - for (Iterator iterator : iterators) { - if (iterator.hasNext()) { - priorityQueue.add(new PeekIterator<>(id++, iterator)); - } - } - } - - private PeekIterator peek() { - while (peek == null) { - peek = priorityQueue.poll(); - if (peek == null) { - return null; - } - peekFromQueue(); - if (peek.peek() == null) { - peek = null; - continue; - } - if (skip(peek.peek())) { - peek.next(); - if (peek.hasNext()) { - priorityQueue.add(peek); - } - peek = null; - } - } - - return peek; - } - - private void peekFromQueue() { - while (true) { - PeekIterator next = priorityQueue.peek(); - if (next == null) { - break; - } - - int compare = comparator.compare(peek.peek(), next.peek()); - if (compare != 0) break; - PeekIterator poll = priorityQueue.poll(); - if (poll == null) continue; - poll.next(); - if (poll.hasNext()) { - priorityQueue.add(poll); - } - } - } - - protected boolean skip(T t) { - return t == null; - } - - @Override - public boolean hasNext() { - return peek() != null; - } - - @Override - public T next() { - PeekIterator peekIterator = peek(); - if (peekIterator == null) { - throw new NoSuchElementException(); - } - T next = peek.next(); - this.peek = null; - if (peekIterator.hasNext()) { - priorityQueue.add(peekIterator); - } - return next; - } -} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/MergingEntryIterator.java b/src/main/java/ru/vk/itmo/khodosovaelena/MergingEntryIterator.java new file mode 100644 index 000000000..63225d474 --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/MergingEntryIterator.java @@ -0,0 +1,68 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.*; + +/** + * Merges entry {@link Iterator}s. + * + * @author incubos + */ +final class MergingEntryIterator implements Iterator> { + private final Queue iterators; + + MergingEntryIterator(final List iterators) { + assert iterators.stream().allMatch(WeightedPeekingEntryIterator::hasNext); + + this.iterators = new PriorityQueue<>(iterators); + } + + @Override + public boolean hasNext() { + return !iterators.isEmpty(); + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final WeightedPeekingEntryIterator top = iterators.remove(); + final Entry result = top.next(); + + if (top.hasNext()) { + // Not exhausted + iterators.add(top); + } + + // Remove older versions of the key + while (true) { + final WeightedPeekingEntryIterator iterator = iterators.peek(); + if (iterator == null) { + // Nothing left + break; + } + + // Skip entries with the same key + final Entry entry = iterator.peek(); + if (MemorySegmentComparator.INSTANCE.compare(result.key(), entry.key()) != 0) { + // Reached another key + break; + } + + // Drop + iterators.remove(); + // Skip + iterator.next(); + if (iterator.hasNext()) { + // Not exhausted + iterators.add(iterator); + } + } + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/ReferenceDao.java b/src/main/java/ru/vk/itmo/khodosovaelena/ReferenceDao.java new file mode 100644 index 000000000..6ba7024c8 --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/ReferenceDao.java @@ -0,0 +1,294 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Config; +import ru.vk.itmo.Dao; +import ru.vk.itmo.Entry; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.sql.Timestamp; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Reference implementation of {@link Dao}. + * + * @author incubos + */ +public class ReferenceDao implements Dao> { + private final Config config; + private final Arena arena; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + // Guarded by lock + private volatile TableSet tableSet; + + private final ExecutorService flusher = + Executors.newSingleThreadExecutor(r -> { + final Thread result = new Thread(r); + result.setName("flusher"); + return result; + }); + private final ExecutorService compactor = + Executors.newSingleThreadExecutor(r -> { + final Thread result = new Thread(r); + result.setName("compactor"); + return result; + }); + + private final AtomicBoolean closed = new AtomicBoolean(); + + public ReferenceDao(final Config config) throws IOException { + this.config = config; + this.arena = Arena.ofShared(); + + // First complete promotion of compacted SSTables + SSTables.promote( + config.basePath(), + 0, + 1); + + this.tableSet = + TableSet.from( + SSTables.discover( + arena, + config.basePath())); + } + + @Override + public Iterator> get( + final MemorySegment from, + final MemorySegment to) { + return new LiveFilteringIterator( + tableSet.get( + from, + to)); + } + + @Override + public Entry get(final MemorySegment key) { + // Without lock, just snapshot of table set + return tableSet.get(key); + } + + @Override + public void upsert(final Entry entry) { + final boolean autoFlush; + lock.readLock().lock(); + try { + if (tableSet.memTableSize.get() > config.flushThresholdBytes() + && tableSet.flushingTable != null) { + throw new IllegalStateException("Can't keep up with flushing!"); + } + + // Upsert + final Entry previous = tableSet.upsert(entry); + + // Update size estimate + final long size = tableSet.memTableSize.addAndGet(sizeOf(entry) - sizeOf(previous)); + autoFlush = size > config.flushThresholdBytes(); + } finally { + lock.readLock().unlock(); + } + + if (autoFlush) { + initiateFlush(true); + } + } + + private static long sizeOf(final Entry entry) { + if (entry == null) { + return 0L; + } + + if (entry.value() == null) { + return entry.key().byteSize(); + } + + return entry.key().byteSize() + entry.value().byteSize(); + } + + private void initiateFlush(final boolean auto) { + flusher.submit(() -> { + final TableSet currentTableSet; + lock.writeLock().lock(); + try { + if (this.tableSet.memTable.isEmpty()) { + // Nothing to flush + return; + } + + if (auto && this.tableSet.memTableSize.get() < config.flushThresholdBytes()) { + // Not enough data to flush + return; + } + + // Switch memTable to flushing + currentTableSet = this.tableSet.flushing(); + this.tableSet = currentTableSet; + } finally { + lock.writeLock().unlock(); + } + + // Write + final long timestamp = System.currentTimeMillis(); + // final int sequence = currentTableSet.nextSequence(); + try { + new SSTableWriter() + .write( + config.basePath(), + timestamp, + currentTableSet.flushingTable.get(null, null)); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-1); + return; + } + + // Open + final SSTable flushed; + try { + flushed = SSTables.open( + arena, + config.basePath(), + timestamp); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-2); + return; + } + + // Switch + lock.writeLock().lock(); + try { + this.tableSet = this.tableSet.flushed(flushed); + } finally { + lock.writeLock().unlock(); + } + }).state(); + } + + @Override + public void flush() throws IOException { + initiateFlush(false); + } + + @Override + public void compact() throws IOException { + compactor.submit(() -> { + final TableSet currentTableSet; + lock.writeLock().lock(); + try { + currentTableSet = this.tableSet; + if (currentTableSet.ssTables.size() < 2) { + // Nothing to compact + return; + } + } finally { + lock.writeLock().unlock(); + } + + // Compact to 0 + try { + new SSTableWriter() + .write( + config.basePath(), + 0, + new LiveFilteringIterator( + currentTableSet.allSSTableEntries())); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-3); + } + + // Open 0 + final SSTable compacted; + try { + compacted = + SSTables.open( + arena, + config.basePath(), + 0); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-4); + return; + } + + // Replace old SSTables with compacted one to + // keep serving requests + final Set replaced = new HashSet<>(currentTableSet.ssTables); + lock.writeLock().lock(); + try { + this.tableSet = + this.tableSet.compacted( + replaced, + compacted); + } finally { + lock.writeLock().unlock(); + } + + // Remove compacted SSTables starting from the oldest ones. + // If we crash, 0 contains all the data, and + // it will be promoted on reopen. + for (final SSTable ssTable : currentTableSet.ssTables.reversed()) { + try { + SSTables.remove( + config.basePath(), + ssTable.timestamp); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-5); + } + } + + // Promote zero to one (possibly replacing) + try { + SSTables.promote( + config.basePath(), + 0, + 1); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-6); + } + + // Replace promoted SSTable + lock.writeLock().lock(); + try { + this.tableSet = + this.tableSet.compacted( + Collections.singleton(compacted), + compacted.withSequence(1)); + } finally { + lock.writeLock().unlock(); + } + }).state(); + } + + @Override + public void close() throws IOException { + if (closed.getAndSet(true)) { + // Already closed + return; + } + + // Maybe flush + flush(); + + // Stop all the threads + flusher.close(); + compactor.close(); + + // Close arena + arena.close(); + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/SSTable.java b/src/main/java/ru/vk/itmo/khodosovaelena/SSTable.java new file mode 100644 index 000000000..a980db0bf --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/SSTable.java @@ -0,0 +1,204 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.BaseEntry; +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Persistent SSTable in data file and index file. + * + * @author incubos + * @see SSTables + */ +final class SSTable { + final long timestamp; + + private final MemorySegment index; + private final MemorySegment data; + private final long size; + + SSTable( + final long timestamp, + final MemorySegment index, + final MemorySegment data) { + this.timestamp = timestamp; + this.index = index; + this.data = data; + this.size = index.byteSize() / Long.BYTES; + } + + SSTable withSequence(final int sequence) { + return new SSTable( + sequence, + index, + data); + } + + /** + * Returns index of the entry if found; otherwise, (-(insertion point) - 1). + * The insertion point is defined as the point at which the key would be inserted: + * the index of the first element greater than the key, + * or size if all keys are less than the specified key. + * Note that this guarantees that the return value will be >= 0 + * if and only if the key is found. + */ + private long entryBinarySearch(final MemorySegment key) { + long low = 0L; + long high = size - 1; + + while (low <= high) { + final long mid = (low + high) >>> 1; + final long midEntryOffset = entryOffset(mid); + final long midKeyLength = getLength(midEntryOffset); + final int compare = + MemorySegmentComparator.compare( + data, + midEntryOffset + Long.BYTES, // Position at key + midKeyLength, + key, + 0L, + key.byteSize()); + + if (compare < 0) { + low = mid + 1; + } else if (compare > 0) { + high = mid - 1; + } else { + return mid; + } + } + + return -(low + 1); + } + + private long entryOffset(final long entry) { + return index.get( + ValueLayout.OfLong.JAVA_LONG, + entry * Long.BYTES); + } + + private long getLength(final long offset) { + return data.get( + ValueLayout.OfLong.JAVA_LONG_UNALIGNED, + offset); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + assert from == null || to == null || MemorySegmentComparator.INSTANCE.compare(from, to) <= 0; + + // Slice of SSTable in absolute offsets + final long fromOffset; + final long toOffset; + + // Left offset bound + if (from == null) { + // Start from the beginning + fromOffset = 0L; + } else { + final long fromEntry = entryBinarySearch(from); + if (fromEntry >= 0L) { + fromOffset = entryOffset(fromEntry); + } else if (-fromEntry - 1 == size) { + // No relevant data + return Collections.emptyIterator(); + } else { + // Greater but existing key found + fromOffset = entryOffset(-fromEntry - 1); + } + } + + // Right offset bound + if (to == null) { + // Up to the end + toOffset = data.byteSize(); + } else { + final long toEntry = entryBinarySearch(to); + if (toEntry >= 0L) { + toOffset = entryOffset(toEntry); + } else if (-toEntry - 1 == size) { + // Up to the end + toOffset = data.byteSize(); + } else { + // Greater but existing key found + toOffset = entryOffset(-toEntry - 1); + } + } + + return new SliceIterator(fromOffset, toOffset); + } + + Entry get(final MemorySegment key) { + final long entry = entryBinarySearch(key); + if (entry < 0) { + return null; + } + + // Skip key (will reuse the argument) + long offset = entryOffset(entry); + offset += Long.BYTES + key.byteSize(); + // Extract value length + final long valueLength = getLength(offset); + if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) { + // Tombstone encountered + return new BaseEntry<>(key, null); + } else { + // Get value + offset += Long.BYTES; + final MemorySegment value = data.asSlice(offset, valueLength); + return new BaseEntry<>(key, value); + } + } + + private final class SliceIterator implements Iterator> { + private long offset; + private final long toOffset; + + private SliceIterator( + final long offset, + final long toOffset) { + this.offset = offset; + this.toOffset = toOffset; + } + + @Override + public boolean hasNext() { + return offset < toOffset; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // Read key length + final long keyLength = getLength(offset); + offset += Long.BYTES; + + // Read key + final MemorySegment key = data.asSlice(offset, keyLength); + offset += keyLength; + + // Read value length + final long valueLength = getLength(offset); + offset += Long.BYTES; + + // Read value + if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) { + // Tombstone encountered + return new BaseEntry<>(key, null); + } else { + final MemorySegment value = data.asSlice(offset, valueLength); + offset += valueLength; + return new BaseEntry<>(key, value); + } + } + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/SSTableWriter.java b/src/main/java/ru/vk/itmo/khodosovaelena/SSTableWriter.java new file mode 100644 index 000000000..002593dde --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/SSTableWriter.java @@ -0,0 +1,166 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Entry; + +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Iterator; + +/** + * Writes {@link Entry} {@link Iterator} to SSTable on disk. + * + *

Index file {@code .index} contains {@code long} offsets to entries in data file: + * {@code [offset0, offset1, ...]} + * + *

Data file {@code .data} contains serialized entries: + * {@code } + * + *

Tombstones are encoded as {@code valueLength} {@code -1} and no subsequent value. + * + * @author incubos + */ +final class SSTableWriter { + private static final int BUFFER_SIZE = 64 * 1024; + + // Reusable buffers to eliminate allocations. + // But excessive memory copying is still there :( + // Long cell + private final ByteArraySegment longBuffer = new ByteArraySegment(Long.BYTES); + // Growable blob cell + private final ByteArraySegment blobBuffer = new ByteArraySegment(512); + + void write( + final Path baseDir, + final long timestamp, + final Iterator> entries) throws IOException { + // Write to temporary files + final Path tempIndexName = SSTables.tempIndexName(baseDir, timestamp); + final Path tempDataName = SSTables.tempDataName(baseDir, timestamp); + + // Delete temporary files to eliminate tails + Files.deleteIfExists(tempIndexName); + Files.deleteIfExists(tempDataName); + + // Iterate in a single pass! + // Will write through FileChannel despite extra memory copying and + // no buffering (which may be implemented later). + // Looking forward to MemorySegment facilities in FileChannel! + try (OutputStream index = + new BufferedOutputStream( + new FileOutputStream( + tempIndexName.toFile()), + BUFFER_SIZE); + OutputStream data = + new BufferedOutputStream( + new FileOutputStream( + tempDataName.toFile()), + BUFFER_SIZE)) { + long entryOffset = 0L; + + // Iterate and serialize + while (entries.hasNext()) { + // First write offset to the entry + writeLong(entryOffset, index); + + // Then write the entry + final Entry entry = entries.next(); + entryOffset += writeEntry(entry, data); + } + } + + // Publish files atomically + // FIRST index, LAST data + final Path indexName = + SSTables.indexName( + baseDir, + timestamp); + Files.move( + tempIndexName, + indexName, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + final Path dataName = + SSTables.dataName( + baseDir, + timestamp); + Files.move( + tempDataName, + dataName, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + + private void writeLong( + final long value, + final OutputStream os) throws IOException { + longBuffer.segment().set( + ValueLayout.OfLong.JAVA_LONG_UNALIGNED, + 0, + value); + longBuffer.withArray(os::write); + } + + private void writeSegment( + final MemorySegment value, + final OutputStream os) throws IOException { + final long size = value.byteSize(); + blobBuffer.ensureCapacity(size); + MemorySegment.copy( + value, + 0L, + blobBuffer.segment(), + 0L, + size); + blobBuffer.withArray(array -> + os.write( + array, + 0, + (int) size)); + } + + /** + * Writes {@link Entry} to {@link FileChannel}. + * + * @return written bytes + */ + private long writeEntry( + final Entry entry, + final OutputStream os) throws IOException { + final MemorySegment key = entry.key(); + final MemorySegment value = entry.value(); + long result = 0L; + + // Key size + writeLong(key.byteSize(), os); + result += Long.BYTES; + + // Key + writeSegment(key, os); + result += key.byteSize(); + + // Value size and possibly value + if (value == null) { + // Tombstone + writeLong(SSTables.TOMBSTONE_VALUE_LENGTH, os); + result += Long.BYTES; + } else { + // Value length + writeLong(value.byteSize(), os); + result += Long.BYTES; + + // Value + writeSegment(value, os); + result += value.byteSize(); + } + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/SSTables.java b/src/main/java/ru/vk/itmo/khodosovaelena/SSTables.java new file mode 100644 index 000000000..0a80b054b --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/SSTables.java @@ -0,0 +1,162 @@ +package ru.vk.itmo.khodosovaelena; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * Provides {@link SSTable} management facilities: dumping and discovery. + * + * @author incubos + */ +final class SSTables { + public static final String INDEX_SUFFIX = ".index"; + public static final String DATA_SUFFIX = ".data"; + public static final long TOMBSTONE_VALUE_LENGTH = -1L; + + private static final String TEMP_SUFFIX = ".tmp"; + + /** + * Can't instantiate. + */ + private SSTables() { + // Only static methods + } + + static Path indexName( + final Path baseDir, + final long timestamp) { + return baseDir.resolve(timestamp + INDEX_SUFFIX); + } + + static Path dataName( + final Path baseDir, + final long timestamp) { + return baseDir.resolve(timestamp + DATA_SUFFIX); + } + + static Path tempIndexName( + final Path baseDir, + final long timestamp) { + return baseDir.resolve(timestamp + INDEX_SUFFIX + TEMP_SUFFIX); + } + + static Path tempDataName( + final Path baseDir, + final long timestamp) { + return baseDir.resolve(timestamp + DATA_SUFFIX + TEMP_SUFFIX); + } + + /** + * Returns {@link List} of {@link SSTable}s from freshest to oldest. + */ + static List discover( + final Arena arena, + final Path baseDir) throws IOException { + if (!Files.exists(baseDir)) { + return Collections.emptyList(); + } + + final List result = new ArrayList<>(); + try (Stream files = Files.list(baseDir)) { + files.forEach(file -> { + final String fileName = file.getFileName().toString(); + if (!fileName.endsWith(DATA_SUFFIX)) { + // Skip non data + return; + } + + final long timestamp = + // .data -> N + Long.parseLong( + fileName.substring( + 0, + fileName.length() - DATA_SUFFIX.length())); + + try { + result.add(open(arena, baseDir, timestamp)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + // Sort from freshest to oldest + result.sort((o1, o2) -> Long.compare(o2.timestamp, o1.timestamp)); + + return Collections.unmodifiableList(result); + } + + static SSTable open( + final Arena arena, + final Path baseDir, + final long timestamp) throws IOException { + final MemorySegment index = + mapReadOnly( + arena, + indexName(baseDir, timestamp)); + final MemorySegment data = + mapReadOnly( + arena, + dataName(baseDir, timestamp)); + + return new SSTable( + timestamp, + index, + data); + } + + private static MemorySegment mapReadOnly( + final Arena arena, + final Path file) throws IOException { + try (FileChannel channel = + FileChannel.open( + file, + StandardOpenOption.READ)) { + return channel.map( + FileChannel.MapMode.READ_ONLY, + 0L, + Files.size(file), + arena); + } + } + + static void remove( + final Path baseDir, + final long timestamp) throws IOException { + // First delete data file to make SSTable invisible + Files.delete(dataName(baseDir, timestamp)); + Files.delete(indexName(baseDir, timestamp)); + } + + static void promote( + final Path baseDir, + final int from, + final int to) throws IOException { + // Build to progress to the same outcome + if (Files.exists(indexName(baseDir, from))) { + Files.move( + indexName(baseDir, from), + indexName(baseDir, to), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + if (Files.exists(dataName(baseDir, from))) { + Files.move( + dataName(baseDir, from), + dataName(baseDir, to), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/TableSet.java b/src/main/java/ru/vk/itmo/khodosovaelena/TableSet.java new file mode 100644 index 000000000..ea0141018 --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/TableSet.java @@ -0,0 +1,194 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Data set in various tables. + * + * @author incubos + */ +final class TableSet { + final MemTable memTable; + final AtomicLong memTableSize; + // null or read-only + final MemTable flushingTable; + // From freshest to oldest + final List ssTables; + + private TableSet( + final MemTable memTable, + final AtomicLong memTableSize, + final MemTable flushingTable, + final List ssTables) { + this.memTable = memTable; + this.memTableSize = memTableSize; + this.flushingTable = flushingTable; + this.ssTables = ssTables; + } + + static TableSet from(final List ssTables) { + return new TableSet( + new MemTable(), + new AtomicLong(), + null, + ssTables); + } + + TableSet flushing() { + if (memTable.isEmpty()) { + throw new IllegalStateException("Nothing to flush"); + } + + if (flushingTable != null) { + throw new IllegalStateException("Already flushing"); + } + + return new TableSet( + new MemTable(), + new AtomicLong(), + memTable, + ssTables); + } + + TableSet flushed(final SSTable flushed) { + final List newSSTables = new ArrayList<>(ssTables.size() + 1); + newSSTables.add(flushed); + newSSTables.addAll(ssTables); + return new TableSet( + memTable, + memTableSize, + null, + newSSTables); + } + + TableSet compacted( + final Set replaced, + final SSTable with) { + final List newSsTables = new ArrayList<>(this.ssTables.size() + 1); + + // Keep not replaced SSTables + for (final SSTable ssTable : this.ssTables) { + if (!replaced.contains(ssTable)) { + newSsTables.add(ssTable); + } + } + + // Logically the oldest one + newSsTables.add(with); + + return new TableSet( + memTable, + memTableSize, + flushingTable, + newSsTables); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + final List iterators = + new ArrayList<>(2 + ssTables.size()); + + // MemTable goes first + final Iterator> memTableIterator = + memTable.get(from, to); + if (memTableIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + Integer.MIN_VALUE, + memTableIterator)); + } + + // Then goes flushing + if (flushingTable != null) { + final Iterator> flushingIterator = + flushingTable.get(from, to); + if (flushingIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + Integer.MIN_VALUE + 1, + flushingIterator)); + } + } + + // Then go all the SSTables + for (int i = 0; i < ssTables.size(); i++) { + final SSTable ssTable = ssTables.get(i); + final Iterator> ssTableIterator = + ssTable.get(from, to); + if (ssTableIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + i, + ssTableIterator)); + } + } + + return switch (iterators.size()) { + case 0 -> Collections.emptyIterator(); + case 1 -> iterators.get(0); + default -> new MergingEntryIterator(iterators); + }; + } + + Entry get(final MemorySegment key) { + // Slightly optimized version not to pollute the heap + + // First check MemTable + Entry result = memTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + + // Then check flushing + if (flushingTable != null) { + result = flushingTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + } + + // At last check SSTables from freshest to oldest + for (final SSTable ssTable : ssTables) { + result = ssTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + } + + // Nothing found + return null; + } + + private static Entry swallowTombstone(final Entry entry) { + return entry.value() == null ? null : entry; + } + + Entry upsert(final Entry entry) { + return memTable.upsert(entry); + } + + Iterator> allSSTableEntries() { + final List iterators = + new ArrayList<>(ssTables.size()); + + for (int i = 0; i < ssTables.size(); i++) { + final SSTable ssTable = ssTables.get(i); + final Iterator> ssTableIterator = + ssTable.get(null, null); + iterators.add( + new WeightedPeekingEntryIterator( + i, + ssTableIterator)); + } + + return new MergingEntryIterator(iterators); + } +} diff --git a/src/main/java/ru/vk/itmo/khodosovaelena/WeightedPeekingEntryIterator.java b/src/main/java/ru/vk/itmo/khodosovaelena/WeightedPeekingEntryIterator.java new file mode 100644 index 000000000..024dfd04a --- /dev/null +++ b/src/main/java/ru/vk/itmo/khodosovaelena/WeightedPeekingEntryIterator.java @@ -0,0 +1,67 @@ +package ru.vk.itmo.khodosovaelena; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Peeking {@link Iterator} wrapper. + * + * @author incubos + */ +final class WeightedPeekingEntryIterator + implements Iterator>, + Comparable { + private final int weight; + private final Iterator> delegate; + private Entry next; + + WeightedPeekingEntryIterator( + final int weight, + final Iterator> delegate) { + this.weight = weight; + this.delegate = delegate; + this.next = delegate.hasNext() ? delegate.next() : null; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final Entry result = next; + next = delegate.hasNext() ? delegate.next() : null; + return result; + } + + Entry peek() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + return next; + } + + @Override + public int compareTo(final WeightedPeekingEntryIterator other) { + // First compare keys + int result = + MemorySegmentComparator.INSTANCE.compare( + peek().key(), + other.peek().key()); + if (result != 0) { + return result; + } + + // Then compare weights if keys are equal + return Integer.compare(weight, other.weight); + } +} diff --git a/src/main/java/ru/vk/itmo/test/khodosovaelena/MyFactory.java b/src/main/java/ru/vk/itmo/test/khodosovaelena/MyFactory.java index 08fed885c..a7957e88f 100644 --- a/src/main/java/ru/vk/itmo/test/khodosovaelena/MyFactory.java +++ b/src/main/java/ru/vk/itmo/test/khodosovaelena/MyFactory.java @@ -3,7 +3,7 @@ import ru.vk.itmo.Config; import ru.vk.itmo.Dao; import ru.vk.itmo.Entry; -import ru.vk.itmo.khodosovaelena.InMemoryDao; +import ru.vk.itmo.khodosovaelena.ReferenceDao; import ru.vk.itmo.test.DaoFactory; import java.io.IOException; @@ -11,11 +11,11 @@ import java.lang.foreign.ValueLayout; import java.nio.charset.StandardCharsets; -@DaoFactory(stage = 4) +@DaoFactory(stage = 5) public class MyFactory implements DaoFactory.Factory> { @Override public Dao> createDao(Config config) throws IOException { - return new InMemoryDao(config); + return new ReferenceDao(config); } @Override