-
Notifications
You must be signed in to change notification settings - Fork 0
/
any_ruptor_perf.py
221 lines (163 loc) · 5.79 KB
/
any_ruptor_perf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""
"""
import os
import gc
import enum
import time
import ctypes
import random
import trio
import curio
import asyncio
import threading
import multiprocessing
from typing import Callable
from dataclasses import dataclass
# Number of values pushed through the buffer (and verified) in each benchmark run.
count = int(1e5)
@enum.unique
class Mode(enum.Enum):
    """Execution flavour used by a benchmark reader or writer loop."""

    SYNC = enum.auto()  # blocking claim calls, no event loop
    ASIO = enum.auto()  # loop driven by asyncio
    CRIO = enum.auto()  # loop driven by curio
    TRIO = enum.auto()  # loop driven by trio

    def __str__(self):
        # Bare member name keeps printed benchmark results compact.
        return self.name

    # repr() renders exactly like str().
    __repr__ = __str__
@dataclass(frozen=True)
class BufferResult:
    """Immutable record describing the outcome of one benchmark run."""

    # Class name of the buffer that was benchmarked.
    buffer_name: str
    # Mode the reading side ran in.
    reader_mode: Mode
    # Mode the writing side ran in.
    writer_mode: Mode
    # Average elapsed time per transferred item, in microseconds.
    measured_time: float
def buffer_perf(
        buffer_maker: Callable,
        reader_mode=Mode.SYNC,
        writer_mode=Mode.SYNC,
        # runner_class=threading.Thread,
        runner_class=multiprocessing.Process,
    ) -> BufferResult:
    """
    Measure how long it takes to move `count` values through a buffer.

    A writer runner copies `source_data` into the ring one slot at a time and
    a reader runner copies the ring into `target_data`; afterwards both arrays
    must be identical.

    Parameters:
        buffer_maker: zero-argument factory returning the buffer under test.
        reader_mode: `Mode` selecting how the reader loop is driven.
        writer_mode: `Mode` selecting how the writer loop is driven.
        runner_class: `multiprocessing.Process` or `threading.Thread`
            compatible class used to run each side concurrently.

    Returns:
        BufferResult whose `measured_time` is the average number of
        microseconds per item. The interval covers runner start-up and join.

    Raises:
        AssertionError: when a mode is not a known `Mode` member, or when the
            received data differs from the data that was sent.
    """
    gc.collect()

    buffer = buffer_maker()
    print(f"buffer: {buffer} reader_mode={reader_mode} writer_mode={writer_mode}")

    # Raw shared arrays: visible to both runners, no implicit locking.
    ring_data = multiprocessing.RawArray(ctypes.c_double, buffer.index.ring_size)
    source_data = multiprocessing.RawArray(ctypes.c_double, count)
    target_data = multiprocessing.RawArray(ctypes.c_double, count)

    # Source is non-negative and target non-positive, so an item that was
    # never transferred is caught by the final comparison.
    for index in range(count):
        source_data[index] = +random.random()
        target_data[index] = -random.random()

    def reader_sync():
        for index in range(count):
            ring_index = buffer.reader_claim_sync()
            target_data[index] = ring_data[ring_index]
            buffer.reader_commit()

    def writer_sync():
        for index in range(count):
            ring_index = buffer.writer_claim_sync()
            ring_data[ring_index] = source_data[index]
            buffer.writer_commit()

    # One coroutine per side, shared by every async framework.
    async def reader_loop():
        for index in range(count):
            ring_index = await buffer.reader_claim()
            target_data[index] = ring_data[ring_index]
            buffer.reader_commit()

    async def writer_loop():
        for index in range(count):
            ring_index = await buffer.writer_claim()
            ring_data[ring_index] = source_data[index]
            buffer.writer_commit()

    def reader_asio():
        asyncio.run(reader_loop())

    def writer_asio():
        asyncio.run(writer_loop())

    def reader_crio():
        curio.run(reader_loop)

    def writer_crio():
        curio.run(writer_loop)

    def reader_trio():
        trio.run(reader_loop)

    def writer_trio():
        trio.run(writer_loop)

    reader_targets = {
        Mode.SYNC: reader_sync,
        Mode.ASIO: reader_asio,
        Mode.CRIO: reader_crio,
        Mode.TRIO: reader_trio,
    }
    writer_targets = {
        Mode.SYNC: writer_sync,
        Mode.ASIO: writer_asio,
        Mode.CRIO: writer_crio,
        Mode.TRIO: writer_trio,
    }

    def pick_target(targets, mode, side):
        # Explicit raise (not `assert`) so validation survives `python -O`.
        try:
            return targets[mode]
        except (KeyError, TypeError):
            raise AssertionError(f"wrong {side}_mode={mode}") from None

    runner_reader = runner_class(target=pick_target(reader_targets, reader_mode, "reader"))
    runner_writer = runner_class(target=pick_target(writer_targets, writer_mode, "writer"))

    runner_reader.name = reader_mode.name
    runner_reader.daemon = True
    runner_writer.name = writer_mode.name
    runner_writer.daemon = True

    # perf_counter is monotonic and high resolution; time.time() can jump
    # when the system clock is adjusted.
    time_start = time.perf_counter()
    runner_reader.start()
    runner_writer.start()
    runner_reader.join()
    runner_writer.join()
    time_finish = time.perf_counter()

    # Verify the transfer; raised explicitly so `-O` cannot skip the check.
    for index, (sent, received) in enumerate(zip(source_data, target_data)):
        if sent != received:
            raise AssertionError(f"index={index}")

    time_diff = time_finish - time_start

    return BufferResult(
        buffer_name=buffer.__class__.__name__,
        reader_mode=reader_mode,
        writer_mode=writer_mode,
        measured_time=1e6 * time_diff / count,
    )
def invoke_perf(buffer_maker:Callable, session_size:int=1):
    """
    Benchmark every reader/writer mode combination, `session_size` times over.

    Each session prints a header line, then one result line per combination,
    with the reader mode varying slowest.
    """
    mode_pairs = [(reader, writer) for reader in Mode for writer in Mode]
    for session in range(session_size):
        print(f"------ session={session} ------")
        for reader_mode, writer_mode in mode_pairs:
            result = buffer_perf(buffer_maker, reader_mode, writer_mode)
            print(f"result={result}")
#
#
#
from data_pipe.runtime_library import RuptorBuffer # @UnresolvedImport
# from data_pipe.basic_ruptor import RuptorBuffer
def buffer_maker():
    """
    Build a ready-to-use RuptorBuffer.

    Allocates a shared raw byte array of the size reported by
    `RuptorBuffer.ruptor_store_size()` and hands it to `setup()` as the
    buffer's `index_store`.
    """
    index_store = multiprocessing.RawArray(
        ctypes.c_char,
        RuptorBuffer.ruptor_store_size(),
    )
    ruptor = RuptorBuffer()  # @UndefinedVariable
    ruptor.setup(index_store=index_store)
    return ruptor
if __name__ == "__main__":
    # Script entry point: run one benchmark session with the default factory.
    invoke_perf(buffer_maker)