Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Commit

Permalink
Merge branch 'polis-vk:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ImLena authored Nov 23, 2023
2 parents 3c3887c + 2d4772c commit 3e61170
Show file tree
Hide file tree
Showing 42 changed files with 2,290 additions and 968 deletions.
81 changes: 81 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,84 @@ NB! Под "не блокирует" ниже понимается отсутс
### Report
Когда всё будет готово, присылайте pull request в ветку `main` со своей реализацией на review.
Не забывайте подтягивать **новые тесты и изменения**, **отвечать на комментарии в PR** и **исправлять замечания**!

## Бонусные задания (deadline 2023-12-06 23:59:59 MSK)

После реализации **всех** предыдущих этапов, необходимо выбрать одну из **незанятых** фич, описанных ниже, и предварительно обсудить с преподавателем предполагаемый способ реализации.

При реализации фичи допускается изменение API `Dao`.

Добавление тестов, демонстрирующих работоспособность реализации, является **обязательным**.

### Feedback

Развёрнутая **конструктивная** обратная связь по курсу: достоинства и недостатки курса, сложность тем, предложения по улучшению.

### Autocompact

Регулярный автоматический фоновый compaction.

### Durability (WAL)

Гарантирует durability (отсутствие потерь подтверджённых записей/удалений) даже в случае "падения" процесса за счёт того, что операции модификации подтверждаются только после того, как попадут в write-ahead-log.
При инициализации стораджа обнаруженные записи WAL "проигрываются" перед началом обслуживания новых операций.
Устаревшие WAL должны ротироваться, чтобы не занимать лишнее место на диске.

Существуют разные подходы к реализации `sync()` на диск.

### Reverse Iterator

Текущий интерфейс `DAO` позволяет итерироваться по данным только в лексикографическом порядке.
Требуется реализовать возможность корректной итерации в обратном порядке, т.е. добавить метод `descendingGet(from, to)`.

### Expiration

Операция `upsert()` должна поддерживать опциональный параметр Time-To-Live (TTL) или время, после которого ячейка должна "пропасть".

"Протухшие" ячейки не должны отдаваться клиентам и должны вычищаться при compaction.

### Compression

Необходимо реализовать блочную компрессию в файлах на диске (используя готовые реализации LZ4, Snappy или zstd) и не забыть про compaction.

### Streaming

Необходимо реализовать механизм для записи и чтения значений **больше** чем Java Heap, например, принимая `InputStream` и выдавая `OutputStream` в качестве значения.

### Atomic Batches

Необходимо реализовать возможность атомарного применения набора модификаций (upsert или remove).

Например, можно принимать от клиента список модификаций, писать их в сериализованном виде в **отдельную таблицу** и удалять после успешного применения.
При инициализации стораджа должны "проигрываться" недоприменённые батчи.

### Transactions

Необходимо обеспечить возможность транзакционного выполнения набора любых операций (upsert/remove/get).
При возникновении конфликта (любой другой транзакции, работающей с теми же ключами несовместимым способом, т.е. не read/read конфликта) клиент должен получать `ConcurrentModificationException`.
Пример реализации -- [NewSQL](https://habr.com/ru/company/odnoklassniki/blog/417593/).

### Bloom Filters

Для каждой таблицы на диске необходимо поддерживать Bloom Filter для содержащихся в ней ключей, чтобы пропускать таблицы, гарантированно не содержащие запрашиваемых ключей.

Очевидно, что это будет работать только в случае "точечных", а не range-запросов.

### Column Families

Поддержка независимых таблиц/keyspace/database/namespace/whatever.

### Snapshots

Получение слепка БД на текущий момент времени с возможностью чтения из него вне зависимости от "развития" основной БД.

Здесь могут помочь hard links.

### Custom Comparators

Возможность указания клиентом пользовательского `Comparator` вместо лексикографического.

### Real 64-bit

* Поддержка файлов больше 2ГБ
* Модульные тесты для ключей/значений больше 2ГБ
33 changes: 29 additions & 4 deletions src/main/java/ru/vk/itmo/kislovdanil/PersistentDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class PersistentDao implements Dao<MemorySegment, Entry<MemorySegment>> {
public class PersistentDao implements Dao<MemorySegment, Entry<MemorySegment>>, Iterable<Entry<MemorySegment>> {

public static final MemorySegment DELETED_VALUE = null;
private final Config config;
private final List<SSTable> tables = new ArrayList<>();
Expand All @@ -36,7 +37,7 @@ public PersistentDao(Config config) throws IOException {
for (String tableID : ssTablesIds) {
// SSTable constructor with rewrite=false reads table data from disk if it exists
tables.add(new SSTable(config.basePath(), comparator, Long.parseLong(tableID),
storage, false));
storage.values(), false));
}
tables.sort(SSTable::compareTo);
}
Expand All @@ -56,6 +57,10 @@ private static Entry<MemorySegment> wrapEntryIfDeleted(Entry<MemorySegment> entr
return entry;
}

private void updateId() {
lastTimestamp = Math.max(lastTimestamp + 1, System.currentTimeMillis());
}

@Override
public Entry<MemorySegment> get(MemorySegment key) {
Entry<MemorySegment> ans = storage.get(key);
Expand All @@ -81,10 +86,11 @@ public void upsert(Entry<MemorySegment> entry) {
@Override
public void flush() throws IOException {
if (!storage.isEmpty()) {
lastTimestamp = Math.max(lastTimestamp + 1, System.currentTimeMillis());
updateId();
// SSTable constructor with rewrite=true writes MemTable data on disk deleting old data if it exists
tables.add(new SSTable(config.basePath(), comparator,
lastTimestamp, storage, true));
lastTimestamp, storage.values(), true));
storage.clear();
}
}

Expand All @@ -93,6 +99,25 @@ public void close() throws IOException {
flush();
}

@Override
public void compact() throws IOException {
if (!tables.isEmpty()) {
updateId();
SSTable compactedTable = new SSTable(config.basePath(), comparator, lastTimestamp,
this, true);
storage.clear();
for (SSTable table : tables) {
table.deleteFromDisk();
}
tables.add(compactedTable);
}
}

@Override
public Iterator<Entry<MemorySegment>> iterator() {
return get(null, null);
}

private static class MemSegComparator implements Comparator<MemorySegment> {
@Override
public int compare(MemorySegment o1, MemorySegment o2) {
Expand Down
68 changes: 42 additions & 26 deletions src/main/java/ru/vk/itmo/kislovdanil/SSTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,44 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.NavigableMap;

public class SSTable implements Comparable<SSTable> {
// Contains offset and size for every key and every value in index file
private MemorySegment summaryFile;
private static final String SUMMARY_FILENAME = "summary";
// Contains keys
private MemorySegment indexFile;
private static final String INDEX_FILENAME = "index";
// Contains values
private MemorySegment dataFile;
private static final String DATA_FILENAME = "data";
private final Comparator<MemorySegment> memSegComp;
private final Arena filesArena = Arena.ofAuto();
private final long tableId;
private final Path ssTablePath;

private final long size;

public SSTable(Path basePath, Comparator<MemorySegment> memSegComp, long tableId,
NavigableMap<MemorySegment, Entry<MemorySegment>> memTable,
Iterable<Entry<MemorySegment>> entriesContainer,
boolean rewrite) throws IOException {
this.tableId = tableId;
Path ssTablePath = basePath.resolve(Long.toString(tableId));
this.ssTablePath = basePath.resolve(Long.toString(tableId));
this.memSegComp = memSegComp;
Path summaryFilePath = ssTablePath.resolve("summary");
Path indexFilePath = ssTablePath.resolve("index");
Path dataFilePath = ssTablePath.resolve("data");
Path summaryFilePath = this.ssTablePath.resolve(SUMMARY_FILENAME);
Path indexFilePath = this.ssTablePath.resolve(INDEX_FILENAME);
Path dataFilePath = this.ssTablePath.resolve(DATA_FILENAME);
if (rewrite) {
write(memTable, summaryFilePath, indexFilePath, dataFilePath);
write(entriesContainer, summaryFilePath, indexFilePath, dataFilePath);
} else {
readOld(summaryFilePath, indexFilePath, dataFilePath);
}

summaryFile = summaryFile.asReadOnly();
indexFile = indexFile.asReadOnly();
dataFile = dataFile.asReadOnly();
this.summaryFile = this.summaryFile.asReadOnly();
this.indexFile = this.indexFile.asReadOnly();
this.dataFile = this.dataFile.asReadOnly();

size = (summaryFile.byteSize() / Metadata.SIZE);
this.size = (this.summaryFile.byteSize() / Metadata.SIZE);
}

private void readOld(Path summaryFilePath, Path indexFilePath, Path dataFilePath) throws IOException {
Expand Down Expand Up @@ -90,30 +93,35 @@ private void writeEntry(Entry<MemorySegment> entry,
Metadata.writeEntryMetadata(entry, summaryFile, summaryOffset, indexOffset, dataOffset);
}

private long[] getFilesSize(Iterable<Entry<MemorySegment>> entriesContainer) {
long indexSize = 0;
long dataSize = 0;
long summarySize = 0;
for (Entry<MemorySegment> entry : entriesContainer) {
indexSize += entry.key().byteSize();
dataSize += entry.value() == null ? 0 : entry.value().byteSize();
summarySize += Metadata.SIZE;
}
return new long[]{summarySize, indexSize, dataSize};
}

// Sequentially writes every entity data in SStable keeping files data consistent
private void write(NavigableMap<MemorySegment, Entry<MemorySegment>> memTable,
private void write(Iterable<Entry<MemorySegment>> entriesContainer,
Path summaryFilePath, Path indexFilePath, Path dataFilePath) throws IOException {
prepareForWriting(summaryFilePath);
prepareForWriting(indexFilePath);
prepareForWriting(dataFilePath);

long indexSize = 0;
long dataSize = 0;
long summarySize;
for (Entry<MemorySegment> entry : memTable.values()) {
indexSize += entry.key().byteSize();
dataSize += entry.value() == null ? 0 : entry.value().byteSize();
}
summarySize = memTable.size() * Metadata.SIZE;
long[] filesSize = getFilesSize(entriesContainer);

summaryFile = mapFile(summarySize, summaryFilePath);
indexFile = mapFile(indexSize, indexFilePath);
dataFile = mapFile(dataSize, dataFilePath);
summaryFile = mapFile(filesSize[0], summaryFilePath);
indexFile = mapFile(filesSize[1], indexFilePath);
dataFile = mapFile(filesSize[2], dataFilePath);

long currentDataOffset = 0;
long currentIndexOffset = 0;
long currentSummaryOffset = 0;
for (Entry<MemorySegment> entry : memTable.values()) {
long currentIndexOffset = 0;
long currentDataOffset = 0;
for (Entry<MemorySegment> entry : entriesContainer) {
MemorySegment value = entry.value();
value = value == null ? filesArena.allocate(0) : value;
MemorySegment key = entry.key();
Expand All @@ -124,6 +132,14 @@ private void write(NavigableMap<MemorySegment, Entry<MemorySegment>> memTable,
}
}

// Deletes all SSTable files from disk. Don't use object after invocation of this method!
public void deleteFromDisk() throws IOException {
Files.delete(ssTablePath.resolve(SUMMARY_FILENAME));
Files.delete(ssTablePath.resolve(INDEX_FILENAME));
Files.delete(ssTablePath.resolve(DATA_FILENAME));
Files.delete(ssTablePath);
}

private Range readRange(MemorySegment segment, long offset) {
return new Range(segment.get(ValueLayout.JAVA_LONG_UNALIGNED, offset),
segment.get(ValueLayout.JAVA_LONG_UNALIGNED, offset + Long.BYTES));
Expand Down
Loading

0 comments on commit 3e61170

Please sign in to comment.