Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Тверитин Александр, Магистратура ИТМО "Распределенные веб-сервисы", lamport TimeStamp #306

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
af7bc78
Implemented classes DaoFactoryImpl and InMemoryDaoImpl
Sep 26, 2023
ec92a5a
Corrections of errors in the code from CodeClimate report.
Sep 26, 2023
df92227
Merge branch 'main' into main
incubos Sep 27, 2023
d0c49a6
New comparator for MemorySegment that uses MemorySegment.mismatch().
Sep 27, 2023
5080ddb
Merge remote-tracking branch 'origin/main'
Sep 27, 2023
00d0160
New condition for comparator impl.
Sep 27, 2023
3a5bfb7
Clean code.
Sep 27, 2023
5730996
Clean code.
Sep 27, 2023
5e0ca80
Update comparator.
Sep 27, 2023
19ba0b4
Merge branch 'main' into main
atimofeyev Sep 28, 2023
579489f
Merge branch 'main' into main
ALTV2 Oct 4, 2023
4c850ef
-
Oct 4, 2023
d5d88dd
-
Oct 4, 2023
642ec37
-
Oct 4, 2023
b7d6160
-
Oct 4, 2023
a7c1682
fix codeClimate problems
Oct 4, 2023
6499d2e
-
Oct 4, 2023
1d177e1
Merge branch 'main' into main
incubos Oct 5, 2023
ae90173
Merge branch 'main' into main
atimofeyev Oct 8, 2023
485fe1e
Merge branch 'main' into main
atimofeyev Oct 8, 2023
7aef758
Merge remote-tracking branch 'upstream/main'
Oct 29, 2023
9de7bda
Merge remote-tracking branch 'upstream/main' into step-4-compact
Oct 29, 2023
9c1e8ea
Step-3 correct solution
Oct 30, 2023
5c39a3f
Step-4. Bad, but works
Oct 30, 2023
5c902d5
Merge branch 'polis-vk:main' into main
ALTV2 Oct 30, 2023
4a13336
Step-4. Bad, but butter
Oct 31, 2023
ff87f5a
Merge branch 'step-4-compact'
Oct 31, 2023
52ecf81
Step-4. Minor
Oct 31, 2023
b3dae75
Step-4. Correct Impl
Nov 1, 2023
a356e7e
Step-4. Correct Impl
Nov 1, 2023
a8fb166
Merge branch 'step-4-compact'
Nov 1, 2023
d264832
Step-4. CodeClimate remarks corrections. 1
Nov 1, 2023
b3b3277
Step-4. CodeClimate remarks corrections. 2
Nov 1, 2023
f86eb08
Step-4. CodeClimate remarks corrections. 3
Nov 1, 2023
ede6eb9
Merge remote-tracking branch 'upstream/main' into step-4-compact
Nov 1, 2023
c8874be
Step-4. CodeClimate remarks corrections. 4
Nov 1, 2023
292e012
Step-4. CodeClimate remarks corrections. 5
Nov 1, 2023
f50bdf1
Step-4. Add comments.
Nov 1, 2023
587015f
Step-4. Correct comment.
Nov 1, 2023
28d31f1
Step-4. Correct comment.
Nov 1, 2023
34708d3
Merge branch 'main' into step-4-compact
incubos Nov 2, 2023
66267ee
Merge remote-tracking branch 'origin/step-4-compact'
Nov 19, 2023
d28a7e2
Merge remote-tracking branch 'upstream/main'
Nov 19, 2023
046a438
Merge remote-tracking branch 'upstream/main' into extra-session-Task
Feb 1, 2024
2dfc10b
with get by Key.
Feb 1, 2024
ee91d43
MI doesnt work
Feb 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/ru/vk/itmo/BaseEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ public record BaseEntry<Data>(Data key, Data value) implements Entry<Data> {
public String toString() {
return "{" + key + ":" + value + "}";
}

}
19 changes: 19 additions & 0 deletions src/main/java/ru/vk/itmo/reference/LamportTimeStamp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ru.vk.itmo.reference;

import java.util.concurrent.atomic.AtomicLong;

public class LamportTimeStamp {
private final AtomicLong currentTimeStamp = new AtomicLong(0);

public long getCurrentTimeStamp() {
return currentTimeStamp.get();
}

public void setCurrentTimeStamp(long initialTimeStamp) {
currentTimeStamp.set(initialTimeStamp);
}

public long incrementAndGet() {
return currentTimeStamp.incrementAndGet();
}
}
14 changes: 7 additions & 7 deletions src/main/java/ru/vk/itmo/reference/LiveFilteringIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
*
* @author incubos
*/
final class LiveFilteringIterator implements Iterator<Entry<MemorySegment>> {
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> next;
final class LiveFilteringIterator implements Iterator<TimeStampEntry> {
private final Iterator<TimeStampEntry> delegate;
private TimeStampEntry next;

LiveFilteringIterator(final Iterator<Entry<MemorySegment>> delegate) {
LiveFilteringIterator(final Iterator<TimeStampEntry> delegate) {
this.delegate = delegate;
skipTombstones();
}

private void skipTombstones() {
while (delegate.hasNext()) {
final Entry<MemorySegment> entry = delegate.next();
final TimeStampEntry entry = delegate.next();
if (entry.value() != null) {
this.next = entry;
break;
Expand All @@ -36,13 +36,13 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public TimeStampEntry next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// Consume
final Entry<MemorySegment> result = next;
final TimeStampEntry result = next;
next = null;

skipTombstones();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ru.vk.itmo.reference;

import ru.vk.itmo.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;

public class LiveFilteringIteratorAdapter implements Iterator<Entry<MemorySegment>> {
private final LiveFilteringIterator liveFilteringIterator;

public LiveFilteringIteratorAdapter(LiveFilteringIterator liveFilteringIterator) {
this.liveFilteringIterator = liveFilteringIterator;
}

@Override
public boolean hasNext() {
return liveFilteringIterator.hasNext();
}

@Override
public Entry<MemorySegment> next() {
return liveFilteringIterator.next().getClearEntry();
}
}
15 changes: 11 additions & 4 deletions src/main/java/ru/vk/itmo/reference/MemTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
* @author incubos
*/
final class MemTable {
private final NavigableMap<MemorySegment, Entry<MemorySegment>> map =
private final NavigableMap<MemorySegment, TimeStampEntry> map =
new ConcurrentSkipListMap<>(
MemorySegmentComparator.INSTANCE);

boolean isEmpty() {
return map.isEmpty();
}

Iterator<Entry<MemorySegment>> get(
Iterator<TimeStampEntry> get(
final MemorySegment from,
final MemorySegment to) {
if (from == null && to == null) {
Expand All @@ -39,11 +39,18 @@ Iterator<Entry<MemorySegment>> get(
}
}

/// Тут мы пока просто игнорим таймсетмп
Entry<MemorySegment> get(final MemorySegment key) {
return map.get(key);
TimeStampEntry timeStampEntry = map.get(key);

if (timeStampEntry != null) {
return timeStampEntry.getClearEntry();
}

return null;
}

Entry<MemorySegment> upsert(final Entry<MemorySegment> entry) {
return map.put(entry.key(), entry);
return map.put(entry.key(), new TimeStampEntry(entry));
}
}
72 changes: 32 additions & 40 deletions src/main/java/ru/vk/itmo/reference/MergingEntryIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,20 @@
import ru.vk.itmo.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.*;

/**
* Merges entry {@link Iterator}s.
*
* @author incubos
*/
final class MergingEntryIterator implements Iterator<Entry<MemorySegment>> {
private final Queue<WeightedPeekingEntryIterator> iterators;
final class MergingEntryIterator implements Iterator<TimeStampEntry> {
private final List<WeightedPeekingEntryIterator> iterators;

MergingEntryIterator(final List<WeightedPeekingEntryIterator> iterators) {
assert iterators.stream().allMatch(WeightedPeekingEntryIterator::hasNext);

this.iterators = new PriorityQueue<>(iterators);
this.iterators = new ArrayList<>(iterators);
}

@Override
Expand All @@ -29,44 +25,40 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

final WeightedPeekingEntryIterator top = iterators.remove();
final Entry<MemorySegment> result = top.next();
public TimeStampEntry next() {
TimeStampEntry nextElement = null;

if (top.hasNext()) {
// Not exhausted
iterators.add(top);
}
iterators.removeIf(currentIterator -> !currentIterator.hasNext());

// Remove older versions of the key
while (true) {
final WeightedPeekingEntryIterator iterator = iterators.peek();
if (iterator == null) {
// Nothing left
break;
}
int numberOfIterator = 0;
int counter = 0;

// Skip entries with the same key
final Entry<MemorySegment> entry = iterator.peek();
if (MemorySegmentComparator.INSTANCE.compare(result.key(), entry.key()) != 0) {
// Reached another key
break;
for(WeightedPeekingEntryIterator iterator: iterators) {
if (nextElement == null) {
nextElement = iterator.peek();
} else {
TimeStampEntry nextElementCandidate = iterator.peek();
if (MemorySegmentComparator.INSTANCE.compare(nextElement.key(), nextElementCandidate.key()) > 0) {
nextElement = nextElementCandidate;
numberOfIterator = counter;
} else if (MemorySegmentComparator.INSTANCE.compare(nextElement.key(), nextElementCandidate.key()) == 0) {
if(nextElementCandidate.timeStamp() > nextElement.timeStamp()) {
nextElement = nextElementCandidate;
iterators.get(numberOfIterator).next();
numberOfIterator = counter;
} else {
iterators.get(counter).next();
}
}
}
counter += 1;
}

// Drop
iterators.remove();
// Skip
iterator.next();
if (iterator.hasNext()) {
// Not exhausted
iterators.add(iterator);
}
nextElement = iterators.get(numberOfIterator).next();
if (!iterators.get(numberOfIterator).hasNext()) {
iterators.remove(numberOfIterator);
}

return result;
return nextElement;
}
}
3 changes: 2 additions & 1 deletion src/main/java/ru/vk/itmo/reference/ReferenceDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ public ReferenceDao(final Config config) throws IOException {
public Iterator<Entry<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
return new LiveFilteringIterator(
LiveFilteringIterator liveFilteringIterator = new LiveFilteringIterator(
tableSet.get(
from,
to));
return new LiveFilteringIteratorAdapter(liveFilteringIterator);
}

@Override
Expand Down
35 changes: 24 additions & 11 deletions src/main/java/ru/vk/itmo/reference/SSTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@ private long getLength(final long offset) {
offset);
}

Iterator<Entry<MemorySegment>> get(
private long getTimeStamp(final long offset) {
return data.get(
ValueLayout.OfLong.JAVA_LONG_UNALIGNED,
offset);
}

Iterator<TimeStampEntry> get(
final MemorySegment from,
final MemorySegment to) {
assert from == null || to == null || MemorySegmentComparator.INSTANCE.compare(from, to) <= 0;
Expand Down Expand Up @@ -134,29 +140,32 @@ Iterator<Entry<MemorySegment>> get(
return new SliceIterator(fromOffset, toOffset);
}

Entry<MemorySegment> get(final MemorySegment key) {
final long entry = entryBinarySearch(key);
if (entry < 0) {
TimeStampEntry get(final MemorySegment key) {
final long entryIndex = entryBinarySearch(key);
if (entryIndex < 0) {
return null;
}

// Skip key (will reuse the argument)
long offset = entryOffset(entry);
long offset = entryOffset(entryIndex);
offset += Long.BYTES + key.byteSize();
long timeStamp = getTimeStamp(offset);
offset += Long.BYTES;

// Extract value length
final long valueLength = getLength(offset);
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
// Tombstone encountered
return new BaseEntry<>(key, null);
return new TimeStampEntry(new BaseEntry<>(key, null), timeStamp);
} else {
// Get value
offset += Long.BYTES;
final MemorySegment value = data.asSlice(offset, valueLength);
return new BaseEntry<>(key, value);
return new TimeStampEntry(new BaseEntry<>(key, value), timeStamp);
}
}

private final class SliceIterator implements Iterator<Entry<MemorySegment>> {
private final class SliceIterator implements Iterator<TimeStampEntry> {
private long offset;
private final long toOffset;

Expand All @@ -173,7 +182,7 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public TimeStampEntry next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Expand All @@ -186,18 +195,22 @@ public Entry<MemorySegment> next() {
final MemorySegment key = data.asSlice(offset, keyLength);
offset += keyLength;

// Read timeStamp
final long timeStamp = getTimeStamp(offset);
offset += Long.BYTES;

// Read value length
final long valueLength = getLength(offset);
offset += Long.BYTES;

// Read value
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
// Tombstone encountered
return new BaseEntry<>(key, null);
return new TimeStampEntry(new BaseEntry<>(key, null), timeStamp);
} else {
final MemorySegment value = data.asSlice(offset, valueLength);
offset += valueLength;
return new BaseEntry<>(key, value);
return new TimeStampEntry(new BaseEntry<>(key, value), timeStamp);
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/ru/vk/itmo/reference/SSTableWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ final class SSTableWriter {
void write(
final Path baseDir,
final int sequence,
final Iterator<Entry<MemorySegment>> entries) throws IOException {
final Iterator<TimeStampEntry> entries) throws IOException {
// Write to temporary files
final Path tempIndexName = SSTables.tempIndexName(baseDir, sequence);
final Path tempDataName = SSTables.tempDataName(baseDir, sequence);
Expand Down Expand Up @@ -71,11 +71,14 @@ void write(
writeLong(entryOffset, index);

// Then write the entry
final Entry<MemorySegment> entry = entries.next();
final TimeStampEntry entry = entries.next();
entryOffset += writeEntry(entry, data);
}
}

//index: entryOffset_1|entryOffset_2|entryOffset_3
//data: entryOffset_1|entryOffset_2|entryOffset_3

// Publish files atomically
// FIRST index, LAST data
final Path indexName =
Expand Down Expand Up @@ -132,12 +135,14 @@ private void writeSegment(
* @return written bytes
*/
private long writeEntry(
final Entry<MemorySegment> entry,
final TimeStampEntry entry,
final OutputStream os) throws IOException {
final MemorySegment key = entry.key();
final MemorySegment value = entry.value();
long result = 0L;

//Long_KeySize_1|Key_data_1|Long_timestamp|Long_ValueSize_1|Value_data_1

// Key size
writeLong(key.byteSize(), os);
result += Long.BYTES;
Expand All @@ -146,6 +151,10 @@ private long writeEntry(
writeSegment(key, os);
result += key.byteSize();

// TimeStamp
writeLong(entry.timeStamp(), os);
result += Long.BYTES;

// Value size and possibly value
if (value == null) {
// Tombstone
Expand Down
Loading
Loading