diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractBasedOnSSTableDao.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractBasedOnSSTableDao.java
index 8d2fec029..ff8a8cf1e 100644
--- a/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractBasedOnSSTableDao.java
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractBasedOnSSTableDao.java
@@ -1,223 +1,148 @@
 package ru.vk.itmo.kovalchukvladislav;
 
 import ru.vk.itmo.Config;
+import ru.vk.itmo.Dao;
 import ru.vk.itmo.Entry;
 import ru.vk.itmo.kovalchukvladislav.model.DaoIterator;
 import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
-import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
+import ru.vk.itmo.kovalchukvladislav.model.SimpleDaoLoggerUtility;
+import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorage;
+import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorageImpl;
+import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorage;
+import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorageImpl;
 
 import java.io.IOException;
-import java.lang.foreign.Arena;
-import java.lang.foreign.MemorySegment;
-import java.nio.channels.FileChannel;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
+import java.io.UncheckedIOException;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.logging.Level;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
-public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> extends AbstractInMemoryDao<D, E> {
-    // ===================================
-    // Constants
-    // ===================================
+public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> implements Dao<D, E> {
+    private final Logger logger = SimpleDaoLoggerUtility.createLogger(getClass());
+    private static final String DB_FILENAME_PREFIX = "db_";
     private static final String METADATA_FILENAME = "metadata";
     private static final String OFFSETS_FILENAME_PREFIX = "offsets_";
-    private static final String DB_FILENAME_PREFIX = "db_";
-
-    // ===================================
-    // Variables
-    // ===================================
     private final Path basePath;
-    private final Arena arena = Arena.ofShared();
+    private final long flushThresholdBytes;
     private final EntryExtractor<D, E> extractor;
-    private final SSTableMemorySegmentWriter<D, E> writer;
-
-    // ===================================
-    // Storages
-    // ===================================
-
-    private int storagesCount;
-    private volatile boolean closed;
-    private final List<MemorySegment> dbMappedSegments;
-    private final List<MemorySegment> offsetMappedSegments;
-    private final Logger logger = Logger.getLogger(getClass().getSimpleName());
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final AtomicBoolean isFlushingOrCompacting = new AtomicBoolean(false);
+    private final ExecutorService flushOrCompactQueue = Executors.newSingleThreadExecutor();
+
+    /**
+     * In get(), upsert() and compact(), inMemoryStorage and ssTableStorage need no synchronization with each other.
+     * The only exceptions are flush() and compact().
+     * The invariant to maintain is that at any moment the two storages together contain the complete data.
+     */
+    private final InMemoryStorage<D, E> inMemoryStorage;
+    private final SSTableStorage<D, E> ssTableStorage;
 
     protected AbstractBasedOnSSTableDao(Config config, EntryExtractor<D, E> extractor) throws IOException {
-        super(extractor);
-        this.closed = false;
-        this.storagesCount = 0;
         this.extractor = extractor;
+        this.flushThresholdBytes = config.flushThresholdBytes();
         this.basePath = Objects.requireNonNull(config.basePath());
-        this.dbMappedSegments = new ArrayList<>();
-        this.offsetMappedSegments = new ArrayList<>();
-        reloadFilesAndMapToSegment();
-        this.writer = new SSTableMemorySegmentWriter<>(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX,
-                METADATA_FILENAME, extractor);
-        logger.setLevel(Level.OFF); // keep the GitHub CI output clean, if that is even possible
-    }
-
-    // ===================================
-    // Restoring state
-    // ===================================
-
-    private void reloadFilesAndMapToSegment() throws IOException {
-        if (!Files.exists(basePath)) {
-            Files.createDirectory(basePath);
-        }
-        logger.info(() -> String.format("Reloading files from %s", basePath));
-        List<String> ssTableIds = getSSTableIds();
-        for (String ssTableId : ssTableIds) {
-            readFileAndMapToSegment(ssTableId);
-        }
-        logger.info(() -> String.format("Reloaded %d files", storagesCount));
-    }
-
-    private void readFileAndMapToSegment(String timestamp) throws IOException {
-        Path dbPath = basePath.resolve(DB_FILENAME_PREFIX + timestamp);
-        Path offsetsPath = basePath.resolve(OFFSETS_FILENAME_PREFIX + timestamp);
-        if (!Files.exists(dbPath) || !Files.exists(offsetsPath)) {
-            logger.severe(() -> String.format("File under path %s or %s doesn't exists", dbPath, offsetsPath));
-            return;
-        }
-
-        logger.info(() -> String.format("Reading files with timestamp %s", timestamp));
-
-        try (FileChannel dbChannel = FileChannel.open(dbPath, StandardOpenOption.READ);
-             FileChannel offsetChannel = FileChannel.open(offsetsPath, StandardOpenOption.READ)) {
-
-            MemorySegment db = dbChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(dbPath), arena);
-            MemorySegment offsets = offsetChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(offsetsPath), arena);
-            dbMappedSegments.add(db);
-            offsetMappedSegments.add(offsets);
-            storagesCount++;
-        }
-        logger.info(() -> String.format("Successfully read files with %s timestamp", timestamp));
-    }
-
-    private List<String> getSSTableIds() throws IOException {
-        Path metadataPath = basePath.resolve(METADATA_FILENAME);
-        if (!Files.exists(metadataPath)) {
-            return Collections.emptyList();
-        }
-        return Files.readAllLines(metadataPath, StandardCharsets.UTF_8);
-    }
-
-    private Path[] getAllTablesPath() throws IOException {
-        List<String> ssTableIds = getSSTableIds();
-        int size = ssTableIds.size();
-        Path[] files = new Path[2 * size];
-
-        for (int i = 0; i < size; i++) {
-            String id = ssTableIds.get(i);
-            files[2 * i] = basePath.resolve(DB_FILENAME_PREFIX + id);
-            files[2 * i + 1] = basePath.resolve(OFFSETS_FILENAME_PREFIX + id);
-        }
-        return files;
+        this.inMemoryStorage = new InMemoryStorageImpl<>(extractor, config.flushThresholdBytes());
+        this.ssTableStorage = new SSTableStorageImpl<>(basePath, METADATA_FILENAME,
+                DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX, extractor);
     }
 
-    // ===================================
-    // Finding in storage
-    // ===================================
     @Override
     public Iterator<E> get(D from, D to) {
-        Iterator<E> inMemotyIterator = super.get(from, to);
-        return new DaoIterator<>(from, to, inMemotyIterator, dbMappedSegments, offsetMappedSegments, extractor);
+        List<Iterator<E>> iterators = new ArrayList<>();
+        iterators.addAll(inMemoryStorage.getIterators(from, to));
+        iterators.addAll(ssTableStorage.getIterators(from, to));
+        return new DaoIterator<>(iterators, extractor);
     }
 
     @Override
     public E get(D key) {
-        E e = dao.get(key);
+        E e = inMemoryStorage.get(key);
         if (e != null) {
             return e.value() == null ? null : e;
         }
-        E fromFile = findInStorages(key);
+        E fromFile = ssTableStorage.get(key);
         return (fromFile == null || fromFile.value() == null) ? null : fromFile;
     }
 
-    private E findInStorages(D key) {
-        for (int i = storagesCount - 1; i >= 0; i--) {
-            MemorySegment storage = dbMappedSegments.get(i);
-            MemorySegment offsets = offsetMappedSegments.get(i);
-
-            long offset = extractor.findLowerBoundValueOffset(key, storage, offsets);
-            if (offset == -1) {
-                continue;
-            }
-            D lowerBoundKey = extractor.readValue(storage, offset);
-
-            if (comparator.compare(lowerBoundKey, key) == 0) {
-                long valueOffset = offset + extractor.size(lowerBoundKey);
-                D value = extractor.readValue(storage, valueOffset);
-                return extractor.createEntry(lowerBoundKey, value);
-            }
+    @Override
+    public void upsert(E entry) {
+        long size = inMemoryStorage.upsertAndGetSize(entry);
+        if (size >= flushThresholdBytes) {
+            flush();
         }
-        return null;
     }
 
-    // ===================================
-    // Some utils
-    // ===================================
-
-    private TableInfo getInMemoryDaoSizeInfo() {
-        long size = 0;
-        for (E entry : dao.values()) {
-            size += extractor.size(entry);
+    @Override
+    public void flush() {
+        if (!isFlushingOrCompacting.compareAndSet(false, true)) {
+            logger.info("Flush or compact already in progress");
+            return;
         }
-        return new TableInfo(dao.size(), size);
-    }
-
-    private TableInfo getSSTableDaoSizeInfo() {
-        Iterator<E> allIterator = all();
-        long entriesCount = 0;
-        long daoSize = 0;
-
-        while (allIterator.hasNext()) {
-            E next = allIterator.next();
-            entriesCount++;
-            daoSize += extractor.size(next);
+        Callable<String> flushCallable = inMemoryStorage.prepareFlush(
+                basePath,
+                DB_FILENAME_PREFIX,
+                OFFSETS_FILENAME_PREFIX);
+        if (flushCallable == null) {
+            isFlushingOrCompacting.set(false);
+            return;
         }
-
-        return new TableInfo(entriesCount, daoSize);
+        submitFlushAndAddSSTable(flushCallable);
     }
 
-    // ===================================
-    // Flush and close
-    // ===================================
-
-    @Override
-    public synchronized void flush() throws IOException {
-        if (dao.isEmpty()) {
-            return;
-        }
-        writer.flush(dao.values().iterator(), getInMemoryDaoSizeInfo());
+    private void submitFlushAndAddSSTable(Callable<String> flushCallable) {
+        flushOrCompactQueue.execute(() -> {
+            try {
+                String newTimestamp = flushCallable.call();
+                ssTableStorage.addSSTableId(newTimestamp, true);
+                inMemoryStorage.completeFlush();
+            } catch (Exception e) {
+                inMemoryStorage.failFlush();
+            } finally {
+                isFlushingOrCompacting.set(false);
+            }
+        });
     }
 
     @Override
-    public synchronized void close() throws IOException {
-        if (closed) {
+    public void close() {
+        if (!isClosed.compareAndSet(false, true)) {
             return;
         }
-        flush();
-        if (arena.scope().isAlive()) {
-            arena.close();
+
+        flushOrCompactQueue.close();
+        try {
+            String newTimestamp = inMemoryStorage.close(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX);
+            if (newTimestamp != null) {
+                ssTableStorage.addSSTableId(newTimestamp, false);
+            }
+        } catch (Exception e) {
+            logger.severe(() -> "Error while flushing on close: " + e.getMessage());
         }
-        closed = true;
+        ssTableStorage.close();
     }
 
     @Override
-    public synchronized void compact() throws IOException {
-        if (storagesCount <= 1 && dao.isEmpty()) {
+    public void compact() {
+        if (!isFlushingOrCompacting.compareAndSet(false, true)) {
+            logger.info("Flush or compact already in progress");
             return;
         }
-        Path[] oldTables = getAllTablesPath();
-        writer.compact(all(), getSSTableDaoSizeInfo());
-        writer.deleteUnusedFiles(oldTables);
+        flushOrCompactQueue.execute(() -> {
+            try {
+                ssTableStorage.compact();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            } finally {
+                isFlushingOrCompacting.set(false);
+            }
+        });
     }
 }
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractInMemoryDao.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractInMemoryDao.java
deleted file mode 100644
index 3f48b9981..000000000
--- a/src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractInMemoryDao.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package ru.vk.itmo.kovalchukvladislav;
-
-import ru.vk.itmo.Dao;
-import ru.vk.itmo.Entry;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-public abstract class AbstractInMemoryDao<D, E extends Entry<D>> implements Dao<D, E> {
-    protected final ConcurrentNavigableMap<D, E> dao;
-    protected final Comparator<? super D> comparator;
-
-    protected AbstractInMemoryDao(Comparator<? super D> comparator) {
-        this.dao = new ConcurrentSkipListMap<>(comparator);
-        this.comparator = comparator;
-    }
-
-    @Override
-    public Iterator<E> get(D from, D to) {
-        ConcurrentNavigableMap<D, E> subMap;
-        if (from == null && to == null) {
-            subMap = dao;
-        } else if (from == null) {
-            subMap = dao.headMap(to);
-        } else if (to == null) {
-            subMap = dao.tailMap(from);
-        } else {
-            subMap = dao.subMap(from, to);
-        }
-        return subMap.values().iterator();
-    }
-
-    @Override
-    public E get(D key) {
-        return dao.get(key);
-    }
-
-    @Override
-    public void upsert(E entry) {
-        dao.put(entry.key(), entry);
-    }
-}
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/MemorySegmentEntryExtractor.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/MemorySegmentEntryExtractor.java
index f55e75fdb..4e25d24fa 100644
--- a/src/main/java/ru/vk/itmo/kovalchukvladislav/MemorySegmentEntryExtractor.java
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/MemorySegmentEntryExtractor.java
@@ -80,6 +80,9 @@ public long size(MemorySegment value) {
 
     @Override
     public long size(Entry<MemorySegment> entry) {
+        if (entry == null) {
+            return 0;
+        }
         return size(entry.key()) + size(entry.value());
     }
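The new flush() and compact() above share one concurrency scheme: an AtomicBoolean turns away a second caller while a single-threaded executor runs the job in the background. Below is a minimal standalone sketch of that gating pattern; the class and method names are illustrative, not part of the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: at most one flush or compaction is in flight; a rejected caller
// returns immediately instead of queueing up behind the running job.
final class SingleBackgroundJobGate {
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    // Returns false if another job is already running.
    boolean trySubmit(Runnable job) {
        if (!busy.compareAndSet(false, true)) {
            return false;
        }
        queue.execute(() -> {
            try {
                job.run();
            } finally {
                busy.set(false); // reopen the gate for the next flush/compact
            }
        });
        return true;
    }
}
```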
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/SSTableMemorySegmentWriter.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/SSTableMemorySegmentWriter.java
deleted file mode 100644
index 077a90c75..000000000
--- a/src/main/java/ru/vk/itmo/kovalchukvladislav/SSTableMemorySegmentWriter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package ru.vk.itmo.kovalchukvladislav;
-
-import ru.vk.itmo.Entry;
-import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
-import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
-
-import java.io.IOException;
-import java.lang.foreign.Arena;
-import java.lang.foreign.MemorySegment;
-import java.lang.foreign.ValueLayout;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.OpenOption;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.nio.file.StandardOpenOption;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.Stream;
-
-public class SSTableMemorySegmentWriter<D, E extends Entry<D>> {
-    private static final Logger logger = Logger.getLogger(SSTableMemorySegmentWriter.class.getSimpleName());
-    private static final OpenOption[] WRITE_OPTIONS = new OpenOption[] {
-            StandardOpenOption.READ,
-            StandardOpenOption.WRITE,
-            StandardOpenOption.TRUNCATE_EXISTING,
-            StandardOpenOption.CREATE
-    };
-
-    private static final StandardCopyOption[] MOVE_OPTIONS = new StandardCopyOption[] {
-            StandardCopyOption.ATOMIC_MOVE,
-            StandardCopyOption.REPLACE_EXISTING
-    };
-
-    private final Path basePath;
-    private final String metadataFilename;
-    private final String dbFilenamePrefix;
-    private final String offsetsFilenamePrefix;
-    private final EntryExtractor<D, E> extractor;
-
-    public SSTableMemorySegmentWriter(Path basePath, String dbFilenamePrefix, String offsetsFilenamePrefix,
-                                      String metadataFilename, EntryExtractor<D, E> extractor) {
-        this.basePath = basePath;
-        this.dbFilenamePrefix = dbFilenamePrefix;
-        this.offsetsFilenamePrefix = offsetsFilenamePrefix;
-        this.metadataFilename = metadataFilename;
-        this.extractor = extractor;
-        logger.setLevel(Level.OFF); // keep the GitHub CI output clean, if that is even possible
-    }
-
-    public void compact(Iterator<E> iterator, TableInfo info) throws IOException {
-        Path tempDirectory = Files.createTempDirectory(null);
-        String timestamp = String.valueOf(System.currentTimeMillis());
-
-        Path newSSTable = basePath.resolve(dbFilenamePrefix + timestamp);
-        Path newOffsetsTable = basePath.resolve(offsetsFilenamePrefix + timestamp);
-        Path tmpSSTable = tempDirectory.resolve(dbFilenamePrefix + timestamp);
-        Path tmpOffsetsTable = tempDirectory.resolve(offsetsFilenamePrefix + timestamp);
-
-        logger.info(() -> String.format("Compacting started to dir %s, timestamp %s, info %s",
-                tempDirectory, timestamp, info));
-
-        try {
-            writeData(tempDirectory, timestamp, iterator, info);
-            Path tmpMetadata = addSSTableId(tempDirectory, timestamp);
-            Path newMetadata = basePath.resolve(metadataFilename);
-
-            Files.move(tmpSSTable, newSSTable, MOVE_OPTIONS);
-            Files.move(tmpOffsetsTable, newOffsetsTable, MOVE_OPTIONS);
-            Files.move(tmpMetadata, newMetadata, MOVE_OPTIONS);
-        } catch (Exception e) {
-            deleteUnusedFiles(newSSTable, newOffsetsTable);
-            throw e;
-        } finally {
-            deleteUnusedFiles(tempDirectory);
-        }
-        logger.info(() -> String.format("Compacted to dir %s, timestamp %s", basePath, timestamp));
-    }
-
-    public void flush(Iterator<E> iterator, TableInfo info) throws IOException {
-        Path tempDirectory = Files.createTempDirectory(null);
-        String timestamp = String.valueOf(System.currentTimeMillis());
-
-        Path newSSTable = basePath.resolve(dbFilenamePrefix + timestamp);
-        Path newOffsetsTable = basePath.resolve(offsetsFilenamePrefix + timestamp);
-        Path tmpSSTable = tempDirectory.resolve(dbFilenamePrefix + timestamp);
-        Path tmpOffsetsTable = tempDirectory.resolve(offsetsFilenamePrefix + timestamp);
-
-        logger.info(() -> String.format("Flushing started to dir %s, timestamp %s, info %s",
-                tempDirectory, timestamp, info));
-        try {
-            writeData(tempDirectory, timestamp, iterator, info);
-
-            Files.move(tmpSSTable, newSSTable, MOVE_OPTIONS);
-            Files.move(tmpOffsetsTable, newOffsetsTable, MOVE_OPTIONS);
-            addSSTableId(basePath, timestamp);
-        } catch (Exception e) {
-            deleteUnusedFiles(newSSTable, newOffsetsTable);
-            throw e;
-        } finally {
-            deleteUnusedFilesInDirectory(tempDirectory);
-        }
-        logger.info(() -> String.format("Flushed to dir %s, timestamp %s", basePath, timestamp));
-    }
-
-    // Deleting stale files is not critical work.
-    // If an exception occurs here, it is better to catch and log it than to stop the whole DAO.
-    public void deleteUnusedFiles(Path... files) {
-        for (Path file : files) {
-            try {
-                boolean deleted = Files.deleteIfExists(file);
-                if (deleted) {
-                    logger.info(() -> String.format("File %s was deleted", file));
-                } else {
-                    logger.severe(() -> String.format("File %s not deleted", file));
-                }
-            } catch (IOException e) {
-                logger.severe(() -> String.format("Error while deleting file %s: %s", file, e.getMessage()));
-            }
-        }
-    }
-
-    private void deleteUnusedFilesInDirectory(Path directory) {
-        try (Stream<Path> files = Files.walk(directory)) {
-            Path[] array = files.sorted(Comparator.reverseOrder()).toArray(Path[]::new);
-            deleteUnusedFiles(array);
-        } catch (Exception e) {
-            logger.severe(() -> String.format("Error while deleting directory %s: %s", directory, e.getMessage()));
-        }
-    }
-
-    private void writeData(Path path, String timestamp, Iterator<E> daoIterator, TableInfo info) throws IOException {
-        Path dbPath = path.resolve(dbFilenamePrefix + timestamp);
-        Path offsetsPath = path.resolve(offsetsFilenamePrefix + timestamp);
-
-        try (FileChannel db = FileChannel.open(dbPath, WRITE_OPTIONS);
-             FileChannel offsets = FileChannel.open(offsetsPath, WRITE_OPTIONS);
-             Arena arena = Arena.ofConfined()) {
-
-            long offsetsSize = info.getRecordsCount() * Long.BYTES;
-            MemorySegment fileSegment = db.map(FileChannel.MapMode.READ_WRITE, 0, info.getRecordsSize(), arena);
-            MemorySegment offsetsSegment = offsets.map(FileChannel.MapMode.READ_WRITE, 0, offsetsSize, arena);
-
-            int i = 0;
-            long offset = 0;
-            while (daoIterator.hasNext()) {
-                E entry = daoIterator.next();
-                offsetsSegment.setAtIndex(ValueLayout.JAVA_LONG_UNALIGNED, i, offset);
-                offset = extractor.writeEntry(entry, fileSegment, offset);
-                i += 1;
-            }
-
-            fileSegment.load();
-            offsetsSegment.load();
-        }
-    }
-
-    private Path addSSTableId(Path path, String id) throws IOException {
-        return Files.writeString(path.resolve(metadataFilename), id + System.lineSeparator(),
-                StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
-    }
-}
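The deleted writer's writeData (reintroduced almost verbatim as StorageUtility.writeData further down) produces a pair of files: a data file with variable-length records written back to back, and an offsets file with one 8-byte slot per record pointing at the record's start. A sketch of how a reader can navigate that layout, assuming only that offsets are ascending and records are contiguous; the helper names are mine:

```java
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

// Sketch of the db_/offsets_ file pair: record i starts at offsets[i] and
// ends where record i + 1 starts, or at the end of the data file for the
// last record.
final class OffsetsLayoutSketch {
    static long recordStart(MemorySegment offsets, long index) {
        return offsets.getAtIndex(ValueLayout.JAVA_LONG_UNALIGNED, index);
    }

    static long recordEnd(MemorySegment offsets, MemorySegment data, long index) {
        long recordsCount = offsets.byteSize() / Long.BYTES;
        return index + 1 < recordsCount ? recordStart(offsets, index + 1) : data.byteSize();
    }
}
```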
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/DaoIterator.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/DaoIterator.java
index 47572a8ac..37ed6d2c4 100644
--- a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/DaoIterator.java
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/DaoIterator.java
@@ -2,50 +2,28 @@
 
 import ru.vk.itmo.Entry;
 
-import java.lang.foreign.MemorySegment;
-import java.lang.foreign.ValueLayout;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 
 public class DaoIterator<D, E extends Entry<D>> implements Iterator<E> {
-    private static final ValueLayout.OfLong LONG_LAYOUT = ValueLayout.JAVA_LONG_UNALIGNED;
-    private static final Integer IN_MEMORY_ITERATOR_ID = Integer.MAX_VALUE;
-    private final Iterator<E> inMemoryIterator;
+    private final List<Iterator<E>> iterators;
     private final EntryExtractor<D, E> extractor;
     private final PriorityQueue<IndexedEntry> queue;
-    private final List<StorageIterator> storageIterators;
 
-    public DaoIterator(D from, D to,
-                       Iterator<E> inMemoryIterator,
-                       List<MemorySegment> storageSegments,
-                       List<MemorySegment> offsetsSegments,
-                       EntryExtractor<D, E> extractor) {
+    public DaoIterator(List<Iterator<E>> iteratorsSortedByPriority, EntryExtractor<D, E> extractor) {
+        this.iterators = iteratorsSortedByPriority;
         this.extractor = extractor;
-        this.inMemoryIterator = inMemoryIterator;
-        this.storageIterators = getStorageIterators(from, to, storageSegments, offsetsSegments);
-        this.queue = new PriorityQueue<>(1 + storageIterators.size());
-        addEntryByIteratorIdSafe(IN_MEMORY_ITERATOR_ID);
-        for (int i = 0; i < storageIterators.size(); i++) {
-            addEntryByIteratorIdSafe(i);
+        int size = iterators.size();
+        this.queue = new PriorityQueue<>(size);
+        for (int i = 0; i < size; i++) {
+            addEntry(iterators.get(i), i);
         }
         cleanByNull();
     }
 
-    private List<StorageIterator> getStorageIterators(D from, D to,
-                                                      List<MemorySegment> storageSegments,
-                                                      List<MemorySegment> offsetsSegments) {
-        int storagesCount = storageSegments.size();
-        final List<StorageIterator> iterators = new ArrayList<>(storagesCount);
-        for (int i = 0; i < storagesCount; i++) {
-            iterators.add(new StorageIterator(storageSegments.get(i), offsetsSegments.get(i), from, to));
-        }
-        return iterators;
-    }
-
     @Override
     public boolean hasNext() {
         return !queue.isEmpty();
@@ -82,18 +60,18 @@ private void cleanByNull() {
     }
 
     private void addEntryByIteratorIdSafe(int iteratorId) {
-        Iterator<E> iteratorById = getIteratorById(iteratorId);
-        if (iteratorById.hasNext()) {
-            E next = iteratorById.next();
-            queue.add(new IndexedEntry(iteratorId, next));
+        addEntry(getIteratorById(iteratorId), iteratorId);
+    }
+
+    private void addEntry(Iterator<E> iterator, int id) {
+        if (iterator.hasNext()) {
+            E next = iterator.next();
+            queue.add(new IndexedEntry(id, next));
         }
     }
 
     private Iterator<E> getIteratorById(int id) {
-        if (id == IN_MEMORY_ITERATOR_ID) {
-            return inMemoryIterator;
-        }
-        return storageIterators.get(id);
+        return iterators.get(id);
    }
 
     private class IndexedEntry implements Comparable<IndexedEntry> {
@@ -111,83 +89,7 @@ public int compareTo(IndexedEntry other) {
             if (compared != 0) {
                 return compared;
             }
-            return -Integer.compare(iteratorId, other.iteratorId);
-        }
-    }
-
-    private class StorageIterator implements Iterator<E> {
-        private final MemorySegment storageSegment;
-        private final long end;
-        private long start;
-
-        public StorageIterator(MemorySegment storageSegment, MemorySegment offsetsSegment, D from, D to) {
-            this.storageSegment = storageSegment;
-
-            if (offsetsSegment.byteSize() == 0) {
-                this.start = -1;
-                this.end = -1;
-            } else {
-                this.start = calculateStartPosition(offsetsSegment, from);
-                this.end = calculateEndPosition(offsetsSegment, to);
-            }
-        }
-
-        private long calculateStartPosition(MemorySegment offsetsSegment, D from) {
-            if (from == null) {
-                return getFirstOffset(offsetsSegment);
-            }
-            long lowerBoundOffset = extractor.findLowerBoundValueOffset(from, storageSegment, offsetsSegment);
-            if (lowerBoundOffset == -1) {
-                // the smallest element and doesn't exist
-                return getFirstOffset(offsetsSegment);
-            } else {
-                // storage[lowerBoundOffset] <= from, we need >= only
-                return moveOffsetIfFirstKeyAreNotEqual(from, lowerBoundOffset);
-            }
-        }
-
-        private long calculateEndPosition(MemorySegment offsetsSegment, D to) {
-            if (to == null) {
-                return getEndOffset();
-            }
-            long lowerBoundOffset = extractor.findLowerBoundValueOffset(to, storageSegment, offsetsSegment);
-            if (lowerBoundOffset == -1) {
-                // the smallest element and doesn't exist
-                return getFirstOffset(offsetsSegment);
-            }
-            // storage[lowerBoundOffset] <= to, we need >= only
-            return moveOffsetIfFirstKeyAreNotEqual(to, lowerBoundOffset);
-        }
-
-        private long getFirstOffset(MemorySegment offsetsSegment) {
-            return offsetsSegment.getAtIndex(LONG_LAYOUT, 0);
-        }
-
-        private long getEndOffset() {
-            return storageSegment.byteSize();
-        }
-
-        private long moveOffsetIfFirstKeyAreNotEqual(D from, long lowerBoundOffset) {
-            long offset = lowerBoundOffset;
-            D lowerBoundKey = extractor.readValue(storageSegment, offset);
-            if (extractor.compare(lowerBoundKey, from) != 0) {
-                offset += extractor.size(lowerBoundKey);
-                D lowerBoundValue = extractor.readValue(storageSegment, offset);
-                offset += extractor.size(lowerBoundValue);
-            }
-            return offset;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return start < end;
-        }
-
-        @Override
-        public E next() {
-            E entry = extractor.readEntry(storageSegment, start);
-            start += extractor.size(entry);
-            return entry;
+            return Integer.compare(iteratorId, other.iteratorId);
         }
     }
 }
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/MemoryOverflowException.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/MemoryOverflowException.java
new file mode 100644
index 000000000..84b798886
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/MemoryOverflowException.java
@@ -0,0 +1,7 @@
+package ru.vk.itmo.kovalchukvladislav.model;
+
+public class MemoryOverflowException extends RuntimeException {
+    public MemoryOverflowException(String message) {
+        super(message);
+    }
+}
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/SimpleDaoLoggerUtility.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/SimpleDaoLoggerUtility.java
new file mode 100644
index 000000000..7457d5091
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/SimpleDaoLoggerUtility.java
@@ -0,0 +1,16 @@
+package ru.vk.itmo.kovalchukvladislav.model;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// A logger that I enable locally but turn off before pushing, so that it does not spam the GitHub CI output.
+public final class SimpleDaoLoggerUtility {
+    private SimpleDaoLoggerUtility() {
+    }
+
+    public static Logger createLogger(Class<?> clazz) {
+        Logger logger = Logger.getLogger(clazz.getSimpleName());
+        logger.setLevel(Level.OFF);
+        return logger;
+    }
+}
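Note the sign flip in IndexedEntry.compareTo: the old code gave the in-memory iterator the largest id and negated the comparison, while the new DaoIterator receives its iterators already sorted by priority, so on equal keys the lowest id (freshest source) simply wins. A self-contained sketch of that merge rule, assuming String keys for brevity; the names here are illustrative:

```java
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of DaoIterator's tie-breaking: order by key first, then by the
// position of the source iterator in the priority list (memtable before
// flushing memtable before the newest SSTable, and so on).
final class MergeRuleSketch {
    record Indexed(int id, String key) implements Comparable<Indexed> {
        @Override
        public int compareTo(Indexed other) {
            int byKey = key.compareTo(other.key);
            return byKey != 0 ? byKey : Integer.compare(id, other.id);
        }
    }

    // Returns the key that wins the first round of the merge; duplicates of
    // it from lower-priority iterators must then be skipped by the caller.
    static String firstWinner(List<Iterator<String>> iteratorsSortedByPriority) {
        PriorityQueue<Indexed> queue = new PriorityQueue<>();
        for (int i = 0; i < iteratorsSortedByPriority.size(); i++) {
            Iterator<String> it = iteratorsSortedByPriority.get(i);
            if (it.hasNext()) {
                queue.add(new Indexed(i, it.next()));
            }
        }
        return queue.isEmpty() ? null : queue.poll().key();
    }
}
```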
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/StorageIterator.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/StorageIterator.java
new file mode 100644
index 000000000..4454f1204
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/StorageIterator.java
@@ -0,0 +1,91 @@
+package ru.vk.itmo.kovalchukvladislav.model;
+
+import ru.vk.itmo.Entry;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.ValueLayout;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class StorageIterator<D, E extends Entry<D>> implements Iterator<E> {
+    private final EntryExtractor<D, E> extractor;
+    private final MemorySegment storageSegment;
+    private final long end;
+    private long start;
+
+    public StorageIterator(D from, D to,
+                           MemorySegment storageSegment,
+                           MemorySegment offsetsSegment,
+                           EntryExtractor<D, E> extractor) {
+        this.storageSegment = storageSegment;
+        this.extractor = extractor;
+
+        if (offsetsSegment.byteSize() == 0) {
+            this.start = -1;
+            this.end = -1;
+        } else {
+            this.start = calculateStartPosition(offsetsSegment, from);
+            this.end = calculateEndPosition(offsetsSegment, to);
+        }
+    }
+
+    private long calculateStartPosition(MemorySegment offsetsSegment, D from) {
+        if (from == null) {
+            return getFirstOffset(offsetsSegment);
+        }
+        long lowerBoundOffset = extractor.findLowerBoundValueOffset(from, storageSegment, offsetsSegment);
+        if (lowerBoundOffset == -1) {
+            // 'from' is smaller than the smallest element, so no lower bound exists
+            return getFirstOffset(offsetsSegment);
+        } else {
+            // storage[lowerBoundOffset] <= from, but we need keys >= from only
+            return moveOffsetIfFirstKeyAreNotEqual(from, lowerBoundOffset);
+        }
+    }
+
+    private long calculateEndPosition(MemorySegment offsetsSegment, D to) {
+        if (to == null) {
+            return getEndOffset();
+        }
+        long lowerBoundOffset = extractor.findLowerBoundValueOffset(to, storageSegment, offsetsSegment);
+        if (lowerBoundOffset == -1) {
+            // 'to' is smaller than the smallest element, so no lower bound exists
+            return getFirstOffset(offsetsSegment);
+        }
+        // storage[lowerBoundOffset] <= to, but we need keys >= to only
+        return moveOffsetIfFirstKeyAreNotEqual(to, lowerBoundOffset);
+    }
+
+    private long getFirstOffset(MemorySegment offsetsSegment) {
+        return offsetsSegment.getAtIndex(ValueLayout.JAVA_LONG_UNALIGNED, 0);
+    }
+
+    private long getEndOffset() {
+        return storageSegment.byteSize();
+    }
+
+    private long moveOffsetIfFirstKeyAreNotEqual(D from, long lowerBoundOffset) {
+        long offset = lowerBoundOffset;
+        D lowerBoundKey = extractor.readValue(storageSegment, offset);
+        if (extractor.compare(lowerBoundKey, from) != 0) {
+            offset += extractor.size(lowerBoundKey);
+            D lowerBoundValue = extractor.readValue(storageSegment, offset);
+            offset += extractor.size(lowerBoundValue);
+        }
+        return offset;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return start < end;
+    }
+
+    @Override
+    public E next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        E entry = extractor.readEntry(storageSegment, start);
+        start += extractor.size(entry);
+        return entry;
+    }
+}
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/TableInfo.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/TableInfo.java
index 2331fe563..38620f3b7 100644
--- a/src/main/java/ru/vk/itmo/kovalchukvladislav/model/TableInfo.java
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/model/TableInfo.java
@@ -1,27 +1,4 @@
 package ru.vk.itmo.kovalchukvladislav.model;
 
-public class TableInfo {
-    private final long recordsCount;
-    private final long recordsSize;
-
-    public TableInfo(long recordsCount, long recordsSize) {
-        this.recordsCount = recordsCount;
-        this.recordsSize = recordsSize;
-    }
-
-    public long getRecordsCount() {
-        return recordsCount;
-    }
-
-    public long getRecordsSize() {
-        return recordsSize;
-    }
-
-    @Override
-    public String toString() {
-        return "TableInfo{"
-                + "recordsCount=" + recordsCount
-                + ", recordsSize=" + recordsSize
-                + '}';
-    }
+public record TableInfo(long recordsCount, long recordsSize) {
 }
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/InMemoryStorage.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/InMemoryStorage.java
new file mode 100644
index 000000000..99a7384a5
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/InMemoryStorage.java
@@ -0,0 +1,43 @@
+package ru.vk.itmo.kovalchukvladislav.storage;
+
+import ru.vk.itmo.Entry;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+public interface InMemoryStorage<D, E extends Entry<D>> {
+    E get(D key);
+
+    /**
+     * Inserts the entry and returns the storage size, from which the caller can decide whether to schedule a flush.
+     * Throws MemoryOverflowException if the size threshold is reached while the previous flush() is still running or has failed.
+     */
+    long upsertAndGetSize(E entry);
+
+    List<Iterator<E>> getIterators(D from, D to);
+
+    /**
+     * Returns a task that performs the flush(). The task returns the timestamp of the new files in the base path.
+     * Returns null if a flush is already running.
+     * If the previous flush() finished with an error, returns a task that retries it.
+     * Returns null if there is no data to flush.
+     */
+    Callable<String> prepareFlush(Path basePath, String dbFilenamePrefix, String offsetsFilenamePrefix);
+
+    /**
+     * Marks the flush() as failed. Allows subsequent flush() calls to retry it.
+     */
+    void failFlush();
+
+    /**
+     * Completes the flush() and drops the flushed map, which is kept around until SSTableStorage picks the data up from disk.
+     */
+    void completeFlush();
+
+    /**
+     * Flushes synchronously on a single thread. Returns the new timestamp, or null.
+     */
+    String close(Path basePath, String dbFilenamePrefix, String offsetsFilenamePrefix) throws IOException;
+}
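The interface above splits a flush into prepare, execute and acknowledge steps. A caller-side sketch of that contract, mirroring what AbstractBasedOnSSTableDao.submitFlushAndAddSSTable does; the wrapper class and the file prefixes here are assumptions:

```java
import ru.vk.itmo.Entry;
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorage;
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorage;

import java.nio.file.Path;
import java.util.concurrent.Callable;

final class FlushContractSketch {
    private FlushContractSketch() {
    }

    static <D, E extends Entry<D>> void flushOnce(InMemoryStorage<D, E> memory,
                                                  SSTableStorage<D, E> tables,
                                                  Path basePath) {
        Callable<String> task = memory.prepareFlush(basePath, "db_", "offsets_");
        if (task == null) {
            return; // nothing to flush, or a flush is already running
        }
        try {
            String timestamp = task.call();       // write the memtable to disk
            tables.addSSTableId(timestamp, true); // publish the new SSTable
            memory.completeFlush();               // only now drop the flushed map
        } catch (Exception e) {
            memory.failFlush();                   // data is kept; a later flush() retries
        }
    }
}
```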
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/InMemoryStorageImpl.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/InMemoryStorageImpl.java
new file mode 100644
index 000000000..54b3eb541
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/InMemoryStorageImpl.java
@@ -0,0 +1,293 @@
+package ru.vk.itmo.kovalchukvladislav.storage;
+
+import ru.vk.itmo.Entry;
+import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
+import ru.vk.itmo.kovalchukvladislav.model.MemoryOverflowException;
+import ru.vk.itmo.kovalchukvladislav.model.SimpleDaoLoggerUtility;
+import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Logger;
+
+public class InMemoryStorageImpl<D, E extends Entry<D>> implements InMemoryStorage<D, E> {
+    private static final StandardCopyOption[] MOVE_OPTIONS = new StandardCopyOption[]{
+            StandardCopyOption.ATOMIC_MOVE,
+            StandardCopyOption.REPLACE_EXISTING
+    };
+    private static final Logger logger = SimpleDaoLoggerUtility.createLogger(InMemoryStorageImpl.class);
+    private final long flushThresholdBytes;
+    private final EntryExtractor<D, E> extractor;
+
+    private volatile DaoState<D, E> daoState;
+    private volatile FlushingDaoState<D, E> flushingDaoState;
+    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+
+    public InMemoryStorageImpl(EntryExtractor<D, E> extractor, long flushThresholdBytes) {
+        this.extractor = extractor;
+        this.flushThresholdBytes = flushThresholdBytes;
+        this.daoState = createEmptyDaoState();
+        this.flushingDaoState = createEmptyFlushingDaoState();
+    }
+
+    private DaoState<D, E> createEmptyDaoState() {
+        return new DaoState<>(new ConcurrentSkipListMap<>(extractor), new AtomicLong(0));
+    }
+
+    private FlushingDaoState<D, E> createEmptyFlushingDaoState() {
+        return new FlushingDaoState<>(new ConcurrentSkipListMap<>(extractor), 0, FlushingState.NOT_RUNNING);
+    }
+
+    /**
+     * daoSize may be updated inside the readLock for better performance:
+     * in upsert() the value only has to be changed atomically, not read exactly.
+     * Operations that read the exact daoSize (flush) should use the writeLock instead.
+     * writeLock() guarantees there are currently no readLocks, and therefore no unfinished size updates.
+     */
+    @SuppressWarnings("unused")
+    private record DaoState<D, E extends Entry<D>>(ConcurrentNavigableMap<D, E> dao, AtomicLong daoSize) {
+        public FlushingDaoState<D, E> toFlushingRunningDaoState() {
+            return new FlushingDaoState<>(dao, daoSize.get(), FlushingState.RUNNING);
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private record FlushingDaoState<D, E extends Entry<D>>(ConcurrentNavigableMap<D, E> dao,
+                                                           long daoSize,
+                                                           FlushingState flushingState) {
+
+        public FlushingDaoState<D, E> toFailed() {
+            return new FlushingDaoState<>(dao, daoSize, FlushingState.FAILED);
+        }
+
+        public FlushingDaoState<D, E> failedToTryAgain() {
+            if (flushingState != FlushingState.FAILED) {
+                throw new IllegalStateException("This method should be called when state is failed");
+            }
+            return new FlushingDaoState<>(dao, daoSize, FlushingState.RUNNING);
+        }
+    }
+
+    private enum FlushingState {
+        NOT_RUNNING,
+        RUNNING,
+        FAILED,
+        // One more state could be added: the data is written, but reloading it into SSTableStorage threw an exception.
+        // It would let a repeated flush return the already written timestamp instead of flushing to a new file again.
+    }
+
+    @Override
+    public E get(D key) {
+        ConcurrentNavigableMap<D, E> dao;
+        ConcurrentNavigableMap<D, E> flushingDao;
+
+        stateLock.readLock().lock();
+        try {
+            dao = daoState.dao;
+            flushingDao = flushingDaoState.dao;
+        } finally {
+            stateLock.readLock().unlock();
+        }
+
+        E entry = dao.get(key);
+        if (entry == null && !flushingDao.isEmpty()) {
+            entry = flushingDao.get(key);
+        }
+        return entry;
+    }
+
+    /**
+     * Returns an approximate size in favor of performance; an exact one would require a lot of writeLocks.
+     * Under concurrent upsert() calls, daoSize may not yet include the entry inserted by one of them.
+     * Still, the "approximate size" is good enough to detect when a flush() is due.
+     */
+    @Override
+    public long upsertAndGetSize(E entry) {
+        stateLock.readLock().lock();
+        try {
+            // "approximate" size: we avoid write locks in favor of performance
+            AtomicLong daoSize = getDaoSizeOrThrowMemoryOverflow(flushThresholdBytes, daoState, flushingDaoState);
+
+            E oldEntry = daoState.dao.put(entry.key(), entry);
+            long delta = extractor.size(entry) - extractor.size(oldEntry);
+            return daoSize.addAndGet(delta);
+        } finally {
+            stateLock.readLock().unlock();
+        }
+    }
+
+    private static <D, E extends Entry<D>> AtomicLong getDaoSizeOrThrowMemoryOverflow(
+            long flushThresholdBytes, DaoState<D, E> daoState, FlushingDaoState<D, E> flushingDaoState) {
+        AtomicLong daoSize = daoState.daoSize;
+        if (daoSize.get() < flushThresholdBytes) {
+            return daoSize;
+        }
+        FlushingState flushingState = flushingDaoState.flushingState();
+        if (flushingState == FlushingState.RUNNING) {
+            throw new MemoryOverflowException("There is no free space: "
+                    + "daoSize is at the maximum, the previous flush is running and not completed");
+        } else if (flushingState == FlushingState.FAILED) {
+            throw new MemoryOverflowException("There is no free space: "
+                    + "daoSize is at the maximum, the previous flush failed. Try to repeat the flush");
+        }
+        return daoSize;
+    }
+
+    @Override
+    public List<Iterator<E>> getIterators(D from, D to) {
+        ConcurrentNavigableMap<D, E> dao;
+        ConcurrentNavigableMap<D, E> flushingDao;
+
+        stateLock.readLock().lock();
+        try {
+            dao = daoState.dao;
+            flushingDao = flushingDaoState.dao;
+        } finally {
+            stateLock.readLock().unlock();
+        }
+
+        List<Iterator<E>> result = new ArrayList<>(2);
+        result.add(getIteratorDao(dao, from, to));
+        result.add(getIteratorDao(flushingDao, from, to));
+        return result;
+    }
+
+    private Iterator<E> getIteratorDao(ConcurrentNavigableMap<D, E> dao, D from, D to) {
+        ConcurrentNavigableMap<D, E> subMap;
+        if (from == null && to == null) {
+            subMap = dao;
+        } else if (from == null) {
+            subMap = dao.headMap(to);
+        } else if (to == null) {
+            subMap = dao.tailMap(from);
+        } else {
+            subMap = dao.subMap(from, to);
+        }
+        return subMap.values().iterator();
+    }
+
+    /**
+     * Returns a task that performs the flush(). The task returns the timestamp of the new files in the base path.
+     * Returns null if a flush is already running.
+     * If the previous flush() finished with an error, returns a task that retries it.
+     * Returns null if there is no data to flush.
+     */
+    @Override
+    public Callable<String> prepareFlush(Path basePath, String dbFilenamePrefix, String offsetsFilenamePrefix) {
+        FlushingDaoState<D, E> newFlushingDaoState;
+
+        stateLock.writeLock().lock();
+        try {
+            switch (flushingDaoState.flushingState) {
+                case RUNNING -> {
+                    return null;
+                }
+                case FAILED -> {
+                    newFlushingDaoState = flushingDaoState.failedToTryAgain();
+                    flushingDaoState = newFlushingDaoState;
+                }
+                case NOT_RUNNING -> {
+                    DaoState<D, E> newDaoState = createEmptyDaoState();
+                    newFlushingDaoState = daoState.toFlushingRunningDaoState();
+
+                    flushingDaoState = newFlushingDaoState;
+                    daoState = newDaoState;
+                }
+                default -> throw new IllegalStateException("Unexpected state: " + flushingDaoState.flushingState);
+            }
+        } finally {
+            stateLock.writeLock().unlock();
+        }
+
+        return () -> {
+            ConcurrentNavigableMap<D, E> dao = newFlushingDaoState.dao;
+            int recordsCount = dao.size();
+            long daoSize = newFlushingDaoState.daoSize;
+            TableInfo tableInfo = new TableInfo(recordsCount, daoSize);
+
+            return flushImpl(dao.values().iterator(), tableInfo, basePath, dbFilenamePrefix, offsetsFilenamePrefix);
+        };
+    }
+
+    private String flushImpl(Iterator<E> immutableCollectionIterator, TableInfo info, Path basePath,
+                             String dbPrefix, String offsetsPrefix) throws IOException {
+        String timestamp = String.valueOf(System.currentTimeMillis());
+        Path tempDirectory = Files.createTempDirectory(null);
+        Path newSSTable = null;
+
+        logger.info(() -> String.format("Flushing started to dir %s, timestamp %s, info %s",
+                tempDirectory, timestamp, info));
+        try {
+            Path tmpSSTable = tempDirectory.resolve(dbPrefix + timestamp);
+            Path tmpOffsets = tempDirectory.resolve(offsetsPrefix + timestamp);
+
+            StorageUtility.writeData(tmpSSTable, tmpOffsets, immutableCollectionIterator, info, extractor);
+
+            newSSTable = Files.move(tmpSSTable, basePath.resolve(dbPrefix + timestamp), MOVE_OPTIONS);
+            Files.move(tmpOffsets, basePath.resolve(offsetsPrefix + timestamp), MOVE_OPTIONS);
+        } catch (Exception e) {
+            // No need to clean up the offsets file: moving it is the last operation,
+            // so if there is an exception it has definitely not been moved yet.
+            if (newSSTable != null) {
+                StorageUtility.deleteUnusedFiles(logger, newSSTable);
+            }
+            throw e;
+        } finally {
+            StorageUtility.deleteUnusedFilesInDirectory(logger, tempDirectory);
+        }
+        logger.info(() -> String.format("Flushed to dir %s, timestamp %s", basePath, timestamp));
+        return timestamp;
+    }
+
+    @Override
+    public void failFlush() {
+        stateLock.writeLock().lock();
+        try {
+            // Mark as failed, but do not clear the map: no data is lost
+            flushingDaoState = flushingDaoState.toFailed();
+        } finally {
+            stateLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void completeFlush() {
+        stateLock.writeLock().lock();
+        try {
+            flushingDaoState = createEmptyFlushingDaoState();
+        } finally {
+            stateLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public String close(Path basePath, String dbFilenamePrefix, String offsetsFilenamePrefix) throws IOException {
+        ConcurrentNavigableMap<D, E> dao;
+        long size;
+
+        stateLock.writeLock().lock();
+        try {
+            dao = daoState.dao;
+            size = daoState.daoSize.get();
+            daoState = null;
+        } finally {
+            stateLock.writeLock().unlock();
+        }
+
+        if (dao.isEmpty()) {
+            return null;
+        }
+        int recordsCount = dao.size();
+        TableInfo tableInfo = new TableInfo(recordsCount, size);
+        return flushImpl(dao.values().iterator(), tableInfo, basePath, dbFilenamePrefix, offsetsFilenamePrefix);
+    }
+}
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/SSTableStorage.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/SSTableStorage.java
new file mode 100644
index 000000000..f3ceffcd5
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/SSTableStorage.java
@@ -0,0 +1,23 @@
+package ru.vk.itmo.kovalchukvladislav.storage;
+
+import ru.vk.itmo.Entry;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public interface SSTableStorage<D, E extends Entry<D>> {
+    /**
+     * Adds a new SSTable from basePath to the metadata and refreshes the SSTableStorage.
+     * Used on flush(); disabling the refresh is useful for the flush() inside close(), when no more requests will arrive.
+     */
+    void addSSTableId(String id, boolean needRefresh) throws IOException;
+
+    E get(D key);
+
+    List<Iterator<E>> getIterators(D from, D to);
+
+    void compact() throws IOException;
+
+    void close();
+}
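The trick InMemoryStorageImpl documents around daoSize deserves a standalone illustration: writers bump an AtomicLong under the read lock, so the counter is cheap but approximate, while flush takes the write lock and is guaranteed to see no half-applied deltas. A minimal sketch of just that accounting; the names are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ApproximateSizeSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicLong size = new AtomicLong();

    // Many writers may run concurrently: the read lock does not exclude them,
    // it only marks "an update is in flight".
    long addDelta(long delta) {
        lock.readLock().lock();
        try {
            return size.addAndGet(delta);
        } finally {
            lock.readLock().unlock();
        }
    }

    // The write lock waits out all readers, so no delta is half-applied here.
    long exactSizeForFlush() {
        lock.writeLock().lock();
        try {
            return size.get();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```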
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/SSTableStorageImpl.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/SSTableStorageImpl.java
new file mode 100644
index 000000000..806c650ee
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/SSTableStorageImpl.java
@@ -0,0 +1,243 @@
+package ru.vk.itmo.kovalchukvladislav.storage;
+
+import ru.vk.itmo.Entry;
+import ru.vk.itmo.kovalchukvladislav.model.DaoIterator;
+import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
+import ru.vk.itmo.kovalchukvladislav.model.SimpleDaoLoggerUtility;
+import ru.vk.itmo.kovalchukvladislav.model.StorageIterator;
+import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+public class SSTableStorageImpl<D, E extends Entry<D>> implements SSTableStorage<D, E> {
+    private static final StandardCopyOption[] MOVE_OPTIONS = new StandardCopyOption[]{
+            StandardCopyOption.ATOMIC_MOVE,
+            StandardCopyOption.REPLACE_EXISTING
+    };
+    private static final Logger logger = SimpleDaoLoggerUtility.createLogger(SSTableStorageImpl.class);
+    private final Path basePath;
+    private final String metadataFilename;
+    private final String dataPrefix;
+    private final String offsetsPrefix;
+    private final Arena arena = Arena.ofShared();
+    private final EntryExtractor<D, E> extractor;
+    private final Set<Path> filesToDelete = ConcurrentHashMap.newKeySet();
+    private volatile State state;
+
+    public SSTableStorageImpl(Path basePath,
+                              String metadataFilename,
+                              String dbFilenamePrefix,
+                              String offsetsPrefix,
+                              EntryExtractor<D, E> extractor) throws IOException {
+        this.basePath = basePath;
+        this.metadataFilename = metadataFilename;
+        this.dataPrefix = dbFilenamePrefix;
+        this.offsetsPrefix = offsetsPrefix;
+        this.extractor = extractor;
+        if (!Files.exists(basePath)) {
+            Files.createDirectory(basePath);
+        }
+        this.state = reloadSSTableIds();
+    }
+
+    @SuppressWarnings("unused")
+    // The compiler complains about unused record components, although they are very much used
+    private record State(List<String> ssTableIds, List<MemorySegment> data, List<MemorySegment> offsets) {
+        public int getCount() {
+            return ssTableIds.size();
+        }
+    }
+
+    @Override
+    public E get(D key) {
+        State currentState = state;
+        for (int i = currentState.getCount() - 1; i >= 0; i--) {
+            MemorySegment storage = currentState.data.get(i);
+            MemorySegment offsets = currentState.offsets.get(i);
+
+            long offset = extractor.findLowerBoundValueOffset(key, storage, offsets);
+            if (offset == -1) {
+                continue;
+            }
+            D lowerBoundKey = extractor.readValue(storage, offset);
+
+            if (extractor.compare(lowerBoundKey, key) == 0) {
+                long valueOffset = offset + extractor.size(lowerBoundKey);
+                D value = extractor.readValue(storage, valueOffset);
+                return extractor.createEntry(lowerBoundKey, value);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public List<Iterator<E>> getIterators(D from, D to) {
+        State currentState = state;
+        List<Iterator<E>> iterators = new ArrayList<>(currentState.getCount());
+        for (int i = currentState.getCount() - 1; i >= 0; i--) {
+            MemorySegment storage = currentState.data.get(i);
+            MemorySegment offsets = currentState.offsets.get(i);
+            iterators.add(new StorageIterator<>(from, to, storage, offsets, extractor));
+        }
+        return iterators;
+    }
+
+    // The state may change only in addSSTableId (used on flush() to refresh the state) and in compact().
+    // The two methods are never called concurrently from AbstractBasedOnSSTableDao, so no synchronization is needed.
+    @Override
+    @SuppressWarnings("unused")
+    // The compiler complains about the unused ignoredPath, even though the variable name says it is unused
+    public void addSSTableId(String id, boolean needRefresh) throws IOException {
+        Path ignoredPath = addSSTableId(basePath, id);
+        if (needRefresh) {
+            state = reloadSSTableIds();
+        }
+    }
+
+    @Override
+    public void compact() throws IOException {
+        List<String> ssTableIds = state.ssTableIds;
+        if (ssTableIds.size() <= 1) {
+            logger.info("SSTables <= 1, not compacting: " + ssTableIds);
+            return;
+        }
+
+        compactAndChangeMetadata();
+        state = reloadSSTableIds();
+        filesToDelete.addAll(convertSSTableIdsToPath(ssTableIds));
+    }
+
+    @Override
+    public void close() {
+        if (arena.scope().isAlive()) {
+            arena.close();
+            StorageUtility.deleteUnusedFiles(logger, filesToDelete.toArray(Path[]::new));
+        }
+    }
+
+    private State reloadSSTableIds() throws IOException {
+        List<String> ssTableIds = readSSTableIds();
+        logger.info(() -> String.format("Reloading files from %s", basePath));
+        List<MemorySegment> newDbMappedSegments = new ArrayList<>(ssTableIds.size());
+        List<MemorySegment> newOffsetMappedSegments = new ArrayList<>(ssTableIds.size());
+
+        for (String ssTableId : ssTableIds) {
+            readFileAndMapToSegment(newDbMappedSegments, newOffsetMappedSegments, ssTableId);
+        }
+        logger.info(() -> String.format("Reloaded %d files", ssTableIds.size()));
+
+        return new State(ssTableIds, newDbMappedSegments, newOffsetMappedSegments);
+    }
+
+    private List<String> readSSTableIds() throws IOException {
+        Path metadataPath = basePath.resolve(metadataFilename);
+        if (!Files.exists(metadataPath)) {
+            return Collections.emptyList();
+        }
+        return Files.readAllLines(metadataPath, StandardCharsets.UTF_8);
+    }
+
+    private void readFileAndMapToSegment(List<MemorySegment> dbMappedResult,
+                                         List<MemorySegment> offsetMappedResult,
+                                         String timestamp) throws IOException {
+        Path dbPath = basePath.resolve(dataPrefix + timestamp);
+        Path offsetsPath = basePath.resolve(offsetsPrefix + timestamp);
+        if (!Files.exists(dbPath) || !Files.exists(offsetsPath)) {
+            throw new FileNotFoundException("File under path " + dbPath + " or " + offsetsPath + " doesn't exist");
+        }
+
+        logger.info(() -> String.format("Reading files with timestamp %s", timestamp));
+
+        try (FileChannel dbChannel = FileChannel.open(dbPath, StandardOpenOption.READ);
+             FileChannel offsetChannel = FileChannel.open(offsetsPath, StandardOpenOption.READ)) {
+
+            MemorySegment db = dbChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(dbPath), arena);
+            MemorySegment offsets =
+                    offsetChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(offsetsPath), arena);
+            dbMappedResult.add(db);
+            offsetMappedResult.add(offsets);
+        }
+        logger.info(() -> String.format("Successfully read files with %s timestamp", timestamp));
+    }
+
+    private Path addSSTableId(Path path, String id) throws IOException {
+        return Files.writeString(path.resolve(metadataFilename), id + System.lineSeparator(),
+                StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+    }
+
+    private void compactAndChangeMetadata() throws IOException {
+        Path tempDirectory = Files.createTempDirectory(null);
+        String timestamp = String.valueOf(System.currentTimeMillis());
+
+        Path newSSTable = null;
+        Path newOffsetsTable = null;
+        Path tmpSSTable = tempDirectory.resolve(dataPrefix + timestamp);
+        Path tmpOffsetsTable = tempDirectory.resolve(offsetsPrefix + timestamp);
+
+        try {
+            Iterator<E> iterator = getIterator();
+            TableInfo info = calculateStorageTableInfo();
+            logger.info(() -> String.format("Compacting started to dir %s, timestamp %s, info %s",
+                    tempDirectory, timestamp, info));
+
+            StorageUtility.writeData(tmpSSTable, tmpOffsetsTable, iterator, info, extractor);
+            Path tmpMetadata = addSSTableId(tempDirectory, timestamp);
+            Path newMetadata = basePath.resolve(metadataFilename);
+
+            newSSTable = Files.move(tmpSSTable, basePath.resolve(dataPrefix + timestamp), MOVE_OPTIONS);
+            newOffsetsTable = Files.move(tmpOffsetsTable, basePath.resolve(offsetsPrefix + timestamp),
+                    MOVE_OPTIONS);
+            Files.move(tmpMetadata, newMetadata, MOVE_OPTIONS);
+        } catch (Exception e) {
+            if (newOffsetsTable != null) {
+                StorageUtility.deleteUnusedFiles(logger, newSSTable, newOffsetsTable);
+            } else if (newSSTable != null) {
+                StorageUtility.deleteUnusedFiles(logger, newSSTable);
+            }
+            throw e;
+        } finally {
+            StorageUtility.deleteUnusedFiles(logger, tempDirectory);
+        }
+        logger.info(() -> String.format("Compacted to dir %s, timestamp %s", basePath, timestamp));
+    }
+
+    private TableInfo calculateStorageTableInfo() {
+        Iterator<E> iterator = getIterator();
+        long size = 0;
+        int count = 0;
+        while (iterator.hasNext()) {
+            count++;
+            size += extractor.size(iterator.next());
+        }
+        return new TableInfo(count, size);
+    }
+
+    private Iterator<E> getIterator() {
+        return new DaoIterator<>(getIterators(null, null), extractor);
+    }
+
+    private List<Path> convertSSTableIdsToPath(List<String> ssTableIds) {
+        List<Path> result = new ArrayList<>(ssTableIds.size() * 2);
+        for (String ssTableId : ssTableIds) {
+            result.add(basePath.resolve(dataPrefix + ssTableId));
+            result.add(basePath.resolve(offsetsPrefix + ssTableId));
+        }
+        return result;
+    }
+}
diff --git a/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/StorageUtility.java b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/StorageUtility.java
new file mode 100644
index 000000000..62bb7ba74
--- /dev/null
+++ b/src/main/java/ru/vk/itmo/kovalchukvladislav/storage/StorageUtility.java
@@ -0,0 +1,84 @@
+package ru.vk.itmo.kovalchukvladislav.storage;
+
+import ru.vk.itmo.Entry;
+import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
+import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
+
+import java.io.IOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.lang.foreign.ValueLayout;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+public final class StorageUtility {
+    private static final OpenOption[] WRITE_OPTIONS = new OpenOption[] {
+            StandardOpenOption.READ,
+            StandardOpenOption.WRITE,
+            StandardOpenOption.TRUNCATE_EXISTING,
+            StandardOpenOption.CREATE
+    };
+    private static final int OFFSET_SIZE = Long.BYTES;
+
+    private StorageUtility() {
+    }
+
+    // Deleting stale files is not critical work.
+    // If an exception occurs here, it is better to catch and log it than to stop the whole DAO.
+    public static void deleteUnusedFiles(Logger logger, Path... files) {
+        for (Path file : files) {
+            try {
+                boolean deleted = Files.deleteIfExists(file);
+                if (deleted) {
+                    logger.info(() -> String.format("File %s was deleted", file));
+                } else {
+                    logger.severe(() -> String.format("File %s not deleted", file));
+                }
+            } catch (IOException e) {
+                logger.severe(() -> String.format("Error while deleting file %s: %s", file, e.getMessage()));
+            }
+        }
+    }
+
+    public static void deleteUnusedFilesInDirectory(Logger logger, Path directory) {
+        try (Stream<Path> files = Files.walk(directory)) {
+            Path[] array = files.sorted(Comparator.reverseOrder()).toArray(Path[]::new);
+            deleteUnusedFiles(logger, array);
+        } catch (Exception e) {
+            logger.severe(() -> String.format("Error while deleting directory %s: %s", directory, e.getMessage()));
+        }
+    }
+
+    public static <D, E extends Entry<D>> void writeData(Path dbPath, Path offsetsPath,
+                                                         Iterator<E> daoIterator, TableInfo info,
+                                                         EntryExtractor<D, E> extractor) throws IOException {
+
+        try (FileChannel db = FileChannel.open(dbPath, WRITE_OPTIONS);
+             FileChannel offsets = FileChannel.open(offsetsPath, WRITE_OPTIONS);
+             Arena arena = Arena.ofConfined()) {
+
+            long offsetsSize = info.recordsCount() * OFFSET_SIZE;
+            MemorySegment fileSegment = db.map(FileChannel.MapMode.READ_WRITE, 0, info.recordsSize(), arena);
+            MemorySegment offsetsSegment = offsets.map(FileChannel.MapMode.READ_WRITE, 0, offsetsSize, arena);
+
+            int i = 0;
+            long offset = 0;
+            while (daoIterator.hasNext()) {
+                E entry = daoIterator.next();
+                offsetsSegment.setAtIndex(ValueLayout.JAVA_LONG_UNALIGNED, i, offset);
+                offset = extractor.writeEntry(entry, fileSegment, offset);
+                i += 1;
+            }
+
+            fileSegment.load();
+            offsetsSegment.load();
+        }
+    }
+}
diff --git a/src/main/java/ru/vk/itmo/test/kovalchukvladislav/MemorySegmentDaoFactory.java b/src/main/java/ru/vk/itmo/test/kovalchukvladislav/MemorySegmentDaoFactory.java
index a0d08eb31..7867c9fd4 100644
--- a/src/main/java/ru/vk/itmo/test/kovalchukvladislav/MemorySegmentDaoFactory.java
+++ b/src/main/java/ru/vk/itmo/test/kovalchukvladislav/MemorySegmentDaoFactory.java
@@ -12,7 +12,7 @@
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
-@DaoFactory(stage = 4)
+@DaoFactory(stage = 5)
 public class MemorySegmentDaoFactory implements DaoFactory.Factory<MemorySegment, Entry<MemorySegment>> {
     private static final Charset CHARSET = StandardCharsets.UTF_8;
     private static final ValueLayout.OfByte VALUE_LAYOUT = ValueLayout.JAVA_BYTE;
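Both flushImpl and compactAndChangeMetadata rely on the same durability idiom: write everything into a temp directory first, then atomically move the finished files into basePath. A reduced sketch of that pattern; it assumes, as the diff's MOVE_OPTIONS do, that the temp directory and basePath live on a filesystem supporting ATOMIC_MOVE:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class AtomicPublishSketch {
    // Readers scanning basePath never observe a half-written file: either the
    // move happened and the file is complete, or it is not there at all.
    static Path publish(byte[] content, Path basePath, String filename) throws IOException {
        Path tempDir = Files.createTempDirectory(null);
        Path tmpFile = tempDir.resolve(filename);
        Files.write(tmpFile, content); // a crash here leaves basePath untouched
        return Files.move(tmpFile, basePath.resolve(filename),
                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }
}
```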