-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsegments.py
138 lines (118 loc) · 4.59 KB
/
segments.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from dataclasses import dataclass
from os import scandir, mkdir, path, remove, rename
from re import compile as compile_re
from threading import Lock, Thread
from queue import Queue
from readerwriterlock import rwlock
from binio import kv_iter
from bloomfilter import BloomFilter
from sstable import SSTable
from compaction import compaction_pass, compute_buckets
SEGMENT_TEMPLATE = 'segment-%d.dat'
SEGMENT_PATTERN = compile_re("segment-(?P<index>\d+)\.dat")
COMPACT_EXIT = 0
COMPACT_REQUIRED = 1
class Segments:
@dataclass
class CompactionThread:
queue: Queue
thread: Thread
def notify(self):
self.queue.put(COMPACT_REQUIRED, block=False)
def get_task(self):
return self.queue.get()
def task_done(self):
self.queue.task_done()
def start(self):
if not self.thread.is_alive():
self.thread.start()
def stop(self):
self.queue.put(COMPACT_EXIT)
self.thread.join()
self.queue.join()
def __init__(self, segment_dir):
self.compaction_lock = Lock()
self.compaction_thread = None
self.segment_lock = rwlock.RWLockWrite()
self.segment_dir = segment_dir
self.segments = []
if path.isdir(self.segment_dir):
self._load_segments()
else:
mkdir(self.segment_dir)
def _load_segments(self):
segment_files = []
with scandir(self.segment_dir) as files:
for f in files:
match = SEGMENT_PATTERN.search(f.name)
if match:
index = int(match.group('index'))
segment = SSTable(f.path, index)
segment_files.append(segment)
if segment_files:
self.segments = sorted(
segment_files,
key=lambda segment: segment.index,
reverse=True,
)
def flush(self, memtable):
with self.segment_lock.gen_wlock():
index = 0
if len(self.segments):
index = self.segments[0].index + 1
path = f"{self.segment_dir}/{SEGMENT_TEMPLATE % index}"
sstable = SSTable.create(path, index, memtable)
self.segments.insert(0, sstable)
self._notify_compaction_thread()
def search(self, k):
with self.segment_lock.gen_rlock():
for segment in self.segments:
value = segment.search(k)
if value is not None:
return value
return None
def compact(self):
with self.compaction_lock:
with self.segment_lock.gen_rlock():
buckets = compute_buckets(self.segments)
old_segments, new_segment = compaction_pass(buckets)
if new_segment is None:
return
# update the in-memory segments list
new_segments = []
updated = False
old_indexes = set(f.index for f in old_segments)
with self.segment_lock.gen_wlock():
for segment in self.segments:
if not updated and segment.index <= new_segment.index:
new_segments.append(new_segment)
updated = True
if segment.index not in old_indexes:
new_segments.append(segment)
# delete the old segments from disk
for old_segment in old_segments:
remove(old_segment.path)
# fix the new segment's path
updated_path = new_segment.path.replace("-compacted", "")
rename(new_segment.path, updated_path)
new_segment.path = updated_path
# update the segments
self.segments = new_segments
def _compaction_loop(self):
while self.compaction_thread.get_task():
self.compact()
self.compaction_thread.task_done()
self.compaction_thread.task_done()
def _notify_compaction_thread(self):
if self.compaction_thread:
self.compaction_thread.notify()
def start_compaction_thread(self):
if not self.compaction_thread:
worker = Thread(target=self._compaction_loop, daemon=True)
self.compaction_thread = Segments.CompactionThread(Queue(), worker)
self.compaction_thread.start()
def stop_compaction_thread(self, graceful=True):
if self.compaction_thread:
if graceful:
self.compaction_thread.stop()
self.compaction_thread = None