Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into hw5
Browse files Browse the repository at this point in the history
  • Loading branch information
incubos authored Dec 26, 2023
2 parents 6201252 + 080e2a7 commit 78d301d
Show file tree
Hide file tree
Showing 191 changed files with 12,918 additions and 3,073 deletions.
81 changes: 81 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,84 @@ NB! Под "не блокирует" ниже понимается отсутс
### Report
Когда всё будет готово, присылайте pull request в ветку `main` со своей реализацией на review.
Не забывайте подтягивать **новые тесты и изменения**, **отвечать на комментарии в PR** и **исправлять замечания**!

## Бонусные задания (deadline 2023-12-06 23:59:59 MSK)

После реализации **всех** предыдущих этапов, необходимо выбрать одну из **незанятых** фич, описанных ниже, и предварительно обсудить с преподавателем предполагаемый способ реализации.

При реализации фичи допускается изменение API `Dao`.

Добавление тестов, демонстрирующих работоспособность реализации, является **обязательным**.

### Feedback

Развёрнутая **конструктивная** обратная связь по курсу: достоинства и недостатки курса, сложность тем, предложения по улучшению.

### Autocompact

Регулярный автоматический фоновый compaction.

### Durability (WAL)

Гарантирует durability (отсутствие потерь подтверджённых записей/удалений) даже в случае "падения" процесса за счёт того, что операции модификации подтверждаются только после того, как попадут в write-ahead-log.
При инициализации стораджа обнаруженные записи WAL "проигрываются" перед началом обслуживания новых операций.
Устаревшие WAL должны ротироваться, чтобы не занимать лишнее место на диске.

Существуют разные подходы к реализации `sync()` на диск.

### Reverse Iterator

Текущий интерфейс `DAO` позволяет итерироваться по данным только в лексикографическом порядке.
Требуется реализовать возможность корректной итерации в обратном порядке, т.е. добавить метод `descendingGet(from, to)`.

### Expiration

Операция `upsert()` должна поддерживать опциональный параметр Time-To-Live (TTL) или время, после которого ячейка должна "пропасть".

"Протухшие" ячейки не должны отдаваться клиентам и должны вычищаться при compaction.

### Compression

Необходимо реализовать блочную компрессию в файлах на диске (используя готовые реализации LZ4, Snappy или zstd) и не забыть про compaction.

### Streaming

Необходимо реализовать механизм для записи и чтения значений **больше** чем Java Heap, например, принимая `InputStream` и выдавая `OutputStream` в качестве значения.

### Atomic Batches

Необходимо реализовать возможность атомарного применения набора модификаций (upsert или remove).

Например, можно принимать от клиента список модификаций, писать их в сериализованном виде в **отдельную таблицу** и удалять после успешного применения.
При инициализации стораджа должны "проигрываться" недоприменённые батчи.

### Transactions

Необходимо обеспечить возможность транзакционного выполнения набора любых операций (upsert/remove/get).
При возникновении конфликта (любой другой транзакции, работающей с теми же ключами несовместимым способом, т.е. не read/read конфликта) клиент должен получать `ConcurrentModificationException`.
Пример реализации -- [NewSQL](https://habr.com/ru/company/odnoklassniki/blog/417593/).

### Bloom Filters

Для каждой таблицы на диске необходимо поддерживать Bloom Filter для содержащихся в ней ключей, чтобы пропускать таблицы, гарантированно не содержащие запрашиваемых ключей.

Очевидно, что это будет работать только в случае "точечных", а не range-запросов.

### Column Families

Поддержка независимых таблиц/keyspace/database/namespace/whatever.

### Snapshots

Получение слепка БД на текущий момент времени с возможностью чтения из него вне зависимости от "развития" основной БД.

Здесь могут помочь hard links.

### Custom Comparators

Возможность указания клиентом пользовательского `Comparator` вместо лексикографического.

### Real 64-bit

* Поддержка файлов больше 2ГБ
* Модульные тесты для ключей/значений больше 2ГБ
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
217 changes: 42 additions & 175 deletions src/main/java/ru/vk/itmo/abramovilya/DaoImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ru.vk.itmo.abramovilya;

import ru.vk.itmo.BaseEntry;
import ru.vk.itmo.Config;
import ru.vk.itmo.Dao;
import ru.vk.itmo.Entry;
Expand All @@ -9,66 +8,23 @@
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class DaoImpl implements Dao<MemorySegment, Entry<MemorySegment>> {
private final ConcurrentNavigableMap<MemorySegment, Entry<MemorySegment>> map =
new ConcurrentSkipListMap<>(DaoImpl::compareMemorySegments);
private final Path storagePath;
private final Arena arena = Arena.ofShared();
private static final String SSTABLE_BASE_NAME = "storage";
private static final String INDEX_BASE_NAME = "table";
private final Path metaFilePath;
private final List<FileChannel> sstableFileChannels = new ArrayList<>();
private final List<MemorySegment> sstableMappedList = new ArrayList<>();
private final List<FileChannel> indexFileChannels = new ArrayList<>();
private final List<MemorySegment> indexMappedList = new ArrayList<>();
private final Storage storage;

public DaoImpl(Config config) throws IOException {
storagePath = config.basePath();

Files.createDirectories(storagePath);
metaFilePath = storagePath.resolve("meta");
if (!Files.exists(metaFilePath)) {
Files.createFile(metaFilePath);
Files.writeString(metaFilePath, "0", StandardOpenOption.WRITE);
}

int totalSSTables = Integer.parseInt(Files.readString(metaFilePath));
for (int sstableNum = 0; sstableNum < totalSSTables; sstableNum++) {
Path sstablePath = storagePath.resolve(SSTABLE_BASE_NAME + sstableNum);
Path indexPath = storagePath.resolve(INDEX_BASE_NAME + sstableNum);

FileChannel sstableFileChannel = FileChannel.open(sstablePath, StandardOpenOption.READ);
sstableFileChannels.add(sstableFileChannel);
MemorySegment sstableMapped =
sstableFileChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(sstablePath), arena);
sstableMappedList.add(sstableMapped);

FileChannel indexFileChannel = FileChannel.open(indexPath, StandardOpenOption.READ);
indexFileChannels.add(indexFileChannel);
MemorySegment indexMapped =
indexFileChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(indexPath), arena);
indexMappedList.add(indexMapped);
}
this.storage = new Storage(config, arena);
}

@Override
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
return new DaoIterator(getTotalSStables(), from, to, sstableMappedList, indexMappedList, map);
}

@Override
public void upsert(Entry<MemorySegment> entry) {
map.put(entry.key(), entry);
return new DaoIterator(storage.getTotalSStables(), from, to, storage, map);
}

@Override
Expand All @@ -80,147 +36,47 @@ public Entry<MemorySegment> get(MemorySegment key) {
}
return null;
}

int totalSStables = getTotalSStables();
for (int sstableNum = totalSStables; sstableNum >= 0; sstableNum--) {
var foundEntry = seekForValueInFile(key, sstableNum);
if (foundEntry != null) {
if (foundEntry.value() != null) {
return foundEntry;
}
return null;
}
}
return null;
return storage.get(key);
}

private Entry<MemorySegment> seekForValueInFile(MemorySegment key, int sstableNum) {
if (sstableNum >= sstableFileChannels.size()) {
return null;
}

MemorySegment storageMapped = sstableMappedList.get(sstableNum);
MemorySegment indexMapped = indexMappedList.get(sstableNum);

int foundIndex = upperBound(key, storageMapped, indexMapped, indexMapped.byteSize());
long keyStorageOffset = getKeyStorageOffset(indexMapped, foundIndex);
long foundKeySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, keyStorageOffset);
keyStorageOffset += Long.BYTES;

if (MemorySegment.mismatch(key,
0,
key.byteSize(),
storageMapped,
keyStorageOffset,
keyStorageOffset + foundKeySize) == -1) {
return getEntryFromIndexFile(storageMapped, indexMapped, foundIndex);
}
return null;
}

static int upperBound(MemorySegment key, MemorySegment storageMapped, MemorySegment indexMapped, long indexSize) {
int l = -1;
int r = indexMapped.get(ValueLayout.JAVA_INT_UNALIGNED, indexSize - Long.BYTES - Integer.BYTES);

while (r - l > 1) {
int m = (r + l) / 2;
long keyStorageOffset = getKeyStorageOffset(indexMapped, m);
long keySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, keyStorageOffset);
keyStorageOffset += Long.BYTES;

if (compareMemorySegmentsUsingOffset(key, storageMapped, keyStorageOffset, keySize) > 0) {
l = m;
} else {
r = m;
}
}
return r;
}

static long getKeyStorageOffset(MemorySegment indexMapped, int entryNum) {
return indexMapped.get(
ValueLayout.JAVA_LONG_UNALIGNED,
(long) (Integer.BYTES + Long.BYTES) * entryNum + Integer.BYTES
);
@Override
public void upsert(Entry<MemorySegment> entry) {
map.put(entry.key(), entry);
}

private Entry<MemorySegment> getEntryFromIndexFile(MemorySegment storageMapped,
MemorySegment indexMapped,
int entryNum) {
long offsetInStorageFile = indexMapped.get(
ValueLayout.JAVA_LONG_UNALIGNED,
(long) (Integer.BYTES + Long.BYTES) * entryNum + Integer.BYTES
);

long keySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, offsetInStorageFile);
offsetInStorageFile += Long.BYTES;
offsetInStorageFile += keySize;

long valueSize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, offsetInStorageFile);
offsetInStorageFile += Long.BYTES;
MemorySegment key = storageMapped.asSlice(offsetInStorageFile - keySize - Long.BYTES, keySize);
MemorySegment value;
if (valueSize == -1) {
value = null;
} else {
value = storageMapped.asSlice(offsetInStorageFile, valueSize);
@Override
public void compact() throws IOException {
var iterator = get(null, null);
if (!iterator.hasNext()) {
return;
}
return new BaseEntry<>(key, value);
storage.compact(iterator, get(null, null));
map.clear();
}

@Override
public void flush() throws IOException {
writeMapIntoFile();
if (!map.isEmpty()) incTotalSStablesAmount();
}

private void incTotalSStablesAmount() throws IOException {
int totalSStables = getTotalSStables();
Files.writeString(metaFilePath, String.valueOf(totalSStables + 1));
}

@Override
public void close() throws IOException {
if (arena.scope().isAlive()) {
arena.close();
}
flush();
for (FileChannel fc : sstableFileChannels) {
if (fc.isOpen()) fc.close();
}
for (FileChannel fc : indexFileChannels) {
if (fc.isOpen()) fc.close();
if (!map.isEmpty()) {
writeMapIntoFile();
storage.incTotalSStablesAmount();
}
}

private void writeMapIntoFile() throws IOException {
if (map.isEmpty()) {
return;
}

int currSStableNum = getTotalSStables();
Path sstablePath = storagePath.resolve(SSTABLE_BASE_NAME + currSStableNum);
Path indexPath = storagePath.resolve(INDEX_BASE_NAME + currSStableNum);

StorageWriter.writeSStableAndIndex(sstablePath,
calcMapByteSizeInFile(),
indexPath,
calcIndexByteSizeInFile(),
map);
}

private int getTotalSStables() {
return sstableFileChannels.size();
}

private long calcIndexByteSizeInFile() {
return (long) map.size() * (Integer.BYTES + Long.BYTES);
storage.writeMapIntoFile(
mapByteSizeInFile(),
indexByteSizeInFile(),
map
);
}

private long calcMapByteSizeInFile() {
private long mapByteSizeInFile() {
long size = 0;
for (var entry : map.values()) {
size += 2 * Long.BYTES;
size += Storage.BYTES_TO_STORE_ENTRY_SIZE;
size += entry.key().byteSize();
if (entry.value() != null) {
size += entry.value().byteSize();
Expand All @@ -229,17 +85,29 @@ private long calcMapByteSizeInFile() {
return size;
}

private long indexByteSizeInFile() {
return (long) map.size() * Storage.INDEX_ENTRY_SIZE;
}

@Override
public void close() throws IOException {
if (arena.scope().isAlive()) {
arena.close();
}
flush();
storage.close();
}

public static int compareMemorySegments(MemorySegment segment1, MemorySegment segment2) {
long mismatch = segment1.mismatch(segment2);
if (mismatch == -1) {
long offset = segment1.mismatch(segment2);
if (offset == -1) {
return 0;
} else if (mismatch == segment1.byteSize()) {
} else if (offset == segment1.byteSize()) {
return -1;
} else if (mismatch == segment2.byteSize()) {
} else if (offset == segment2.byteSize()) {
return 1;
}
return Byte.compare(segment1.get(ValueLayout.JAVA_BYTE, mismatch),
segment2.get(ValueLayout.JAVA_BYTE, mismatch));
return Byte.compare(segment1.get(ValueLayout.JAVA_BYTE, offset), segment2.get(ValueLayout.JAVA_BYTE, offset));
}

public static int compareMemorySegmentsUsingOffset(MemorySegment segment1,
Expand All @@ -261,6 +129,5 @@ public static int compareMemorySegmentsUsingOffset(MemorySegment segment1,
}
return Byte.compare(segment1.get(ValueLayout.JAVA_BYTE, mismatch),
segment2.get(ValueLayout.JAVA_BYTE, segment2Offset + mismatch));

}
}
Loading

0 comments on commit 78d301d

Please sign in to comment.