/* Millisecond timestamp from the monotonic clock. */
static inline uint64_t get_time_ms(void) {
  struct timespec ts;
  if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
static inline bool is_pooled_buffer(void *data) {
static inline bool has_buffer_header(void *data) {
/* Lock-free "store maximum": raise *peak to value unless a larger value
   is already recorded. */
static inline void update_peak(atomic_size_t *peak, size_t value) {
  size_t old = atomic_load_explicit(peak, memory_order_relaxed);
  while (value > old) {
    if (atomic_compare_exchange_weak_explicit(peak, &old, value, memory_order_relaxed, memory_order_relaxed)) {
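This is the standard lock-free maximize-in-place idiom: the CAS retries only while the candidate still exceeds the recorded peak, because a failed CAS reloads the current value. A self-contained toy of the same pattern (standalone names, not the project's code) that compiles with any C11 compiler:

#include <stdatomic.h>
#include <stdio.h>

/* Toy version of update_peak: monotonically raise *peak to value. */
static void store_max(atomic_size_t *peak, size_t value) {
  size_t old = atomic_load_explicit(peak, memory_order_relaxed);
  while (value > old) {
    /* On failure, CAS reloads the current peak into old, so the loop
       exits as soon as another thread has published a larger value. */
    if (atomic_compare_exchange_weak_explicit(peak, &old, value,
                                              memory_order_relaxed,
                                              memory_order_relaxed)) {
      break;
    }
  }
}

int main(void) {
  atomic_size_t peak;
  atomic_init(&peak, 0);
  store_max(&peak, 100);
  store_max(&peak, 50); /* no-op: 50 is below the recorded peak */
  printf("peak = %zu\n", atomic_load(&peak)); /* prints: peak = 100 */
  return 0;
}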
/* buffer_pool_create: all counters are initialized with atomic_init
   before the pool is published. */
atomic_init(&pool->free_list, NULL);
/* ... */
atomic_init(&pool->current_bytes, 0);
atomic_init(&pool->used_bytes, 0);
atomic_init(&pool->peak_bytes, 0);
atomic_init(&pool->peak_pool_bytes, 0);
atomic_init(&pool->hits, 0);
atomic_init(&pool->allocs, 0);
atomic_init(&pool->returns, 0);
atomic_init(&pool->shrink_freed, 0);
atomic_init(&pool->malloc_fallbacks, 0);
/* ... */
log_info("Created buffer pool (max: %s, shrink: %llu ms, lock-free)", pretty_max,
         (unsigned long long)shrink_delay_ms);
/* buffer_pool_alloc: malloc fallback path; the raw node is initialized
   and the fallback counted. */
atomic_init(&node->next, NULL);
atomic_init(&node->returned_at_ms, 0);
/* ... */
atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
/* ... */
return data_from_node(node);
/* buffer_pool_alloc: lock-free pop from the free list (fast path). */
buffer_node_t *node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
/* ... */
buffer_node_t *next = atomic_load_explicit(&node->next, memory_order_relaxed);
if (atomic_compare_exchange_weak_explicit(&pool->free_list, &node, next, memory_order_release,
                                          memory_order_acquire)) {
  /* ... */
  if (node->size >= size) {
    /* ... */
    atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
    size_t node_size = node->size;
    atomic_fetch_add_explicit(&pool->used_bytes, node_size, memory_order_relaxed);
    atomic_fetch_add_explicit(&pool->hits, 1, memory_order_relaxed);
    update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
    return data_from_node(node);
  }
  /* Node too small for this request: push it back and retry. */
  buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
  do {
    atomic_store_explicit(&node->next, head, memory_order_relaxed);
  } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
                                                  memory_order_relaxed));
  /* ... */
  node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
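The pop above, together with the push-back loop, is the classic Treiber-stack protocol. A self-contained sketch with toy names, mirroring the memory orders used in this listing; the usual ABA caveat of Treiber pops applies in general, and how this pool handles it is not visible in the excerpt:

#include <stdatomic.h>
#include <stddef.h>

typedef struct tnode {
  _Atomic(struct tnode *) next;
} tnode_t;

typedef struct {
  _Atomic(tnode_t *) head;
} tstack_t;

/* Push: publish the node with a release CAS so its contents become
   visible to whichever thread pops it later. */
static void tstack_push(tstack_t *s, tnode_t *n) {
  tnode_t *head = atomic_load_explicit(&s->head, memory_order_relaxed);
  do {
    atomic_store_explicit(&n->next, head, memory_order_relaxed);
  } while (!atomic_compare_exchange_weak_explicit(&s->head, &head, n,
                                                  memory_order_release,
                                                  memory_order_relaxed));
}

/* Pop: the acquire load (and acquire failure order) pairs with the
   push's release CAS. Subject to the classic ABA caveat if nodes can
   be reused while a stale head pointer is still held. */
static tnode_t *tstack_pop(tstack_t *s) {
  tnode_t *node = atomic_load_explicit(&s->head, memory_order_acquire);
  while (node) {
    tnode_t *next = atomic_load_explicit(&node->next, memory_order_relaxed);
    if (atomic_compare_exchange_weak_explicit(&s->head, &node, next,
                                              memory_order_release,
                                              memory_order_acquire)) {
      return node;
    }
    /* CAS failure refreshed node with the current head; retry. */
  }
  return NULL;
}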
/* buffer_pool_alloc: optimistically reserve room in the byte budget. */
size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);
/* ... */
while (current + total_size <= pool->max_bytes) {
  if (atomic_compare_exchange_weak_explicit(&pool->current_bytes, &current, current + total_size,
                                            memory_order_relaxed, memory_order_relaxed)) {
/* ... */
/* Allocation failed: roll the reservation back and fall back to malloc. */
atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
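The slow path reserves bytes optimistically and rolls the budget back if allocation later fails. A standalone sketch of that reserve/rollback shape (function names hypothetical):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Reserve total_size bytes of budget with a CAS loop. */
static bool reserve_bytes(atomic_size_t *current_bytes, size_t max_bytes,
                          size_t total_size) {
  size_t current = atomic_load_explicit(current_bytes, memory_order_relaxed);
  while (current + total_size <= max_bytes) {
    if (atomic_compare_exchange_weak_explicit(current_bytes, &current,
                                              current + total_size,
                                              memory_order_relaxed,
                                              memory_order_relaxed)) {
      return true; /* budget reserved */
    }
    /* current was refreshed by the failed CAS; re-check the bound. */
  }
  return false; /* pool full: caller falls back to plain malloc */
}

/* Roll a reservation back, e.g. when the subsequent malloc fails. */
static void release_bytes(atomic_size_t *current_bytes, size_t total_size) {
  atomic_fetch_sub_explicit(current_bytes, total_size, memory_order_relaxed);
}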
/* buffer_pool_alloc: initialize the new node and account for it. */
atomic_init(&node->next, NULL);
atomic_init(&node->returned_at_ms, 0);
/* ... */
atomic_fetch_add_explicit(&pool->used_bytes, size, memory_order_relaxed);
atomic_fetch_add_explicit(&pool->allocs, 1, memory_order_relaxed);
update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
update_peak(&pool->peak_pool_bytes, atomic_load(&pool->current_bytes));
/* ... */
return data_from_node(node);
/* ... */
atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
/* buffer_pool_free: timestamp the node and push it back (lock-free). */
log_error("Pooled buffer has no pool reference!");
/* ... */
atomic_fetch_sub_explicit(&pool->used_bytes, node->size, memory_order_relaxed);
atomic_fetch_add_explicit(&pool->returns, 1, memory_order_relaxed);
/* ... */
atomic_store_explicit(&node->returned_at_ms, get_time_ms(), memory_order_relaxed);
/* ... */
buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
do {
  atomic_store_explicit(&node->next, head, memory_order_relaxed);
} while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
                                                memory_order_relaxed));
/* Every 100th return triggers an opportunistic shrink. */
uint64_t returns = atomic_load_explicit(&pool->returns, memory_order_relaxed);
if (returns % 100 == 0) {
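Putting alloc and free together, a hypothetical caller round-trips a buffer like this (signatures per the reference list below; the header name is an assumption):

#include <string.h>
#include "buffer_pool.h" /* hypothetical header name */

void example_roundtrip(buffer_pool_t *pool) {
  void *buf = buffer_pool_alloc(pool, 4096); /* lock-free fast path */
  if (!buf) {
    return; /* even the malloc fallback failed */
  }
  memset(buf, 0, 4096);
  /* ... use the buffer ... */
  buffer_pool_free(pool, buf, 4096); /* pushes it back on the free list */
}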
/* buffer_pool_shrink: detach the whole free list, partition it by age,
   reattach recent nodes, and free the stale ones. */
buffer_node_t *list = atomic_exchange_explicit(&pool->free_list, NULL, memory_order_acquire);
/* ... */
buffer_node_t *next = atomic_load_explicit(&list->next, memory_order_relaxed);
uint64_t returned_at = atomic_load_explicit(&list->returned_at_ms, memory_order_relaxed);
/* ... */
if (returned_at < cutoff) {
  /* Stale: move onto the to-free list. */
  atomic_store_explicit(&list->next, free_list, memory_order_relaxed);
  /* ... */
} else {
  atomic_store_explicit(&list->next, keep_list, memory_order_relaxed);
}
/* ... */
/* Find the tail of the keep list, then splice it back onto the pool. */
while (atomic_load(&tail->next)) {
  tail = atomic_load(&tail->next);
}
/* ... */
buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
do {
  atomic_store_explicit(&tail->next, head, memory_order_relaxed);
} while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, keep_list, memory_order_release,
                                                memory_order_relaxed));
/* Free the stale nodes and release their bytes from the budget. */
buffer_node_t *next = atomic_load_explicit(&free_list->next, memory_order_relaxed);
/* ... */
atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
atomic_fetch_add_explicit(&pool->shrink_freed, 1, memory_order_relaxed);
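Shrinking also happens opportunistically on the free path (the returns % 100 check above), but buffer_pool_shrink can be driven explicitly as well. A hypothetical maintenance loop:

#include <stdbool.h>
#include <unistd.h>
#include "buffer_pool.h" /* hypothetical header name */

/* Once per second, release buffers that have sat idle longer than the
   pool's shrink_delay_ms. */
void pool_maintenance(buffer_pool_t *pool, volatile bool *running) {
  while (*running) {
    buffer_pool_shrink(pool);
    sleep(1);
  }
}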
/* buffer_pool_get_stats: independent relaxed reads (approximate snapshot). */
size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);
size_t used = atomic_load_explicit(&pool->used_bytes, memory_order_relaxed);
/* ... */
*current_bytes = current;
/* ... */
*free_bytes = (current > used) ? (current - used) : 0;
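A hypothetical caller snapshots the counters through the public getter. The loads inside are independent relaxed reads, so the result is an approximate snapshot rather than an atomic one:

#include <stdio.h>
#include "buffer_pool.h" /* hypothetical header name */

void report_usage(buffer_pool_t *pool) {
  size_t current, used, free_bytes;
  buffer_pool_get_stats(pool, &current, &used, &free_bytes);
  printf("pool holds %zu bytes, %zu in use, %zu free\n",
         current, used, free_bytes);
}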
/* buffer_pool_log_stats: snapshot the counters, then pretty-print. */
size_t current = atomic_load(&pool->current_bytes);
size_t used = atomic_load(&pool->used_bytes);
size_t peak = atomic_load(&pool->peak_bytes);
size_t peak_pool = atomic_load(&pool->peak_pool_bytes);
uint64_t hits = atomic_load(&pool->hits);
uint64_t allocs = atomic_load(&pool->allocs);
uint64_t returns = atomic_load(&pool->returns);
uint64_t shrink_freed = atomic_load(&pool->shrink_freed);
uint64_t fallbacks = atomic_load(&pool->malloc_fallbacks);

char pretty_current[64], pretty_used[64], pretty_free[64];
char pretty_peak[64], pretty_peak_pool[64], pretty_max[64];
/* ... */
format_bytes_pretty(current > used ? current - used : 0, pretty_free, sizeof(pretty_free));
/* ... */
uint64_t total_requests = hits + allocs + fallbacks;
double hit_rate = total_requests > 0 ? (double)hits * 100.0 / (double)total_requests : 0;

log_info("=== Buffer Pool: %s ===", name ? name : "unnamed");
log_info(" Pool: %s / %s (peak: %s)", pretty_current, pretty_max, pretty_peak_pool);
log_info(" Used: %s, Free: %s (peak used: %s)", pretty_used, pretty_free, pretty_peak);
log_info(" Hits: %llu (%.1f%%), Allocs: %llu, Fallbacks: %llu", (unsigned long long)hits, hit_rate,
         (unsigned long long)allocs, (unsigned long long)fallbacks);
log_info(" Returns: %llu, Shrink freed: %llu", (unsigned long long)returns, (unsigned long long)shrink_freed);
/* buffer_pool_init_global: create the singleton on first use. */
static_mutex_lock(&g_global_pool_mutex);
if (!g_global_pool) {
  /* ... */
  log_info("Initialized global buffer pool");
  /* ... */
}
static_mutex_unlock(&g_global_pool_mutex);

/* buffer_pool_cleanup_global: destroy the singleton. */
static_mutex_lock(&g_global_pool_mutex);
/* ... */
g_global_pool = NULL;
/* ... */
static_mutex_unlock(&g_global_pool_mutex);

/* buffer_pool_get_global */
return g_global_pool;
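A hypothetical end-to-end use of the global pool, with the call order inferred from the three functions above:

#include "buffer_pool.h" /* hypothetical header name */

int main(void) {
  buffer_pool_init_global();                    /* create the singleton */
  buffer_pool_t *pool = buffer_pool_get_global();

  void *buf = buffer_pool_alloc(pool, 1024);
  if (buf) {
    buffer_pool_free(pool, buf, 1024);
  }

  buffer_pool_log_stats(pool, "global");        /* prints the stats block */
  buffer_pool_cleanup_global();                 /* destroy and reset */
  return 0;
}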
🗃️ Lock-Free Unified Memory Buffer Pool with Lazy Allocation

⚠️‼️ Errors are reported and/or exit() is called when things go bad.
The pool's own API:

#define BUFFER_POOL_MAGIC
Magic value to identify pooled buffers.
#define BUFFER_POOL_MAGIC_FALLBACK
Magic value for malloc fallback buffers (not in the pool).
#define BUFFER_POOL_MAX_BYTES
Maximum total bytes the pool can hold (337 MB).
#define BUFFER_POOL_MAX_SINGLE_SIZE
Maximum single buffer size to pool (larger allocations use malloc directly).
#define BUFFER_POOL_SHRINK_DELAY_MS
Time in milliseconds before unused buffers are freed (5 seconds).

buffer_pool_t *buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ms)
Create a new buffer pool.
void *buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Allocate a buffer from the pool (lock-free fast path).
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free).
void buffer_pool_shrink(buffer_pool_t *pool)
Force shrink the pool (free old unused buffers).
void buffer_pool_get_stats(buffer_pool_t *pool, size_t *current_bytes, size_t *used_bytes, size_t *free_bytes)
Get pool statistics (atomic reads).
void buffer_pool_log_stats(buffer_pool_t *pool, const char *name)
Log pool statistics.
void buffer_pool_destroy(buffer_pool_t *pool)
Destroy a buffer pool and free all memory.
void buffer_pool_init_global(void)
Initialize the global buffer pool (created on first use).
buffer_pool_t *buffer_pool_get_global(void)
Get the global buffer pool instance.
void buffer_pool_cleanup_global(void)
Destroy the global buffer pool and reset the global pointer.
Symbols from included headers:

#define SAFE_MALLOC_ALIGNED(size, alignment, cast)
#define SAFE_MALLOC(size, cast)
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
#define log_error(...)
Log an ERROR message.
#define log_info(...)
Log an INFO message.
void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity)
Format a byte count into a human-readable string.
struct buffer_node (typedef buffer_node_t)
Node header embedded before user data.
uint32_t _pad
Padding for alignment.
uint32_t magic
Magic to identify pooled buffers.
size_t size
Size of the user data portion.
struct buffer_pool *pool
Owning pool (for free).
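The field list implies the usual embedded-header layout: the node header sits immediately before the user data, so the conversions are pointer arithmetic. A sketch under that assumption (data_from_node appears in the listing; node_from_data and the condensed struct are assumptions):

#include <stdatomic.h>
#include <stdint.h>
#include <stddef.h>

/* Condensed buffer_node_t per the field list above; the real struct
   also carries the atomic next/returned_at_ms links seen in the code. */
typedef struct buffer_node {
  _Atomic(struct buffer_node *) next;
  _Atomic uint64_t returned_at_ms;
  uint32_t magic;
  uint32_t _pad;
  size_t size;
  struct buffer_pool *pool;
} buffer_node_t;

/* Assumed layout: [buffer_node_t header][user data ...] */
static inline void *data_from_node(buffer_node_t *node) {
  return (void *)(node + 1); /* user data starts right after the header */
}

static inline buffer_node_t *node_from_data(void *data) {
  return ((buffer_node_t *)data) - 1; /* step back over the header */
}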
struct buffer_pool (typedef buffer_pool_t)
Unified buffer pool with lock-free fast path.
size_t max_bytes
Maximum total bytes allowed.
mutex_t shrink_mutex
Only used for shrinking.
uint64_t shrink_delay_ms
Time before unused buffers are freed.

Included modules:
Platform initialization and static synchronization helpers, including the static mutex structure for global mutexes that require static initialization.
Cross-platform system functions interface for ascii-chat.
Common SIMD utilities and structures.