// fixed_size_allocator.h
#pragma once
#include "alignment_helper.h"
#include "config.h"
#include "global_heap.h"
#include "task_allocator_interface.h"
#include <array>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <limits>
#include <memory>
#include <mx/synchronization/spinlock.h>
#include <mx/system/cache.h>
#include <mx/system/cpu.h>
#include <mx/tasking/config.h>
#include <mx/util/core_set.h>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
namespace mx::memory::fixed {
/**
* Represents a free memory object.
*/
class FreeHeader
{
public:
constexpr FreeHeader() noexcept = default;
~FreeHeader() noexcept = default;
[[nodiscard]] FreeHeader *next() const noexcept { return _next; }
void next(FreeHeader *next) noexcept { _next = next; }
void numa_node_id(const std::uint8_t numa_node_id) noexcept { _numa_node_id = numa_node_id; }
[[nodiscard]] std::uint8_t numa_node_id() const noexcept { return _numa_node_id; }
private:
FreeHeader *_next = nullptr;
std::uint8_t _numa_node_id = 0U;
};
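// Illustrative sketch (not part of the original header): freed memory objects are
// reinterpreted as FreeHeader instances and chained into an intrusive, singly linked
// free list. The pointers "object_a" and "object_b" below are hypothetical void*.
//
//     auto *head = static_cast<FreeHeader *>(object_a);
//     auto *tail = static_cast<FreeHeader *>(object_b);
//     head->next(tail);    // object_a now links to object_b
//     tail->next(nullptr); // end of the free list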
/**
* The Chunk holds a fixed-size block of memory.
*/
class Chunk
{
public:
Chunk() noexcept = default;
explicit Chunk(void *memory) noexcept : _memory(memory) {}
~Chunk() noexcept = default;
static constexpr auto size() { return 32U * 1024U * 1024U; /* 32 MiB */ }
explicit operator void *() const noexcept { return _memory; }
explicit operator std::uintptr_t() const noexcept { return reinterpret_cast<std::uintptr_t>(_memory); }
explicit operator bool() const noexcept { return _memory != nullptr; }
private:
void *_memory{nullptr};
};
/**
* The ProcessorHeap holds memory for a single socket.
* All cores located on this socket can allocate from it.
* Internally, the ProcessorHeap buffers allocated memory
* to minimize accesses to the global heap.
*/
class alignas(64) ProcessorHeap
{
public:
ProcessorHeap() noexcept = default;
explicit ProcessorHeap(const std::uint8_t numa_node_id) noexcept : _numa_node_id(numa_node_id)
{
_allocated_chunks.reserve(1024U);
fill_buffer<true>();
}
~ProcessorHeap() noexcept
{
for (const auto allocated_chunk : _allocated_chunks)
{
GlobalHeap::free(static_cast<void *>(allocated_chunk), Chunk::size());
}
for (const auto free_chunk : _free_chunk_buffer)
{
if (static_cast<bool>(free_chunk))
{
GlobalHeap::free(static_cast<void *>(free_chunk), Chunk::size());
}
}
}
ProcessorHeap &operator=(ProcessorHeap &&other) noexcept
{
_numa_node_id = std::exchange(other._numa_node_id, std::numeric_limits<std::uint8_t>::max());
_free_chunk_buffer = other._free_chunk_buffer;
other._free_chunk_buffer.fill(Chunk{});
_next_free_chunk.store(other._next_free_chunk.load());
_fill_buffer_flag.store(other._fill_buffer_flag.load());
_allocated_chunks = std::move(other._allocated_chunks);
return *this;
}
/**
* @return ID of the NUMA node the memory is allocated on.
*/
[[nodiscard]] std::uint8_t numa_node_id() const noexcept { return _numa_node_id; }
/**
* Allocates a chunk of memory from the internal buffer.
* If the buffer is empty, new chunks are allocated
* from the GlobalHeap.
*
* @return A chunk of allocated memory.
*/
Chunk allocate() noexcept
{
const auto next_free_chunk = _next_free_chunk.fetch_add(1U, std::memory_order_relaxed);
if (next_free_chunk < _free_chunk_buffer.size())
{
return _free_chunk_buffer[next_free_chunk];
}
auto expect = false;
const auto can_fill = _fill_buffer_flag.compare_exchange_strong(expect, true);
if (can_fill)
{
fill_buffer<false>();
_fill_buffer_flag = false;
}
else
{
while (_fill_buffer_flag)
{
system::builtin::pause();
}
}
return allocate();
}
[[nodiscard]] std::vector<std::pair<std::uintptr_t, std::uintptr_t>> allocated_chunks()
{
auto chunks = std::vector<std::pair<std::uintptr_t, std::uintptr_t>>{};
chunks.reserve(_free_chunk_buffer.size() + _allocated_chunks.size());
for (const auto chunk : _free_chunk_buffer)
{
const auto start = static_cast<std::uintptr_t>(chunk);
if (start > 0U)
{
chunks.emplace_back(start, start + Chunk::size());
}
}
for (const auto chunk : _allocated_chunks)
{
const auto start = static_cast<std::uintptr_t>(chunk);
if (start > 0U)
{
chunks.emplace_back(start, start + Chunk::size());
}
}
return chunks;
}
private:
// Number of chunks in the internal buffer.
inline static constexpr auto CHUNKS = 128U;
// ID of the NUMA node of this ProcessorHeap.
std::uint8_t _numa_node_id{std::numeric_limits<std::uint8_t>::max()};
// Buffer for free chunks.
std::array<Chunk, CHUNKS> _free_chunk_buffer;
// Index of the next free chunk in the buffer (wide enough to avoid wrap-around under contention).
alignas(64) std::atomic_uint32_t _next_free_chunk{0U};
// Flag providing mutual exclusion while refilling the buffer from the GlobalHeap.
std::atomic_bool _fill_buffer_flag{false};
// List of all allocated chunks; they are freed in the destructor.
std::vector<Chunk> _allocated_chunks;
/**
* Allocates one large memory block from the GlobalHeap and
* splits it into chunks that are stored in the
* internal buffer.
*/
template <bool IS_FIRST = false> void fill_buffer() noexcept
{
if constexpr (IS_FIRST == false)
{
for (const auto &chunk : _free_chunk_buffer)
{
_allocated_chunks.push_back(chunk);
}
}
auto *heap_memory = GlobalHeap::allocate(_numa_node_id, Chunk::size() * _free_chunk_buffer.size());
auto heap_memory_address = reinterpret_cast<std::uintptr_t>(heap_memory);
for (auto i = 0U; i < _free_chunk_buffer.size(); ++i)
{
_free_chunk_buffer[i] = Chunk(reinterpret_cast<void *>(heap_memory_address + (i * Chunk::size())));
/// Touch the first byte to pre-fault the page.
reinterpret_cast<char *>(_free_chunk_buffer[i].operator void *())[0] = '\0';
}
_next_free_chunk.store(0U);
}
};
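// Usage sketch (illustrative only; in this header the Allocator below owns the
// ProcessorHeaps). The constructor pre-fills the buffer with CHUNKS chunks of
// Chunk::size() bytes on the given NUMA node:
//
//     auto heap = ProcessorHeap{/*numa_node_id=*/0U};
//     auto chunk = heap.allocate();              // latch-free grab from the buffer
//     auto *memory = static_cast<void *>(chunk); // raw 32 MiB block on NUMA node 0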
/**
* The worker local heap represents the allocator on a single core.
* Since only a single core uses it, allocations are latch-free.
*/
template <std::size_t S> class alignas(64) WorkerLocalHeap
{
public:
explicit WorkerLocalHeap(ProcessorHeap *processor_heap) noexcept : _processor_heap(processor_heap)
{
fill_buffer();
}
WorkerLocalHeap() noexcept = default;
~WorkerLocalHeap() noexcept = default;
/**
* Allocates new memory from the worker local heap.
* When the internal buffer is empty, the worker local heap
* will allocate new chunks from the ProcessorHeap.
*
* @return Pointer to the newly allocated memory.
*/
[[nodiscard]] void *allocate() noexcept
{
if (empty())
{
fill_buffer();
}
auto *free_element = std::exchange(_first, _first->next());
return static_cast<void *>(free_element);
}
/**
* Frees a memory object. The freed object is placed at the
* front of the free list, so the next allocation reuses it
* while it may still be in the CPU cache.
*
* @param pointer Pointer to the memory object to be freed.
*/
void free(void *pointer) noexcept
{
auto *free_object = static_cast<FreeHeader *>(pointer);
free_object->next(_first);
_first = free_object;
}
/**
* Fills the buffer by asking the ProcessorHeap for more memory.
* This is latch-free since only a single core calls this method.
*/
void fill_buffer()
{
auto chunk = _processor_heap->allocate();
const auto chunk_address = static_cast<std::uintptr_t>(chunk);
constexpr auto object_size = S;
constexpr auto count_objects = std::uint64_t{Chunk::size() / object_size};
auto *first_free = reinterpret_cast<FreeHeader *>(chunk_address);
auto *last_free = reinterpret_cast<FreeHeader *>(chunk_address + ((count_objects - 1) * object_size));
auto *current_free = first_free;
for (auto i = 0U; i < count_objects - 1U; ++i)
{
auto *next = reinterpret_cast<FreeHeader *>(chunk_address + ((i + 1U) * object_size));
current_free->next(next);
current_free = next;
}
last_free->next(nullptr);
_first = first_free;
}
private:
// Processor heap to allocate new chunks.
ProcessorHeap *_processor_heap{nullptr};
// First element of the list of free memory objects.
FreeHeader *_first{nullptr};
/**
* @return True, when the buffer is empty.
*/
[[nodiscard]] bool empty() const noexcept { return _first == nullptr; }
};
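// Usage sketch (illustrative only; "processor_heap" is a hypothetical pointer to a
// valid ProcessorHeap). The template parameter is the fixed object size in bytes:
//
//     auto heap = WorkerLocalHeap<64U>{processor_heap}; // slices one chunk into 64-byte objects
//     void *object = heap.allocate();                   // pops the head of the free list
//     heap.free(object);                                // pushed to the front, reused by the next allocate()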
/**
* The Allocator is the interface to the internal worker local heaps.
*/
template <std::size_t S> class Allocator final : public TaskAllocatorInterface
{
public:
explicit Allocator(const util::core_set &core_set)
{
for (auto node_id = std::uint8_t(0U); node_id < config::max_numa_nodes(); ++node_id)
{
if (core_set.has_core_of_numa_node(node_id))
{
_processor_heaps[node_id] = ProcessorHeap{node_id};
}
}
for (auto worker_id = 0U; worker_id < core_set.count_cores(); ++worker_id)
{
const auto node_id = system::cpu::node_id(core_set[worker_id]);
_worker_local_heaps[worker_id] = WorkerLocalHeap<S>{&_processor_heaps[node_id]};
}
}
explicit Allocator(util::core_set &&core_set) : Allocator(core_set) {}
~Allocator() override = default;
/**
* Allocates memory from the given worker local heap.
*
* @param worker_id ID of the worker.
* @return Allocated memory object.
*/
[[nodiscard]] void *allocate(const std::uint16_t worker_id) override
{
return _worker_local_heaps[worker_id].allocate();
}
/**
* Frees memory.
*
* @param worker_id ID of the worker whose free list receives the object.
* @param address Pointer to the memory object.
*/
void free(const std::uint16_t worker_id, void *address) noexcept override
{
_worker_local_heaps[worker_id].free(address);
}
[[nodiscard]] std::unordered_map<std::string, std::vector<std::pair<std::uintptr_t, std::uintptr_t>>>
allocated_chunks() override
{
auto tags = std::unordered_map<std::string, std::vector<std::pair<std::uintptr_t, std::uintptr_t>>>{};
auto chunks = _processor_heaps.front().allocated_chunks();
for (auto i = 1U; i < _processor_heaps.size(); ++i)
{
auto processor_chunks = _processor_heaps[i].allocated_chunks();
std::move(processor_chunks.begin(), processor_chunks.end(), std::back_inserter(chunks));
}
tags.insert(std::make_pair("tasks", std::move(chunks)));
return tags;
}
private:
// Heap for every processor socket/NUMA region.
std::array<ProcessorHeap, config::max_numa_nodes()> _processor_heaps;
// Worker-local heaps, indexed by worker id.
std::array<WorkerLocalHeap<S>, tasking::config::max_cores()> _worker_local_heaps;
};
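// Usage sketch (illustrative only; how the util::core_set is built is an assumption,
// see mx/util/core_set.h for the real interface):
//
//     auto allocator = Allocator<64U>{core_set};                // one WorkerLocalHeap per worker
//     void *task_memory = allocator.allocate(/*worker_id=*/0U); // latch-free, worker-local
//     allocator.free(/*worker_id=*/0U, task_memory);            // back to worker 0's free list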
} // namespace mx::memory::fixed