diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/ByteArraySegment.java b/src/main/java/ru/vk/itmo/plyasovklimentii/ByteArraySegment.java new file mode 100644 index 000000000..e5d6de272 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/ByteArraySegment.java @@ -0,0 +1,48 @@ +package ru.vk.itmo.plyasovklimentii; + +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +/** + * Growable buffer with {@link ByteBuffer} and {@link MemorySegment} interface. + * + * @author incubos + */ +final class ByteArraySegment { + private byte[] array; + private MemorySegment segment; + + ByteArraySegment(final int capacity) { + this.array = new byte[capacity]; + this.segment = MemorySegment.ofArray(array); + } + + void withArray(final ArrayConsumer consumer) throws IOException { + consumer.process(array); + } + + MemorySegment segment() { + return segment; + } + + void ensureCapacity(final long size) { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Too big!"); + } + + final int capacity = (int) size; + if (array.length >= capacity) { + return; + } + + // Grow to the nearest bigger power of 2 + final int newSize = Integer.highestOneBit(capacity) << 1; + array = new byte[newSize]; + segment = MemorySegment.ofArray(array); + } + + interface ArrayConsumer { + void process(byte[] array) throws IOException; + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/DiskStorage.java b/src/main/java/ru/vk/itmo/plyasovklimentii/DiskStorage.java deleted file mode 100644 index 2e3d8e52b..000000000 --- a/src/main/java/ru/vk/itmo/plyasovklimentii/DiskStorage.java +++ /dev/null @@ -1,274 +0,0 @@ -package ru.vk.itmo.plyasovklimentii; - -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.lang.foreign.Arena; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public class DiskStorage { - - private final List segmentList = new CopyOnWriteArrayList<>(); - - public DiskStorage(List segmentList) { - this.segmentList.addAll(segmentList); - } - - public Iterator> range( - List>> addIterators, - MemorySegment from, - MemorySegment to) { - List>> iterators = new ArrayList<>(segmentList.size() + 1); - for (MemorySegment memorySegment : segmentList) { - iterators.add(Utils.iterator(memorySegment, from, to)); - } - iterators.addAll(addIterators); - - return new MergeIterator<>(iterators, Comparator.comparing(Entry::key, PlyasovDao::compare)) { - @Override - protected boolean shouldSkip(Entry memorySegmentEntry) { - return memorySegmentEntry.value() == null; - } - }; - } - - public void saveNextSSTable(Path storagePath, Iterable> iterable, Arena arena) - throws IOException { - final Path indexTmp = storagePath.resolve("index.tmp"); - final Path indexFile = storagePath.resolve("index.idx"); - try { - Files.createFile(indexFile); - } catch (FileAlreadyExistsException ignored) { - // it is ok, actually it is normal state - } - List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); - String newFileName = Utils.SSTABLE_PREFIX + existedFiles.size(); - long dataSize = 0; - long count = 0; - for (Entry entry : iterable) { - dataSize += entry.key().byteSize(); - MemorySegment value = entry.value(); - if (value != null) { - dataSize += value.byteSize(); - } - count++; - } - long indexSize = count * 2 * Long.BYTES; - try ( - FileChannel fileChannel = FileChannel.open( - storagePath.resolve(newFileName), - StandardOpenOption.WRITE, - StandardOpenOption.READ, - StandardOpenOption.CREATE - ); - Arena writeArena = Arena.ofConfined() - ) { - MemorySegment fileSegment = fileChannel.map( - FileChannel.MapMode.READ_WRITE, - 0, - indexSize + dataSize, - writeArena - ); - - long dataOffset = indexSize; - int indexOffset = 0; - for (Entry entry : iterable) { - fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); - dataOffset += entry.key().byteSize(); - indexOffset += Long.BYTES; - - MemorySegment value = entry.value(); - if (value == null) { - fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, Utils.tombstone(dataOffset)); - } else { - fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); - dataOffset += value.byteSize(); - } - indexOffset += Long.BYTES; - } - - dataOffset = indexSize; - for (Entry entry : iterable) { - MemorySegment key = entry.key(); - MemorySegment.copy(key, 0, fileSegment, dataOffset, key.byteSize()); - dataOffset += key.byteSize(); - - MemorySegment value = entry.value(); - if (value != null) { - MemorySegment.copy(value, 0, fileSegment, dataOffset, value.byteSize()); - dataOffset += value.byteSize(); - } - } - } - - List list = new ArrayList<>(existedFiles.size() + 1); - list.addAll(existedFiles); - list.add(newFileName); - Files.write( - indexTmp, - list, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING - ); - - Files.deleteIfExists(indexFile); - - Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE); - - if (arena.scope().isAlive()) { - addSegment(storagePath.resolve(newFileName), arena); - } - } - - public void addSegment(Path file, Arena arena) throws IOException { - try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - MemorySegment fileSegment = fileChannel.map( - FileChannel.MapMode.READ_WRITE, - 0, - Files.size(file), - arena - ); - this.segmentList.add(fileSegment); - } - } - - public static void compact(Path storagePath, Iterable> iterable) throws IOException { - Utils.deleteAllSSTables(storagePath); - String newFileName = "compaction.tmp"; - Path compactionTmpFile = storagePath.resolve(newFileName); - - long dataSize = 0; - long count = 0; - for (Entry entry : iterable) { - dataSize += entry.key().byteSize(); - MemorySegment value = entry.value(); - if (value != null) { - dataSize += value.byteSize(); - } - count++; - } - long indexSize = count * 2 * Long.BYTES; - - try ( - FileChannel fileChannel = FileChannel.open( - compactionTmpFile, - StandardOpenOption.WRITE, - StandardOpenOption.READ, - StandardOpenOption.CREATE - ); - Arena writeArena = Arena.ofConfined() - ) { - long totalSize = indexSize + dataSize; - MemorySegment fileSegment = fileChannel.map( - FileChannel.MapMode.READ_WRITE, - 0, - totalSize, - writeArena - ); - - // index: - // |key0_Start|value0_Start|key1_Start|value1_Start|key2_Start|value2_Start|... - // key0_Start = data start = end of index - long dataOffset = indexSize; - int indexOffset = 0; - for (Entry entry : iterable) { - fileSegment.set( - ValueLayout.JAVA_LONG_UNALIGNED, - indexOffset, - dataOffset - ); - dataOffset += entry.key().byteSize(); - indexOffset += Long.BYTES; - - MemorySegment value = entry.value(); - if (value == null) { - fileSegment.set( - ValueLayout.JAVA_LONG_UNALIGNED, - indexOffset, - Utils.tombstone(dataOffset) - ); - } else { - fileSegment.set( - ValueLayout.JAVA_LONG_UNALIGNED, - indexOffset, - dataOffset - ); - dataOffset += value.byteSize(); - } - indexOffset += Long.BYTES; - } - - // data: - // |key0|value0|key1|value1|... - dataOffset = indexSize; - for (Entry entry : iterable) { - MemorySegment key = entry.key(); - MemorySegment.copy(key, 0, fileSegment, dataOffset, key.byteSize()); - dataOffset += key.byteSize(); - - MemorySegment value = entry.value(); - if (value != null) { - MemorySegment.copy(value, 0, fileSegment, dataOffset, value.byteSize()); - dataOffset += value.byteSize(); - } - } - } - - Files.move( - compactionTmpFile, - storagePath.resolve("compaction"), - StandardCopyOption.ATOMIC_MOVE, - StandardCopyOption.REPLACE_EXISTING - ); - - Utils.finalizeCompaction(storagePath); - } - - public static List loadOrRecover(Path storagePath, Arena arena) throws IOException { - if (Files.exists(Utils.compactionFile(storagePath))) { - Utils.finalizeCompaction(storagePath); - } - - Path indexTmp = storagePath.resolve("index.tmp"); - Path indexFile = storagePath.resolve("index.idx"); - - if (!Files.exists(indexFile)) { - if (Files.exists(indexTmp)) { - Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - } else { - Files.createFile(indexFile); - } - } - - List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); - List result = new ArrayList<>(existedFiles.size()); - for (String fileName : existedFiles) { - Path file = storagePath.resolve(fileName); - try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - MemorySegment fileSegment = fileChannel.map( - FileChannel.MapMode.READ_WRITE, - 0, - Files.size(file), - arena - ); - result.add(fileSegment); - } - } - - return result; - } -} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/LiveFilteringIterator.java b/src/main/java/ru/vk/itmo/plyasovklimentii/LiveFilteringIterator.java new file mode 100644 index 000000000..1fb8433a9 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/LiveFilteringIterator.java @@ -0,0 +1,52 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Filters non tombstone {@link Entry}s. + * + * @author incubos + */ +final class LiveFilteringIterator implements Iterator> { + private final Iterator> delegate; + private Entry next; + + LiveFilteringIterator(final Iterator> delegate) { + this.delegate = delegate; + skipTombstones(); + } + + private void skipTombstones() { + while (delegate.hasNext()) { + final Entry entry = delegate.next(); + if (entry.value() != null) { + this.next = entry; + break; + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // Consume + final Entry result = next; + next = null; + + skipTombstones(); + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/MemTable.java b/src/main/java/ru/vk/itmo/plyasovklimentii/MemTable.java new file mode 100644 index 000000000..cd3060887 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/MemTable.java @@ -0,0 +1,49 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * Memory table. + * + * @author incubos + */ +final class MemTable { + private final NavigableMap> map = + new ConcurrentSkipListMap<>( + MemorySegmentComparator.INSTANCE); + + boolean isEmpty() { + return map.isEmpty(); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + if (from == null && to == null) { + // All + return map.values().iterator(); + } else if (from == null) { + // Head + return map.headMap(to).values().iterator(); + } else if (to == null) { + // Tail + return map.tailMap(from).values().iterator(); + } else { + // Slice + return map.subMap(from, to).values().iterator(); + } + } + + Entry get(final MemorySegment key) { + return map.get(key); + } + + Entry upsert(final Entry entry) { + return map.put(entry.key(), entry); + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/MemorySegmentComparator.java b/src/main/java/ru/vk/itmo/plyasovklimentii/MemorySegmentComparator.java new file mode 100644 index 000000000..be7f46560 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/MemorySegmentComparator.java @@ -0,0 +1,89 @@ +package ru.vk.itmo.plyasovklimentii; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Comparator; + +/** + * Compares {@link MemorySegment}s. + * + * @author incubos + */ +final class MemorySegmentComparator implements Comparator { + static final Comparator INSTANCE = + new MemorySegmentComparator(); + + private MemorySegmentComparator() { + // Singleton + } + + @Override + public int compare( + final MemorySegment left, + final MemorySegment right) { + final long mismatch = left.mismatch(right); + if (mismatch == -1L) { + // No mismatch + return 0; + } + + if (mismatch == left.byteSize()) { + // left is prefix of right, so left is smaller + return -1; + } + + if (mismatch == right.byteSize()) { + // right is prefix of left, so left is greater + return 1; + } + + // Compare mismatched bytes as unsigned + return Byte.compareUnsigned( + left.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + mismatch), + right.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + mismatch)); + } + + static int compare( + final MemorySegment srcSegment, + final long srcFromOffset, + final long srcLength, + final MemorySegment dstSegment, + final long dstFromOffset, + final long dstLength) { + final long mismatch = + MemorySegment.mismatch( + srcSegment, + srcFromOffset, + srcFromOffset + srcLength, + dstSegment, + dstFromOffset, + dstFromOffset + dstLength); + if (mismatch == -1L) { + // No mismatch + return 0; + } + + if (mismatch == srcLength) { + // left is prefix of right, so left is smaller + return -1; + } + + if (mismatch == dstLength) { + // right is prefix of left, so left is greater + return 1; + } + + // Compare mismatched bytes as unsigned + return Byte.compareUnsigned( + srcSegment.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + srcFromOffset + mismatch), + dstSegment.getAtIndex( + ValueLayout.OfByte.JAVA_BYTE, + dstFromOffset + mismatch)); + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/MergeIterator.java b/src/main/java/ru/vk/itmo/plyasovklimentii/MergeIterator.java deleted file mode 100644 index 759371bf2..000000000 --- a/src/main/java/ru/vk/itmo/plyasovklimentii/MergeIterator.java +++ /dev/null @@ -1,149 +0,0 @@ -package ru.vk.itmo.plyasovklimentii; - -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.PriorityQueue; - -public class MergeIterator implements Iterator { - - private final PriorityQueue> priorityQueue; - private final Comparator comparator; - private PeekIterator nextIterator; - - private static class PeekIterator implements Iterator { - - public final int id; - private final Iterator delegate; - private T peek; - - private PeekIterator(int id, Iterator delegate) { - this.id = id; - this.delegate = delegate; - } - - @Override - public boolean hasNext() { - if (peek == null) { - return delegate.hasNext(); - } - return true; - } - - @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - T peeked = peek(); - this.peek = null; - return peeked; - } - - private T peek() { - if (peek == null) { - if (!delegate.hasNext()) { - return null; - } - peek = delegate.next(); - } - return peek; - } - } - - public MergeIterator(Collection> iterators, Comparator comparator) { - this.comparator = comparator; - Comparator> peekComp = (o1, o2) -> comparator.compare(o1.peek(), o2.peek()); - priorityQueue = new PriorityQueue<>( - iterators.size(), - peekComp.thenComparing(o -> -o.id) - ); - - int id = 0; - for (Iterator iterator : iterators) { - if (iterator.hasNext()) { - priorityQueue.add(new PeekIterator<>(id++, iterator)); - } - } - } - - private PeekIterator peek() { - while (nextIterator == null) { - nextIterator = priorityQueue.poll(); - if (nextIterator == null) { - return null; - } - - skipIteratorsWithSameKey(); - - if (nextIterator.peek() == null) { - nextIterator = null; - continue; - } - - if (shouldSkip(nextIterator.peek())) { - moveNextAndPutBack(nextIterator); - nextIterator = null; - } - } - - return nextIterator; - } - - private void skipIteratorsWithSameKey() { - while (true) { - PeekIterator next = priorityQueue.peek(); - if (next == null) { - break; - } - - if (!skipTheSameKey(next)) { - break; - } - } - } - - private boolean skipTheSameKey(PeekIterator next) { - int compare = comparator.compare(nextIterator.peek(), next.peek()); - if (compare != 0) { - return false; - } - - PeekIterator poll = priorityQueue.poll(); - if (poll != null) { - moveNextAndPutBack(poll); - } - return true; - } - - private void moveNextAndPutBack(PeekIterator poll) { - poll.next(); - if (poll.hasNext()) { - priorityQueue.add(poll); - } - } - - protected boolean shouldSkip(T t) { - return t == null; - } - - @Override - public boolean hasNext() { - return peek() != null; - } - - @Override - public T next() { - PeekIterator peeked = peek(); - if (peeked == null) { - throw new NoSuchElementException(); - } - T nextValue = peeked.next(); - this.nextIterator = null; - if (peeked.hasNext()) { - priorityQueue.add(peeked); - } - return nextValue; - } -} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/MergingEntryIterator.java b/src/main/java/ru/vk/itmo/plyasovklimentii/MergingEntryIterator.java new file mode 100644 index 000000000..179ad3149 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/MergingEntryIterator.java @@ -0,0 +1,68 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.*; + +/** + * Merges entry {@link Iterator}s. + * + * @author incubos + */ +final class MergingEntryIterator implements Iterator> { + private final Queue iterators; + + MergingEntryIterator(final List iterators) { + assert iterators.stream().allMatch(WeightedPeekingEntryIterator::hasNext); + + this.iterators = new PriorityQueue<>(iterators); + } + + @Override + public boolean hasNext() { + return !iterators.isEmpty(); + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final WeightedPeekingEntryIterator top = iterators.remove(); + final Entry result = top.next(); + + if (top.hasNext()) { + // Not exhausted + iterators.add(top); + } + + // Remove older versions of the key + while (true) { + final WeightedPeekingEntryIterator iterator = iterators.peek(); + if (iterator == null) { + // Nothing left + break; + } + + // Skip entries with the same key + final Entry entry = iterator.peek(); + if (MemorySegmentComparator.INSTANCE.compare(result.key(), entry.key()) != 0) { + // Reached another key + break; + } + + // Drop + iterators.remove(); + // Skip + iterator.next(); + if (iterator.hasNext()) { + // Not exhausted + iterators.add(iterator); + } + } + + return result; + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/MutableLong.java b/src/main/java/ru/vk/itmo/plyasovklimentii/MutableLong.java new file mode 100644 index 000000000..f9272cc37 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/MutableLong.java @@ -0,0 +1,25 @@ +package ru.vk.itmo.plyasovklimentii; + +class MutableLong { + private long value; + + public MutableLong(long value) { + this.value = value; + } + + public void increment() { + this.value++; + } + + public long getValue() { + return this.value; + } + + public void setValue(long value) { + this.value = value; + } + + public void incrementBy(long inc) { + this.value += inc; + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/PlyasovDao.java b/src/main/java/ru/vk/itmo/plyasovklimentii/PlyasovDao.java index 0be7258f3..6480fb97b 100644 --- a/src/main/java/ru/vk/itmo/plyasovklimentii/PlyasovDao.java +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/PlyasovDao.java @@ -5,226 +5,288 @@ import ru.vk.itmo.Entry; import java.io.IOException; -import java.io.UncheckedIOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; -import java.util.List; -import java.util.NavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +/** + * Reference implementation of {@link Dao}. + * + * @author incubos + */ public class PlyasovDao implements Dao> { - - private final Comparator comparator = PlyasovDao::compare; + private final Config config; private final Arena arena; - private final DiskStorage diskStorage; - private final Path path; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - private NavigableMap> currentSSTable = new ConcurrentSkipListMap<>(comparator); - private NavigableMap> flushingSSTable = new ConcurrentSkipListMap<>(comparator); - private final AtomicLong currentSSTableSize = new AtomicLong(); - - private final long flushThresholdBytes; - private final AtomicBoolean isFlushInProgress = new AtomicBoolean(false); - private final AtomicBoolean isCompactInProgress = new AtomicBoolean(false); - - public PlyasovDao(Config config) throws IOException { - this.flushThresholdBytes = config.flushThresholdBytes(); - this.path = config.basePath().resolve("data"); - Files.createDirectories(path); - - arena = Arena.ofShared(); - - this.diskStorage = new DiskStorage(DiskStorage.loadOrRecover(path, arena)); - } - static int compare(MemorySegment memorySegment1, MemorySegment memorySegment2) { - long mismatch = memorySegment1.mismatch(memorySegment2); - if (mismatch == -1) { - return 0; - } + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + // Guarded by lock + private volatile TableSet tableSet; - if (mismatch == memorySegment1.byteSize()) { - return -1; - } + private final ExecutorService flusher = + Executors.newSingleThreadExecutor(r -> { + final Thread result = new Thread(r); + result.setName("flusher"); + return result; + }); + private final ExecutorService compactor = + Executors.newSingleThreadExecutor(r -> { + final Thread result = new Thread(r); + result.setName("compactor"); + return result; + }); - if (mismatch == memorySegment2.byteSize()) { - return 1; - } - byte b1 = memorySegment1.get(ValueLayout.JAVA_BYTE, mismatch); - byte b2 = memorySegment2.get(ValueLayout.JAVA_BYTE, mismatch); - return Byte.compare(b1, b2); + private final AtomicBoolean closed = new AtomicBoolean(); + + public PlyasovDao(final Config config) throws IOException { + this.config = config; + this.arena = Arena.ofShared(); + + // First complete promotion of compacted SSTables + SSTables.promote( + config.basePath(), + 0, + 1); + + this.tableSet = + TableSet.from( + SSTables.discover( + arena, + config.basePath())); } @Override - public Iterator> get(MemorySegment from, MemorySegment to) { - List>> iterators = new ArrayList<>(); - iterators.addAll(List.of(getInMemory(currentSSTable, from, to))); - if (isFlushInProgress.get() && flushingSSTable != null) { - iterators.addFirst(getInMemory(flushingSSTable, from, to)); - } - return diskStorage.range(iterators, from, to); + public Iterator> get( + final MemorySegment from, + final MemorySegment to) { + return new LiveFilteringIterator( + tableSet.get( + from, + to)); } - private Iterator> getInMemory( - NavigableMap> storage, - MemorySegment from, - MemorySegment to) { - if (from == null && to == null) { - return storage.values().iterator(); - } - if (from == null) { - return storage.headMap(to).values().iterator(); - } - if (to == null) { - return storage.tailMap(from).values().iterator(); - } - return storage.subMap(from, to).values().iterator(); + @Override + public Entry get(final MemorySegment key) { + // Without lock, just snapshot of table set + return tableSet.get(key); } @Override - public Entry get(MemorySegment key) { - Entry currentEntry = currentSSTable.get(key); - if (currentEntry != null) { - return currentEntry.value() == null ? null : currentEntry; - } - - if (isFlushInProgress.get()) { - Entry flushingEntry = flushingSSTable.get(key); - if (flushingEntry != null) { - return flushingEntry.value() == null ? null : flushingEntry; + public void upsert(final Entry entry) { + final boolean autoFlush; + lock.readLock().lock(); + try { + if (tableSet.memTableSize.get() > config.flushThresholdBytes() + && tableSet.flushingTable != null) { + throw new IllegalStateException("Can't keep up with flushing!"); } + + // Upsert + final Entry previous = tableSet.upsert(entry); + + // Update size estimate + final long size = tableSet.memTableSize.addAndGet(sizeOf(entry) - sizeOf(previous)); + autoFlush = size > config.flushThresholdBytes(); + } finally { + lock.readLock().unlock(); } - Iterator> iterator = diskStorage.range(List.of(Collections.emptyIterator()), key, null); - if (iterator.hasNext()) { - Entry next = iterator.next(); - if (compare(next.key(), key) == 0) { - return next; - } + if (autoFlush) { + initiateFlush(true); } - return null; } - @Override - public void upsert(Entry entry) { - if (flushingSSTable != null && currentSSTableSize.get() >= flushThresholdBytes) { - throw new IllegalStateException("Flush in progress, operation cannot proceed"); + private static long sizeOf(final Entry entry) { + if (entry == null) { + return 0L; } - long sizeDifference = updateSSTableAndGetSizeDifference(entry); + if (entry.value() == null) { + return entry.key().byteSize(); + } + + return entry.key().byteSize() + entry.value().byteSize(); + } - if (sizeDifference > 0 && currentSSTableSize.get() >= flushThresholdBytes) { + private void initiateFlush(final boolean auto) { + flusher.submit(() -> { + final TableSet currentTableSet; + lock.writeLock().lock(); try { - flush(); + if (this.tableSet.memTable.isEmpty()) { + // Nothing to flush + return; + } + + if (auto && this.tableSet.memTableSize.get() < config.flushThresholdBytes()) { + // Not enough data to flush + return; + } + + // Switch memTable to flushing + currentTableSet = this.tableSet.flushing(); + this.tableSet = currentTableSet; + } finally { + lock.writeLock().unlock(); + } + + // Write + final int sequence = currentTableSet.nextSequence(); + try { + new SSTableWriter() + .write( + config.basePath(), + sequence, + currentTableSet.flushingTable.get(null, null)); } catch (IOException e) { - throw new UncheckedIOException("Failed to execute flush", e); + e.printStackTrace(); + Runtime.getRuntime().halt(-1); + return; } - } - } - private long updateSSTableAndGetSizeDifference(Entry entry) { - lock.writeLock().lock(); - try { - Entry previousEntry = currentSSTable.put(entry.key(), entry); - long newSize = calculateEntrySize(entry); - long oldSize = previousEntry == null ? 0 : calculateEntrySize(previousEntry); - long sizeDifference = newSize - oldSize; - currentSSTableSize.addAndGet(sizeDifference); + // Open + final SSTable flushed; + try { + flushed = SSTables.open( + arena, + config.basePath(), + sequence); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-2); + return; + } - return sizeDifference; - } finally { - lock.writeLock().unlock(); - } + // Switch + lock.writeLock().lock(); + try { + this.tableSet = this.tableSet.flushed(flushed); + } finally { + lock.writeLock().unlock(); + } + }).state(); } - private long calculateEntrySize(Entry entry) { - long keySize = entry.key().byteSize(); - long valueSize = entry.value() == null ? Long.BYTES : entry.value().byteSize(); - return keySize + valueSize; + @Override + public void flush() throws IOException { + initiateFlush(false); } @Override - public synchronized void flush() throws IOException { - if (isFlushInProgress.get() || currentSSTable.isEmpty()) { - return; - } + public void compact() throws IOException { + compactor.submit(() -> { + final TableSet currentTableSet; + lock.writeLock().lock(); + try { + currentTableSet = this.tableSet; + if (currentTableSet.ssTables.size() < 2) { + // Nothing to compact + return; + } + } finally { + lock.writeLock().unlock(); + } - isFlushInProgress.set(true); - lock.writeLock().lock(); - try { - flushingSSTable = currentSSTable; - currentSSTable = new ConcurrentSkipListMap<>(comparator); - currentSSTableSize.set(0); - } finally { - lock.writeLock().unlock(); - } + // Compact to 0 + try { + new SSTableWriter() + .write( + config.basePath(), + 0, + new LiveFilteringIterator( + currentTableSet.allSSTableEntries())); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-3); + } - executorService.execute(this::flushToDisk); - } + // Open 0 + final SSTable compacted; + try { + compacted = + SSTables.open( + arena, + config.basePath(), + 0); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-4); + return; + } - private void flushToDisk() { - try { - diskStorage.saveNextSSTable(path, flushingSSTable.values(), arena); - flushingSSTable = null; - isFlushInProgress.set(false); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + // Replace old SSTables with compacted one to + // keep serving requests + final Set replaced = new HashSet<>(currentTableSet.ssTables); + lock.writeLock().lock(); + try { + this.tableSet = + this.tableSet.compacted( + replaced, + compacted); + } finally { + lock.writeLock().unlock(); + } + + // Remove compacted SSTables starting from the oldest ones. + // If we crash, 0 contains all the data, and + // it will be promoted on reopen. + for (final SSTable ssTable : currentTableSet.ssTables.reversed()) { + try { + SSTables.remove( + config.basePath(), + ssTable.sequence); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-5); + } + } + + // Promote zero to one (possibly replacing) + try { + SSTables.promote( + config.basePath(), + 0, + 1); + } catch (IOException e) { + e.printStackTrace(); + Runtime.getRuntime().halt(-6); + } + + // Replace promoted SSTable + lock.writeLock().lock(); + try { + this.tableSet = + this.tableSet.compacted( + Collections.singleton(compacted), + compacted.withSequence(1)); + } finally { + lock.writeLock().unlock(); + } + }).state(); } @Override - public synchronized void compact() { - if (isCompactInProgress.get()) { + public void close() throws IOException { + if (closed.getAndSet(true)) { + // Already closed return; } - isCompactInProgress.set(true); - executorService.execute(this::compactOnDisk); - } + // Maybe flush + flush(); - private void compactOnDisk() { - try { - diskStorage.compact(path, this::all); - } catch (IOException e) { - throw new UncheckedIOException(e); - } finally { - isCompactInProgress.set(false); - } - } + // Stop all the threads + flusher.close(); + compactor.close(); - @Override - public synchronized void close() throws IOException { - try { - if (!arena.scope().isAlive()) { - return; - } - flush(); - executorService.shutdown(); - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - throw new IOException("Executor did not terminate"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - if (arena.scope().isAlive()) { - arena.close(); - } - } + // Close arena + arena.close(); } - } diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/SSTable.java b/src/main/java/ru/vk/itmo/plyasovklimentii/SSTable.java new file mode 100644 index 000000000..65da34223 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/SSTable.java @@ -0,0 +1,216 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.BaseEntry; +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Persistent SSTable in data file and index file. + * + * @author incubos + * @see SSTables + */ +final class SSTable { + final int sequence; + + private final MemorySegment index; + private final MemorySegment data; + private final long size; + + SSTable( + final int sequence, + final MemorySegment index, + final MemorySegment data) { + this.sequence = sequence; + this.index = index; + this.data = data; + this.size = calculateSize(index); + } + + private long calculateSize(MemorySegment index) { + MutableLong position = new MutableLong(0); + long count = 0; + while (position.getValue() < index.byteSize()) { + VarInt.decode(index, position); + count++; + } + return count; + } + + SSTable withSequence(final int sequence) { + return new SSTable( + sequence, + index, + data); + } + + /** + * Returns index of the entry if found; otherwise, (-(insertion point) - 1). + * The insertion point is defined as the point at which the key would be inserted: + * the index of the first element greater than the key, + * or size if all keys are less than the specified key. + * Note that this guarantees that the return value will be >= 0 + * if and only if the key is found. + */ + + private long entryBinarySearch(final MemorySegment key) { + long low = 0L; + long high = size - 1; + + while (low <= high) { + final long mid = (low + high) >>> 1; + final MutableLong midEntryOffsetWrapper = new MutableLong(entryOffset(mid)); + final long midKeyLength = VarInt.decode(data, midEntryOffsetWrapper); + final long midKeyOffset = midEntryOffsetWrapper.getValue(); + + + final int compare = MemorySegmentComparator.compare( + data, + midKeyOffset, + midKeyLength, + key, + 0L, + key.byteSize()); + + if (compare < 0) { + low = mid + 1; + } else if (compare > 0) { + high = mid - 1; + } else { + return mid; + } + } + + return -(low + 1); + } + + + private long entryOffset(final long entryIndex) { + MutableLong position = new MutableLong(0); + position.setValue(entryIndex); + long offset = VarInt.decode(index, position); + + return offset; + } + + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + assert from == null || to == null || MemorySegmentComparator.INSTANCE.compare(from, to) <= 0; + + // Slice of SSTable in absolute offsets + final long fromOffset; + final long toOffset; + + // Left offset bound + if (from == null) { + // Start from the beginning + fromOffset = 0L; + } else { + final long fromEntry = entryBinarySearch(from); + if (fromEntry >= 0L) { + fromOffset = entryOffset(fromEntry); + } else if (-fromEntry - 1 == size) { + // No relevant data + return Collections.emptyIterator(); + } else { + // Greater but existing key found + fromOffset = entryOffset(-fromEntry - 1); + } + } + + // Right offset bound + if (to == null) { + // Up to the end + toOffset = data.byteSize(); + } else { + final long toEntry = entryBinarySearch(to); + if (toEntry >= 0L) { + toOffset = entryOffset(toEntry); + } else if (-toEntry - 1 == size) { + // Up to the end + toOffset = data.byteSize(); + } else { + // Greater but existing key found + toOffset = entryOffset(-toEntry - 1); + } + } + + return new SliceIterator(fromOffset, toOffset); + } + + Entry get(final MemorySegment key) { + final long entry = entryBinarySearch(key); + if (entry < 0) { + return null; + } + MutableLong offsetWrapper = new MutableLong(entryOffset(entry)); + + // Skip key (will reuse the argument) + long keyLength = VarInt.decode(data, offsetWrapper); + offsetWrapper.incrementBy(keyLength); + // Extract value length + final long valueLength = VarInt.decode(data, offsetWrapper); + if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) { + return new BaseEntry<>(key, null); + } else { + final MemorySegment value = data.asSlice(offsetWrapper.getValue(), valueLength); + return new BaseEntry<>(key, value); + } + } + + + private final class SliceIterator implements Iterator> { + private long offset; + private final long toOffset; + + private SliceIterator( + final long offset, + final long toOffset) { + this.offset = offset; + this.toOffset = toOffset; + } + + @Override + public boolean hasNext() { + return offset < toOffset; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + MutableLong offsetWrapper = new MutableLong(offset); + // Read key length + final long keyLength = VarInt.decode(data, offsetWrapper); + offset = offsetWrapper.getValue(); + + // Read key + final MemorySegment key = data.asSlice(offset, keyLength); + offset += keyLength; + + // Read value length + final long valueLength = VarInt.decode(data, offsetWrapper); + offset += offsetWrapper.getValue(); + + + // Read value + if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) { + // Tombstone encountered + return new BaseEntry<>(key, null); + } else { + final MemorySegment value = data.asSlice(offset, valueLength); + offset += valueLength; + return new BaseEntry<>(key, value); + } + } + + } +} \ No newline at end of file diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/SSTableWriter.java b/src/main/java/ru/vk/itmo/plyasovklimentii/SSTableWriter.java new file mode 100644 index 000000000..7b96bb412 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/SSTableWriter.java @@ -0,0 +1,166 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.Entry; + +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Iterator; + +/** + * Writes {@link Entry} {@link Iterator} to SSTable on disk. + * + *

Index file {@code .index} contains {@code long} offsets to entries in data file: + * {@code [offset0, offset1, ...]} + * + *

Data file {@code .data} contains serialized entries: + * {@code } + * + *

Tombstones are encoded as {@code valueLength} {@code -1} and no subsequent value. + * + * @author incubos + */ +final class SSTableWriter { + private static final int BUFFER_SIZE = 64 * 1024; + + // Reusable buffers to eliminate allocations. + // But excessive memory copying is still there :( + // Growable blob cell + private final ByteArraySegment blobBuffer = new ByteArraySegment(512); + + void write( + final Path baseDir, + final int sequence, + final Iterator> entries) throws IOException { + // Write to temporary files + final Path tempIndexName = SSTables.tempIndexName(baseDir, sequence); + final Path tempDataName = SSTables.tempDataName(baseDir, sequence); + + // Delete temporary files to eliminate tails + Files.deleteIfExists(tempIndexName); + Files.deleteIfExists(tempDataName); + + // Iterate in a single pass! + // Will write through FileChannel despite extra memory copying and + // no buffering (which may be implemented later). + // Looking forward to MemorySegment facilities in FileChannel! + try (OutputStream index = + new BufferedOutputStream( + new FileOutputStream( + tempIndexName.toFile()), + BUFFER_SIZE); + OutputStream data = + new BufferedOutputStream( + new FileOutputStream( + tempDataName.toFile()), + BUFFER_SIZE)) { + long entryOffset = 0L; + + // Iterate and serialize + while (entries.hasNext()) { + // First write offset to the entry + final byte[] entryOffsetBytes = VarInt.encode(entryOffset); + index.write(entryOffsetBytes); + + // Then write the entry + final Entry entry = entries.next(); + entryOffset += writeEntry(entry, data); + } + } + + // Publish files atomically + // FIRST index, LAST data + final Path indexName = + SSTables.indexName( + baseDir, + sequence); + Files.move( + tempIndexName, + indexName, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + final Path dataName = + SSTables.dataName( + baseDir, + sequence); + Files.move( + tempDataName, + dataName, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + + + private void writeSegment( + final MemorySegment value, + final OutputStream os) throws IOException { + final long size = value.byteSize(); + blobBuffer.ensureCapacity(size); + MemorySegment.copy( + value, + 0L, + blobBuffer.segment(), + 0L, + size); + blobBuffer.withArray(array -> + os.write( + array, + 0, + (int) size)); + } + + /** + * Writes {@link Entry} to {@link FileChannel}. + * + * @return written bytes + */ + + private long writeEntry( + final Entry entry, + final OutputStream os) throws IOException { + final MemorySegment key = entry.key(); + final MemorySegment value = entry.value(); + long result = 0L; + + // Key size + byte[] keySize = VarInt.encode(key.byteSize()); + os.write(keySize); + result += keySize.length; + + + // |key_size1|key1|value_size1|value1| + + // Key + writeSegment(key, os); + result += key.byteSize(); + + // Value size and possibly value + if (value == null) { + // Tombstone + byte[] tombstoneMarker = VarInt.encode(SSTables.TOMBSTONE_VALUE_LENGTH); + os.write(tombstoneMarker); + result += tombstoneMarker.length; + } else { + // Value length + byte[] valueSize = VarInt.encode(value.byteSize()); + os.write(valueSize); + result += valueSize.length; + + // Value + writeSegment(value, os); + result += value.byteSize(); + } + + return result; + } + +} + + diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/SSTables.java b/src/main/java/ru/vk/itmo/plyasovklimentii/SSTables.java new file mode 100644 index 000000000..931942587 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/SSTables.java @@ -0,0 +1,162 @@ +package ru.vk.itmo.plyasovklimentii; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * Provides {@link SSTable} management facilities: dumping and discovery. + * + * @author incubos + */ +final class SSTables { + public static final String INDEX_SUFFIX = ".index"; + public static final String DATA_SUFFIX = ".data"; + public static final long TOMBSTONE_VALUE_LENGTH = 0xFF; + + private static final String TEMP_SUFFIX = ".tmp"; + + /** + * Can't instantiate. + */ + private SSTables() { + // Only static methods + } + + static Path indexName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + INDEX_SUFFIX); + } + + static Path dataName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + DATA_SUFFIX); + } + + static Path tempIndexName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + INDEX_SUFFIX + TEMP_SUFFIX); + } + + static Path tempDataName( + final Path baseDir, + final int sequence) { + return baseDir.resolve(sequence + DATA_SUFFIX + TEMP_SUFFIX); + } + + /** + * Returns {@link List} of {@link SSTable}s from freshest to oldest. + */ + static List discover( + final Arena arena, + final Path baseDir) throws IOException { + if (!Files.exists(baseDir)) { + return Collections.emptyList(); + } + + final List result = new ArrayList<>(); + try (Stream files = Files.list(baseDir)) { + files.forEach(file -> { + final String fileName = file.getFileName().toString(); + if (!fileName.endsWith(DATA_SUFFIX)) { + // Skip non data + return; + } + + final int sequence = + // .data -> N + Integer.parseInt( + fileName.substring( + 0, + fileName.length() - DATA_SUFFIX.length())); + + try { + result.add(open(arena, baseDir, sequence)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + // Sort from freshest to oldest + result.sort((o1, o2) -> Integer.compare(o2.sequence, o1.sequence)); + + return Collections.unmodifiableList(result); + } + + static SSTable open( + final Arena arena, + final Path baseDir, + final int sequence) throws IOException { + final MemorySegment index = + mapReadOnly( + arena, + indexName(baseDir, sequence)); + final MemorySegment data = + mapReadOnly( + arena, + dataName(baseDir, sequence)); + + return new SSTable( + sequence, + index, + data); + } + + private static MemorySegment mapReadOnly( + final Arena arena, + final Path file) throws IOException { + try (FileChannel channel = + FileChannel.open( + file, + StandardOpenOption.READ)) { + return channel.map( + FileChannel.MapMode.READ_ONLY, + 0L, + Files.size(file), + arena); + } + } + + static void remove( + final Path baseDir, + final int sequence) throws IOException { + // First delete data file to make SSTable invisible + Files.delete(dataName(baseDir, sequence)); + Files.delete(indexName(baseDir, sequence)); + } + + static void promote( + final Path baseDir, + final int from, + final int to) throws IOException { + // Build to progress to the same outcome + if (Files.exists(indexName(baseDir, from))) { + Files.move( + indexName(baseDir, from), + indexName(baseDir, to), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + if (Files.exists(dataName(baseDir, from))) { + Files.move( + dataName(baseDir, from), + dataName(baseDir, to), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + } + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/TableSet.java b/src/main/java/ru/vk/itmo/plyasovklimentii/TableSet.java new file mode 100644 index 000000000..3bb5cbe99 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/TableSet.java @@ -0,0 +1,201 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Data set in various tables. + * + * @author incubos + */ +final class TableSet { + final MemTable memTable; + final AtomicLong memTableSize; + // null or read-only + final MemTable flushingTable; + // From freshest to oldest + final List ssTables; + + private TableSet( + final MemTable memTable, + final AtomicLong memTableSize, + final MemTable flushingTable, + final List ssTables) { + this.memTable = memTable; + this.memTableSize = memTableSize; + this.flushingTable = flushingTable; + this.ssTables = ssTables; + } + + static TableSet from(final List ssTables) { + return new TableSet( + new MemTable(), + new AtomicLong(), + null, + ssTables); + } + + int nextSequence() { + return ssTables.stream() + .mapToInt(t -> t.sequence) + .max() + .orElse(0) + 1; + } + + TableSet flushing() { + if (memTable.isEmpty()) { + throw new IllegalStateException("Nothing to flush"); + } + + if (flushingTable != null) { + throw new IllegalStateException("Already flushing"); + } + + return new TableSet( + new MemTable(), + new AtomicLong(), + memTable, + ssTables); + } + + TableSet flushed(final SSTable flushed) { + final List newSSTables = new ArrayList<>(ssTables.size() + 1); + newSSTables.add(flushed); + newSSTables.addAll(ssTables); + return new TableSet( + memTable, + memTableSize, + null, + newSSTables); + } + + TableSet compacted( + final Set replaced, + final SSTable with) { + final List newSsTables = new ArrayList<>(this.ssTables.size() + 1); + + // Keep not replaced SSTables + for (final SSTable ssTable : this.ssTables) { + if (!replaced.contains(ssTable)) { + newSsTables.add(ssTable); + } + } + + // Logically the oldest one + newSsTables.add(with); + + return new TableSet( + memTable, + memTableSize, + flushingTable, + newSsTables); + } + + Iterator> get( + final MemorySegment from, + final MemorySegment to) { + final List iterators = + new ArrayList<>(2 + ssTables.size()); + + // MemTable goes first + final Iterator> memTableIterator = + memTable.get(from, to); + if (memTableIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + Integer.MIN_VALUE, + memTableIterator)); + } + + // Then goes flushing + if (flushingTable != null) { + final Iterator> flushingIterator = + flushingTable.get(from, to); + if (flushingIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + Integer.MIN_VALUE + 1, + flushingIterator)); + } + } + + // Then go all the SSTables + for (int i = 0; i < ssTables.size(); i++) { + final SSTable ssTable = ssTables.get(i); + final Iterator> ssTableIterator = + ssTable.get(from, to); + if (ssTableIterator.hasNext()) { + iterators.add( + new WeightedPeekingEntryIterator( + i, + ssTableIterator)); + } + } + + return switch (iterators.size()) { + case 0 -> Collections.emptyIterator(); + case 1 -> iterators.get(0); + default -> new MergingEntryIterator(iterators); + }; + } + + Entry get(final MemorySegment key) { + // Slightly optimized version not to pollute the heap + + // First check MemTable + Entry result = memTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + + // Then check flushing + if (flushingTable != null) { + result = flushingTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + } + + // At last check SSTables from freshest to oldest + for (final SSTable ssTable : ssTables) { + result = ssTable.get(key); + if (result != null) { + // Transform tombstone + return swallowTombstone(result); + } + } + + // Nothing found + return null; + } + + private static Entry swallowTombstone(final Entry entry) { + return entry.value() == null ? null : entry; + } + + Entry upsert(final Entry entry) { + return memTable.upsert(entry); + } + + Iterator> allSSTableEntries() { + final List iterators = + new ArrayList<>(ssTables.size()); + + for (int i = 0; i < ssTables.size(); i++) { + final SSTable ssTable = ssTables.get(i); + final Iterator> ssTableIterator = + ssTable.get(null, null); + iterators.add( + new WeightedPeekingEntryIterator( + i, + ssTableIterator)); + } + + return new MergingEntryIterator(iterators); + } +} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/Utils.java b/src/main/java/ru/vk/itmo/plyasovklimentii/Utils.java deleted file mode 100644 index 128189d91..000000000 --- a/src/main/java/ru/vk/itmo/plyasovklimentii/Utils.java +++ /dev/null @@ -1,179 +0,0 @@ -package ru.vk.itmo.plyasovklimentii; - -import ru.vk.itmo.BaseEntry; -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.util.Collections; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.stream.Stream; - -public final class Utils { - public static final String SSTABLE_PREFIX = "sstable_"; - - private Utils() { - // util - } - - static void deleteAllSSTables(Path storagePath) throws IOException { - try (Stream stream = Files.find(storagePath, 1, - (path, attrs) -> path.getFileName().toString().startsWith(SSTABLE_PREFIX))) { - stream.forEach(p -> { - try { - Files.delete(p); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } - } - - static void finalizeCompaction(Path storagePath) throws IOException { - Path indexTmp = storagePath.resolve("index.tmp"); - Path indexFile = storagePath.resolve("index.idx"); - - Files.deleteIfExists(indexFile); - Files.deleteIfExists(indexTmp); - - Path compactionFile = compactionFile(storagePath); - boolean noData = Files.size(compactionFile) == 0; - - Files.write( - indexTmp, - noData ? Collections.emptyList() : Collections.singleton(SSTABLE_PREFIX + "0"), - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING - ); - - Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE); - if (noData) { - Files.delete(compactionFile); - } else { - Files.move(compactionFile, storagePath.resolve(SSTABLE_PREFIX + "0"), StandardCopyOption.ATOMIC_MOVE); - } - } - - static Path compactionFile(Path storagePath) { - return storagePath.resolve("compaction"); - } - - static Iterator> iterator(MemorySegment page, MemorySegment from, MemorySegment to) { - long recordIndexFrom = from == null ? 0 : Utils.normalize(Utils.indexOf(page, from)); - long recordIndexTo = to == null ? Utils.recordsCount(page) : Utils.normalize(Utils.indexOf(page, to)); - long recordsCount = Utils.recordsCount(page); - - return new Iterator<>() { - long index = recordIndexFrom; - - @Override - public boolean hasNext() { - return index < recordIndexTo; - } - - @Override - public Entry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - MemorySegment key = Utils.slice(page, Utils.startOfKey(page, index), Utils.endOfKey(page, index)); - long startOfValue = Utils.startOfValue(page, index); - MemorySegment value = - startOfValue < 0 - ? null - : Utils.slice(page, startOfValue, Utils.endOfValue(page, index, recordsCount)); - index++; - return new BaseEntry<>(key, value); - } - }; - } - - public static long indexOf(MemorySegment segment, MemorySegment key) { - long recordsCount = recordsCount(segment); - - long left = 0; - long right = recordsCount - 1; - while (left <= right) { - long mid = (left + right) >>> 1; - - long startOfKey = startOfKey(segment, mid); - long endOfKey = endOfKey(segment, mid); - long mismatch = MemorySegment.mismatch(segment, startOfKey, endOfKey, key, 0, key.byteSize()); - if (mismatch == -1) { - return mid; - } - - if (mismatch == key.byteSize()) { - right = mid - 1; - continue; - } - - if (mismatch == endOfKey - startOfKey) { - left = mid + 1; - continue; - } - - int b1 = Byte.toUnsignedInt(segment.get(ValueLayout.JAVA_BYTE, startOfKey + mismatch)); - int b2 = Byte.toUnsignedInt(key.get(ValueLayout.JAVA_BYTE, mismatch)); - if (b1 > b2) { - right = mid - 1; - } else { - left = mid + 1; - } - } - - return tombstone(left); - } - - public static MemorySegment slice(MemorySegment page, long start, long end) { - return page.asSlice(start, end - start); - } - - public static long startOfKey(MemorySegment segment, long recordIndex) { - return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, recordIndex * 2 * Long.BYTES); - } - - public static long endOfKey(MemorySegment segment, long recordIndex) { - return normalizedStartOfValue(segment, recordIndex); - } - - public static long normalizedStartOfValue(MemorySegment segment, long recordIndex) { - return normalize(startOfValue(segment, recordIndex)); - } - - public static long startOfValue(MemorySegment segment, long recordIndex) { - return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, recordIndex * 2 * Long.BYTES + Long.BYTES); - } - - public static long endOfValue(MemorySegment segment, long recordIndex, long recordsCount) { - if (recordIndex < recordsCount - 1) { - return startOfKey(segment, recordIndex + 1); - } - return segment.byteSize(); - } - - public static long normalize(long value) { - return value & ~(1L << 63); - } - - public static long recordsCount(MemorySegment segment) { - long indexSize = indexSize(segment); - return indexSize / Long.BYTES / 2; - } - - public static long tombstone(long offset) { - return 1L << 63 | offset; - } - - private static long indexSize(MemorySegment segment) { - return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, 0); - } -} diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/VarInt.java b/src/main/java/ru/vk/itmo/plyasovklimentii/VarInt.java new file mode 100644 index 000000000..823a00fad --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/VarInt.java @@ -0,0 +1,39 @@ +package ru.vk.itmo.plyasovklimentii; + +import java.io.ByteArrayOutputStream; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.ArrayList; +import java.util.List; + +public class VarInt { + public static byte[] encode(long value) { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + while (true) { + if ((value & ~0x7FL) == 0) { + buffer.write((int) value); + break; + } else { + buffer.write(((int) value & 0x7F) | 0x80); + value >>>= 7; + } + } + return buffer.toByteArray(); + } + + public static long decode(MemorySegment segment, MutableLong position) { + long result = 0; + int shift = 0; + long b; + do { + b = segment.getAtIndex(ValueLayout.JAVA_BYTE, position.getValue()); + result |= (b & 0x7F) << shift; + shift += 7; + position.increment(); + } while ((b & 0x80) != 0); + return result; + } +} + + + diff --git a/src/main/java/ru/vk/itmo/plyasovklimentii/WeightedPeekingEntryIterator.java b/src/main/java/ru/vk/itmo/plyasovklimentii/WeightedPeekingEntryIterator.java new file mode 100644 index 000000000..387512b11 --- /dev/null +++ b/src/main/java/ru/vk/itmo/plyasovklimentii/WeightedPeekingEntryIterator.java @@ -0,0 +1,67 @@ +package ru.vk.itmo.plyasovklimentii; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Peeking {@link Iterator} wrapper. + * + * @author incubos + */ +final class WeightedPeekingEntryIterator + implements Iterator>, + Comparable { + private final int weight; + private final Iterator> delegate; + private Entry next; + + WeightedPeekingEntryIterator( + final int weight, + final Iterator> delegate) { + this.weight = weight; + this.delegate = delegate; + this.next = delegate.hasNext() ? delegate.next() : null; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + final Entry result = next; + next = delegate.hasNext() ? delegate.next() : null; + return result; + } + + Entry peek() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + return next; + } + + @Override + public int compareTo(final WeightedPeekingEntryIterator other) { + // First compare keys + int result = + MemorySegmentComparator.INSTANCE.compare( + peek().key(), + other.peek().key()); + if (result != 0) { + return result; + } + + // Then compare weights if keys are equal + return Integer.compare(weight, other.weight); + } +} diff --git a/src/main/java/ru/vk/itmo/test/plyasovklimentii/MyFactory.java b/src/main/java/ru/vk/itmo/test/plyasovklimentii/MyFactory.java deleted file mode 100644 index cffc7c765..000000000 --- a/src/main/java/ru/vk/itmo/test/plyasovklimentii/MyFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -package ru.vk.itmo.test.plyasovklimentii; - -import ru.vk.itmo.Config; -import ru.vk.itmo.Dao; -import ru.vk.itmo.Entry; -import ru.vk.itmo.plyasovklimentii.PlyasovDao; -import ru.vk.itmo.test.DaoFactory; - -import java.io.IOException; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.charset.StandardCharsets; - -@DaoFactory(stage = 5) -public class MyFactory implements DaoFactory.Factory> { - @Override - public Dao> createDao(Config config) throws IOException { - return new PlyasovDao(config); - } - - @Override - public String toString(MemorySegment memorySegment) { - if (memorySegment == null) { - return null; - } - return new String(memorySegment.toArray(ValueLayout.JAVA_BYTE), StandardCharsets.UTF_8); - } - - @Override - public MemorySegment fromString(String data) { - if (data == null) { - return null; - } else { - return MemorySegment.ofArray(data.getBytes(StandardCharsets.UTF_8)); - } - - } - - @Override - public Entry fromBaseEntry(Entry baseEntry) { - return baseEntry; - } -} diff --git a/src/main/java/ru/vk/itmo/test/plyasovklimentii/ReferenceDaoFactory.java b/src/main/java/ru/vk/itmo/test/plyasovklimentii/ReferenceDaoFactory.java new file mode 100644 index 000000000..48aa6d5b4 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/plyasovklimentii/ReferenceDaoFactory.java @@ -0,0 +1,50 @@ +package ru.vk.itmo.test.plyasovklimentii; + +import ru.vk.itmo.Config; +import ru.vk.itmo.Dao; +import ru.vk.itmo.Entry; +import ru.vk.itmo.plyasovklimentii.PlyasovDao; +import ru.vk.itmo.reference.ReferenceDao; +import ru.vk.itmo.test.DaoFactory; + +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.charset.StandardCharsets; + +/** + * Instantiates {@link ReferenceDao}. + * + * @author incubos + */ +@DaoFactory(stage = 4) +public class ReferenceDaoFactory implements DaoFactory.Factory> { + @Override + public Dao> createDao(final Config config) throws IOException { + return new PlyasovDao(config); + } + + @Override + public String toString(final MemorySegment memorySegment) { + if (memorySegment == null) { + return null; + } + + final byte[] array = memorySegment.toArray(ValueLayout.JAVA_BYTE); + return new String(array, StandardCharsets.UTF_8); + } + + @Override + public MemorySegment fromString(final String data) { + return data == null + ? null + : MemorySegment.ofArray( + data.getBytes( + StandardCharsets.UTF_8)); + } + + @Override + public Entry fromBaseEntry(final Entry baseEntry) { + return baseEntry; + } +}