Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Commit

Permalink
Солнышко Ксения, ИТМО ФИТиП, M3332, Expiration (#300)
Browse files Browse the repository at this point in the history
Co-authored-by: Vadim Tsesko <incubos@users.noreply.github.com>
Co-authored-by: Alexey Shik <58121508+AlexeyShik@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent 94e5d9b commit 913870a
Show file tree
Hide file tree
Showing 8 changed files with 795 additions and 125 deletions.
11 changes: 11 additions & 0 deletions src/main/java/ru/vk/itmo/solnyshkoksenia/DaoException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ru.vk.itmo.solnyshkoksenia;

public class DaoException extends RuntimeException {
public DaoException(String message) {
super(message);
}

public DaoException(String message, Throwable cause) {
super(message, cause);
}
}
172 changes: 131 additions & 41 deletions src/main/java/ru/vk/itmo/solnyshkoksenia/DaoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,92 +10,182 @@
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DaoImpl implements Dao<MemorySegment, Entry<MemorySegment>> {
private static final Comparator<MemorySegment> comparator = new MemorySegmentComparator();
private final NavigableMap<MemorySegment, Entry<MemorySegment>> storage = new ConcurrentSkipListMap<>(comparator);
private Arena arena;
private DiskStorage diskStorage;

public DaoImpl() {
// Empty constructor
}
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final Arena arena;
private final Path path;
private volatile State curState;

public DaoImpl(Config config) throws IOException {
Path path = config.basePath().resolve("data");
path = config.basePath().resolve("data");
Files.createDirectories(path);

arena = Arena.ofShared();

this.diskStorage = new DiskStorage(DiskStorage.loadOrRecover(path, arena), path);
this.curState = new State(config, new DiskStorage(DiskStorage.loadOrRecover(path, arena), path));
}

@Override
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
return diskStorage.range(getInMemory(from, to), from, to);
State state = this.curState.checkAndGet();
List<Iterator<EntryExtended<MemorySegment>>> iterators = List.of(
state.getInMemory(state.flushingStorage, from, to),
state.getInMemory(state.storage, from, to)
);

Iterator<EntryExtended<MemorySegment>> iterator = new MergeIterator<>(iterators,
(e1, e2) -> comparator.compare(e1.key(), e2.key()));
Iterator<EntryExtended<MemorySegment>> innerIterator = state.diskStorage.range(iterator, from, to);

return new Iterator<>() {
@Override
public boolean hasNext() {
return innerIterator.hasNext();
}

@Override
public Entry<MemorySegment> next() {
return innerIterator.next();
}
};
}

private Iterator<Entry<MemorySegment>> getInMemory(MemorySegment from, MemorySegment to) {
if (from == null && to == null) {
return storage.values().iterator();
}
if (from == null) {
return storage.headMap(to).values().iterator();
public void upsert(Entry<MemorySegment> entry, Long ttl) {
State state = this.curState.checkAndGet();

lock.readLock().lock();
try {
state.putInMemory(entry, ttl);
} finally {
lock.readLock().unlock();
}
if (to == null) {
return storage.tailMap(from).values().iterator();

if (state.isOverflowed()) {
try {
autoFlush();
} catch (IOException e) {
throw new DaoException("Memory storage overflowed. Cannot flush", e);
}
}
return storage.subMap(from, to).values().iterator();
}

@Override
public void upsert(Entry<MemorySegment> entry) {
storage.put(entry.key(), entry);
upsert(entry, null);
}

@Override
public Entry<MemorySegment> get(MemorySegment key) {
Entry<MemorySegment> entry = storage.get(key);
if (entry != null) {
if (entry.value() == null) {
return null;
State state = this.curState.checkAndGet();
return state.get(key, comparator);
}

@Override
public void flush() throws IOException {
State state = this.curState.checkAndGet();
if (state.storage.isEmpty() || state.isFlushing()) {
return;
}
autoFlush();
}

private void autoFlush() throws IOException {
State state = this.curState.checkAndGet();
lock.writeLock().lock();
try {
if (state.isFlushing()) {
if (state.isOverflowed()) {
throw new IOException();
} else {
return;
}
}
return entry;
this.curState = state.moveStorage();
} finally {
lock.writeLock().unlock();
}

Iterator<Entry<MemorySegment>> iterator = diskStorage.range(Collections.emptyIterator(), key, null);
executor.execute(this::tryFlush);
}

if (!iterator.hasNext()) {
return null;
private void tryFlush() {
State state = this.curState.checkAndGet();
try {
state.flush();
} catch (IOException e) {
throw new DaoException("Flush failed", e);
}
Entry<MemorySegment> next = iterator.next();
if (comparator.compare(next.key(), key) == 0) {
return next;

lock.writeLock().lock();
try {
this.curState = new State(state.config, state.storage, new ConcurrentSkipListMap<>(comparator),
new DiskStorage(DiskStorage.loadOrRecover(path, arena), path));
} catch (IOException e) {
throw new DaoException("Cannot recover storage on disk", e);
} finally {
lock.writeLock().unlock();
}
return null;
}

@Override
public void compact() throws IOException {
diskStorage.compact(storage.values());
storage.clear();
try {
executor.submit(this::tryCompact).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DaoException("Compaction failed. Thread interrupted", e);
} catch (ExecutionException e) {
throw new DaoException("Compaction failed", e);
}
}

private Object tryCompact() {
State state = this.curState.checkAndGet();
try {
state.diskStorage.compact();
} catch (IOException e) {
throw new DaoException("Cannot compact", e);
}

lock.writeLock().lock();
try {
this.curState = new State(state.config, state.storage, state.flushingStorage,
new DiskStorage(DiskStorage.loadOrRecover(path, arena), path));
} catch (IOException e) {
throw new DaoException("Cannot recover storage on disk after compaction", e);
} finally {
lock.writeLock().unlock();
}

return null;
}

@Override
public void close() throws IOException {
if (!arena.scope().isAlive()) {
public synchronized void close() throws IOException {
State state = this.curState;
if (state.isClosed() || !arena.scope().isAlive()) {
return;
}

if (!state.storage.isEmpty()) {
state.save();
}

executor.close();
arena.close();

if (!storage.isEmpty()) {
diskStorage.save(storage.values());
}
this.curState = state.close();
}
}
10 changes: 10 additions & 0 deletions src/main/java/ru/vk/itmo/solnyshkoksenia/EntryExtended.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ru.vk.itmo.solnyshkoksenia;

import ru.vk.itmo.Entry;

public record EntryExtended<Data>(Data key, Data value, Data expiration) implements Entry<Data> {
@Override
public String toString() {
return "{" + key + ":" + value + ":" + expiration + "}";
}
}
148 changes: 148 additions & 0 deletions src/main/java/ru/vk/itmo/solnyshkoksenia/State.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package ru.vk.itmo.solnyshkoksenia;

import ru.vk.itmo.Config;
import ru.vk.itmo.Entry;
import ru.vk.itmo.solnyshkoksenia.storage.DiskStorage;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class State {
private static final Comparator<MemorySegment> comparator = new MemorySegmentComparator();
protected final Config config;
protected final NavigableMap<MemorySegment, EntryExtended<MemorySegment>> storage;
protected final NavigableMap<MemorySegment, EntryExtended<MemorySegment>> flushingStorage;
protected final DiskStorage diskStorage;
private final AtomicLong storageByteSize = new AtomicLong();
private final AtomicBoolean isClosed = new AtomicBoolean();
private final AtomicBoolean overflow = new AtomicBoolean();

public State(Config config,
NavigableMap<MemorySegment, EntryExtended<MemorySegment>> storage,
NavigableMap<MemorySegment, EntryExtended<MemorySegment>> flushingStorage,
DiskStorage diskStorage) {
this.config = config;
this.storage = storage;
this.flushingStorage = flushingStorage;
this.diskStorage = diskStorage;
}

public State(Config config,
DiskStorage diskStorage) {
this.config = config;
this.storage = new ConcurrentSkipListMap<>(comparator);
this.flushingStorage = new ConcurrentSkipListMap<>(comparator);
this.diskStorage = diskStorage;
}

public void putInMemory(Entry<MemorySegment> entry, Long ttl) {
MemorySegment expiration = null;
if (ttl != null) {
long[] ar = {System.currentTimeMillis() + ttl};
expiration = MemorySegment.ofArray(ar);
}
EntryExtended<MemorySegment> entryExtended = new EntryExtended<>(entry.key(), entry.value(), expiration);
EntryExtended<MemorySegment> previousEntry = storage.put(entryExtended.key(), entryExtended);

if (previousEntry != null) {
storageByteSize.addAndGet(-getSize(previousEntry));
}

if (storageByteSize.addAndGet(getSize(entryExtended)) > config.flushThresholdBytes()) {
overflow.set(true);
}
}

public void save() throws IOException {
diskStorage.save(storage.values());
}

private static long getSize(EntryExtended<MemorySegment> entry) {
long valueSize = entry.value() == null ? 0 : entry.value().byteSize();
long expirationSize = entry.expiration() == null ? 0 : entry.expiration().byteSize();
return Long.BYTES + entry.key().byteSize() + Long.BYTES + valueSize + Long.BYTES + expirationSize;
}

public State checkAndGet() {
if (isClosed.get()) {
throw new DaoException("Dao is already closed");
}
return this;
}

public boolean isClosed() {
return isClosed.get();
}

public boolean isOverflowed() {
return overflow.get();
}

public boolean isFlushing() {
return !flushingStorage.isEmpty();
}

public State moveStorage() {
return new State(config, new ConcurrentSkipListMap<>(comparator), storage, diskStorage);
}

public void flush() throws IOException {
diskStorage.save(flushingStorage.values());
}

public State close() {
isClosed.set(true);
return this;
}

public Entry<MemorySegment> get(MemorySegment key, Comparator<MemorySegment> comparator) {
EntryExtended<MemorySegment> entry = storage.get(key);
if (isValidEntry(entry)) {
return entry.value() == null ? null : entry;
}

entry = flushingStorage.get(key);
if (isValidEntry(entry)) {
return entry.value() == null ? null : entry;
}

Iterator<EntryExtended<MemorySegment>> iterator = diskStorage.range(Collections.emptyIterator(), key, null);

if (!iterator.hasNext()) {
return null;
}
EntryExtended<MemorySegment> next = iterator.next();
if (comparator.compare(next.key(), key) == 0 && isValidEntry(next)) {
return next;
}
return null;
}

private boolean isValidEntry(EntryExtended<MemorySegment> entry) {
return entry != null && (entry.expiration() == null
|| entry.expiration().toArray(ValueLayout.JAVA_LONG_UNALIGNED)[0] > System.currentTimeMillis());
}

protected Iterator<EntryExtended<MemorySegment>> getInMemory(
NavigableMap<MemorySegment, EntryExtended<MemorySegment>> memory,
MemorySegment from, MemorySegment to) {
if (from == null && to == null) {
return memory.values().iterator();
}
if (from == null) {
return memory.headMap(to).values().iterator();
}
if (to == null) {
return memory.tailMap(from).values().iterator();
}
return memory.subMap(from, to).values().iterator();
}
}
Loading

0 comments on commit 913870a

Please sign in to comment.