/* NOTE(review): fragment of what appears to be buffer_pool_create — the
 * function signature, early-return branches after the SET_ERRNO calls, the
 * mutex_init call itself, and the format_bytes_pretty() call that fills
 * pretty_max are in elided lines (original line numbers are embedded in the
 * text and jump). Comments below describe only what is visible. */
49 buffer_pool_t *pool = SAFE_MALLOC(
sizeof(buffer_pool_t), buffer_pool_t *);
/* Allocation-failure path: record the error (the actual early return is
 * outside this view). */
51 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate buffer pool");
/* Mutex-init failure path for the shrink mutex used by the shrink routine. */
56 SET_ERRNO(ERROR_THREAD,
"Failed to initialize shrink mutex");
/* Lock-free free list starts empty. */
61 atomic_init(&pool->free_list, NULL);
/* Caller-supplied limits; non-positive values fall back to compile-time
 * defaults. */
62 pool->max_bytes = max_bytes > 0 ? max_bytes : BUFFER_POOL_MAX_BYTES;
63 pool->shrink_delay_ns = shrink_delay_ns > 0 ? shrink_delay_ns : BUFFER_POOL_SHRINK_DELAY_NS;
/* Zero all byte-accounting and statistics counters (atomics may be read
 * concurrently later, so they are initialized with atomic_init). */
65 atomic_init(&pool->current_bytes, 0);
66 atomic_init(&pool->used_bytes, 0);
67 atomic_init(&pool->peak_bytes, 0);
68 atomic_init(&pool->peak_pool_bytes, 0);
69 atomic_init(&pool->hits, 0);
70 atomic_init(&pool->allocs, 0);
71 atomic_init(&pool->returns, 0);
72 atomic_init(&pool->shrink_freed, 0);
73 atomic_init(&pool->malloc_fallbacks, 0);
/* pretty_max is presumably filled by format_bytes_pretty() in an elided
 * line — TODO confirm. */
77 log_dev(
"Created buffer pool (max: %s, shrink: %llu ns, lock-free)", pretty_max,
78 (
unsigned long long)pool->shrink_delay_ns);
/* NOTE(review): fragment of the buffer-get path. When there is no pool, or
 * the requested size is outside the pooled range, fall back to a plain
 * heap allocation tagged with the FALLBACK magic so the return path can
 * free() it instead of recycling it. */
106 if (!pool || size < BUFFER_POOL_MIN_SIZE || size > BUFFER_POOL_MAX_SINGLE_SIZE) {
/* Header + payload are allocated as one node. */
107 size_t total_size =
sizeof(buffer_node_t) + size;
108 buffer_node_t *node = SAFE_MALLOC(total_size, buffer_node_t *);
109 node->magic = MAGIC_BUFFER_POOL_FALLBACK;
112 atomic_init(&node->next, NULL);
113 atomic_init(&node->returned_at_ns, 0);
/* NOTE(review): this branch is reachable with pool == NULL per the
 * condition above; unless an elided line (110-116) guards it, this
 * dereference of pool would be a NULL-pointer crash — verify. */
117 atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
/* Hand back the payload that follows the node header. */
119 return data_from_node(node);
/* NOTE(review): fragment of the lock-free pop from the free list (Treiber-
 * stack style). Loop heads / closing braces live in elided lines; kept
 * byte-identical because the exact CAS ordering is load-bearing.
 * NOTE(review): a plain pointer CAS pop with no generation tag is exposed
 * to the classic ABA problem if nodes can be freed and re-pushed
 * concurrently — confirm the shrink mutex / usage pattern rules this out. */
123 buffer_node_t *node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
125 buffer_node_t *next = atomic_load_explicit(&node->next, memory_order_relaxed);
/* Try to swing the head from node to node->next; on failure `node` is
 * reloaded by the CAS and the surrounding (elided) loop retries. */
126 if (atomic_compare_exchange_weak_explicit(&pool->free_list, &node, next, memory_order_release,
127 memory_order_acquire)) {
/* Popped node is only usable if it is big enough for this request. */
129 if (node->size >= size) {
131 atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
132 size_t node_size = node->size;
/* Account the whole node size as "in use" and record a pool hit. */
133 atomic_fetch_add_explicit(&pool->used_bytes, node_size, memory_order_relaxed);
134 atomic_fetch_add_explicit(&pool->hits, 1, memory_order_relaxed);
135 update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
136 return data_from_node(node);
/* Node too small: push it back onto the free list (standard CAS push). */
140 buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
142 atomic_store_explicit(&node->next, head, memory_order_relaxed);
143 }
while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
144 memory_order_relaxed));
/* Re-read the head and keep searching (outer retry, loop in elided lines). */
149 node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
/* NOTE(review): fragment — allocate a brand-new pooled node, reserving its
 * bytes against pool->max_bytes with a CAS loop before malloc'ing.
 * FIX: line 158's expected-value argument had been mangled to "¤t"
 * (an HTML-entity-eaten "&current": "&curren;" renders as ¤). The C11 API
 * requires a pointer to the expected value, restored to &current below. */
153 size_t total_size =
sizeof(buffer_node_t) + size;
154 size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);
/* Optimistically reserve total_size bytes of pool capacity; on CAS failure
 * `current` is refreshed and the bound is re-checked. */
157 while (current + total_size <= pool->max_bytes) {
158 if (atomic_compare_exchange_weak_explicit(&pool->current_bytes, &current, current + total_size,
159 memory_order_relaxed, memory_order_relaxed)) {
/* 64-byte alignment — presumably for cache-line friendliness; confirm. */
162 node = SAFE_MALLOC_ALIGNED(total_size, 64, buffer_node_t *);
/* Aligned allocation failed: give back the reserved capacity and fall
 * back to a plain malloc of just the payload.
 * NOTE(review): this raw pointer has no buffer_node_t header — verify the
 * return path (elided here) never feeds it to node_from_data(). */
165 atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
166 atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
167 return SAFE_MALLOC(size,
void *);
170 node->magic = MAGIC_BUFFER_POOL_VALID;
173 atomic_init(&node->next, NULL);
174 atomic_init(&node->returned_at_ns, 0);
/* Account the payload as in use and record a fresh allocation.
 * NOTE(review): this adds `size` while the pop path adds node->size —
 * consistent only if node->size is set to `size` in an elided line. */
177 atomic_fetch_add_explicit(&pool->used_bytes, size, memory_order_relaxed);
178 atomic_fetch_add_explicit(&pool->allocs, 1, memory_order_relaxed);
179 update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
180 update_peak(&pool->peak_pool_bytes, atomic_load(&pool->current_bytes));
182 return data_from_node(node);
/* NOTE(review): fragment — pool capacity would be exceeded, so allocate an
 * untracked node tagged FALLBACK; it will be free()d on return instead of
 * being recycled into the pool. */
188 total_size =
sizeof(buffer_node_t) + size;
189 node = SAFE_MALLOC(total_size, buffer_node_t *);
193 node->magic = MAGIC_BUFFER_POOL_FALLBACK;
196 atomic_init(&node->next, NULL);
197 atomic_init(&node->returned_at_ns, 0);
/* Counted as a fallback, not an alloc, so hit-rate stats stay honest. */
200 atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
201 return data_from_node(node);
/* NOTE(review): fragment of the buffer-return path. Recovers the node
 * header from the payload pointer, validates magics, then pushes the node
 * back onto the lock-free free list. The FALLBACK branch body (free())
 * and the error-branch bodies are in elided lines. */
211 buffer_node_t *node = node_from_data(data);
/* Fallback buffers bypass the pool entirely (handled in elided lines). */
214 if (IS_MAGIC_VALID(node->magic, MAGIC_BUFFER_POOL_FALLBACK)) {
/* Anything without the pool magic is corrupt or foreign memory. */
220 if (!IS_MAGIC_VALID(node->magic, MAGIC_BUFFER_POOL_VALID)) {
233 log_error(
"Pooled buffer has no pool reference!");
/* Move the node's bytes from "used" back to "free" and count the return. */
238 atomic_fetch_sub_explicit(&pool->used_bytes, node->size, memory_order_relaxed);
239 atomic_fetch_add_explicit(&pool->returns, 1, memory_order_relaxed);
/* Timestamp the return so the shrink pass can age-out idle buffers. */
242 atomic_store_explicit(&node->returned_at_ns,
time_get_ns(), memory_order_relaxed);
/* Standard lock-free push: link node ahead of the current head, CAS it in. */
245 buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
247 atomic_store_explicit(&node->next, head, memory_order_relaxed);
248 }
while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
249 memory_order_relaxed));
/* Every 100th return triggers an opportunistic shrink (call elided). */
252 uint64_t returns = atomic_load_explicit(&pool->returns, memory_order_relaxed);
253 if (returns % 100 == 0) {
/* NOTE(review): fragment of the shrink routine. Detaches the entire free
 * list, partitions it into nodes to keep vs. nodes idle past the cutoff,
 * re-attaches the keepers, and frees the rest. Loop heads and several
 * branch bodies are in elided lines. */
259 if (!pool || pool->shrink_delay_ns == 0)
/* trylock: if another thread is already shrinking, skip — shrinking is
 * best-effort housekeeping, not correctness-critical. */
263 if (mutex_trylock(&pool->shrink_mutex) != 0) {
/* Nodes returned before `cutoff` have sat idle for >= shrink_delay_ns. */
268 uint64_t cutoff = (now > pool->shrink_delay_ns) ? (now - pool->shrink_delay_ns) : 0;
/* Atomically steal the whole free list; concurrent gets will just miss
 * the pool and allocate fresh while we work. */
271 buffer_node_t *list = atomic_exchange_explicit(&pool->free_list, NULL, memory_order_acquire);
274 buffer_node_t *keep_list = NULL;
275 buffer_node_t *free_list = NULL;
/* Walk the detached list, partitioning by idle age (loop head elided). */
278 buffer_node_t *next = atomic_load_explicit(&list->next, memory_order_relaxed);
279 uint64_t returned_at = atomic_load_explicit(&list->returned_at_ns, memory_order_relaxed);
281 if (returned_at < cutoff) {
/* Too old: destined to be freed. */
283 atomic_store_explicit(&list->next, free_list, memory_order_relaxed);
/* Recent enough: keep it pooled. */
287 atomic_store_explicit(&list->next, keep_list, memory_order_relaxed);
/* Splice the keep list back: find its tail, point it at the current head,
 * and CAS the keep list's head in (retrying on contention).
 * NOTE(review): O(n) tail walk per shrink — acceptable here since shrink
 * runs behind a trylock'd mutex, presumably infrequently. */
296 buffer_node_t *tail = keep_list;
297 while (atomic_load(&tail->next)) {
298 tail = atomic_load(&tail->next);
302 buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
304 atomic_store_explicit(&tail->next, head, memory_order_relaxed);
305 }
while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, keep_list, memory_order_release,
306 memory_order_relaxed));
/* Free the aged-out nodes, returning their bytes to pool capacity. */
311 buffer_node_t *next = atomic_load_explicit(&free_list->next, memory_order_relaxed);
312 size_t total_size =
sizeof(buffer_node_t) + free_list->size;
313 atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
314 atomic_fetch_add_explicit(&pool->shrink_freed, 1, memory_order_relaxed);
315 SAFE_FREE(free_list);
319 mutex_unlock(&pool->shrink_mutex);
/* NOTE(review): fragment of a stats-reporting routine. Snapshots all
 * counters (each load is individually atomic; the set is not a consistent
 * snapshot — fine for diagnostics), pretty-prints byte values, and logs a
 * summary. The format_bytes_pretty() calls filling pretty_current /
 * pretty_used / pretty_peak / pretty_peak_pool / pretty_max are in elided
 * lines. */
348 size_t current = atomic_load(&pool->current_bytes);
349 size_t used = atomic_load(&pool->used_bytes);
350 size_t peak = atomic_load(&pool->peak_bytes);
351 size_t peak_pool = atomic_load(&pool->peak_pool_bytes);
352 uint64_t hits = atomic_load(&pool->hits);
353 uint64_t allocs = atomic_load(&pool->allocs);
354 uint64_t returns = atomic_load(&pool->returns);
355 uint64_t shrink_freed = atomic_load(&pool->shrink_freed);
356 uint64_t fallbacks = atomic_load(&pool->malloc_fallbacks);
358 char pretty_current[64], pretty_used[64], pretty_free[64];
359 char pretty_peak[64], pretty_peak_pool[64], pretty_max[64];
/* Free bytes derived as current - used, clamped at 0 since the two
 * relaxed counters can be transiently inconsistent. */
363 format_bytes_pretty(current > used ? current - used : 0, pretty_free,
sizeof(pretty_free));
/* Hit rate over all requests (pool hits + fresh allocs + fallbacks);
 * guarded against divide-by-zero when no requests have been made. */
368 uint64_t total_requests = hits + allocs + fallbacks;
369 double hit_rate = total_requests > 0 ? (double)hits * 100.0 / (
double)total_requests : 0;
371 log_debug(
"=== Buffer Pool: %s ===", name ? name :
"unnamed");
372 log_debug(
" Current: %s / %s capacity (peak used: %s)", pretty_current, pretty_max, pretty_peak_pool);
373 log_debug(
" Buffers: %s used, %s free (peak: %s)", pretty_used, pretty_free, pretty_peak);
/* uint64_t values cast to unsigned long long to match %llu exactly. */
374 log_debug(
" Hits: %llu (%.1f%%), Allocs: %llu, Fallbacks: %llu", (
unsigned long long)hits, hit_rate,
375 (
unsigned long long)allocs, (
unsigned long long)fallbacks);
376 log_debug(
" Returns: %llu, Shrink freed: %llu", (
unsigned long long)returns, (
unsigned long long)shrink_freed);