ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
buffer_pool.c
#include "buffer_pool.h"
#include "common.h"
#include "asciichat_errno.h"
#include "platform/system.h"
#include "platform/init.h"
#include "util/format.h"
#include <stdlib.h>
#include <string.h>

/* ============================================================================
 * Internal Helpers
 * ============================================================================
 */

static inline uint64_t get_time_ms(void) {
  struct timespec ts;
  if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
    return 0;
  }
  return (uint64_t)ts.tv_sec * 1000 + (uint64_t)ts.tv_nsec / 1000000;
}

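/* Every pooled or fallback allocation is one contiguous block laid out as
 *   [buffer_node_t header][user data ...]
 * so the header sits immediately before the pointer handed to the caller.
 * The helpers below convert between the two views; the magic field in the
 * header is how buffer_pool_free() tells pooled, fallback, and foreign
 * pointers apart. */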
static inline buffer_node_t *node_from_data(void *data) {
  return (buffer_node_t *)((char *)data - sizeof(buffer_node_t));
}

static inline void *data_from_node(buffer_node_t *node) {
  return (void *)((char *)node + sizeof(buffer_node_t));
}

// True for buffers that came from a pool (pool magic).
static inline bool is_pooled_buffer(void *data) {
  if (!data)
    return false;
  buffer_node_t *node = node_from_data(data);
  return node->magic == BUFFER_POOL_MAGIC;
}

// True for any buffer that carries a node header (pooled or malloc fallback).
static inline bool has_buffer_header(void *data) {
  if (!data)
    return false;
  buffer_node_t *node = node_from_data(data);
  return node->magic == BUFFER_POOL_MAGIC || node->magic == BUFFER_POOL_MAGIC_FALLBACK;
}

// Lock-free monotonic maximum: retry the CAS until *peak >= value.
static inline void update_peak(atomic_size_t *peak, size_t value) {
  size_t old = atomic_load_explicit(peak, memory_order_relaxed);
  while (value > old) {
    if (atomic_compare_exchange_weak_explicit(peak, &old, value, memory_order_relaxed, memory_order_relaxed)) {
      break;
    }
  }
}

/* ============================================================================
 * Buffer Pool Implementation
 * ============================================================================
 */

buffer_pool_t *buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ms) {
  buffer_pool_t *pool = SAFE_MALLOC(sizeof(buffer_pool_t), buffer_pool_t *);
  if (!pool) {
    SET_ERRNO(ERROR_MEMORY, "Failed to allocate buffer pool");
    return NULL;
  }

  if (mutex_init(&pool->shrink_mutex) != 0) {
    SET_ERRNO(ERROR_THREAD, "Failed to initialize shrink mutex");
    SAFE_FREE(pool);
    return NULL;
  }

  atomic_init(&pool->free_list, NULL);
  pool->max_bytes = max_bytes > 0 ? max_bytes : BUFFER_POOL_MAX_BYTES;
  pool->shrink_delay_ms = shrink_delay_ms > 0 ? shrink_delay_ms : BUFFER_POOL_SHRINK_DELAY_MS;

  atomic_init(&pool->current_bytes, 0);
  atomic_init(&pool->used_bytes, 0);
  atomic_init(&pool->peak_bytes, 0);
  atomic_init(&pool->peak_pool_bytes, 0);
  atomic_init(&pool->hits, 0);
  atomic_init(&pool->allocs, 0);
  atomic_init(&pool->returns, 0);
  atomic_init(&pool->shrink_freed, 0);
  atomic_init(&pool->malloc_fallbacks, 0);

  char pretty_max[64];
  format_bytes_pretty(pool->max_bytes, pretty_max, sizeof(pretty_max));
  log_info("Created buffer pool (max: %s, shrink: %llu ms, lock-free)", pretty_max,
           (unsigned long long)pool->shrink_delay_ms);

  return pool;
}

void buffer_pool_destroy(buffer_pool_t *pool) {
  if (!pool)
    return;

  // Drain the free list
  buffer_node_t *node = atomic_load(&pool->free_list);
  while (node) {
    buffer_node_t *next = atomic_load(&node->next);
    SAFE_FREE(node); // Node and data are one allocation
    node = next;
  }

  mutex_destroy(&pool->shrink_mutex);
  SAFE_FREE(pool);
}
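/* Allocation strategy (summary of the code below):
 *   1. Sizes outside [BUFFER_POOL_MIN_SIZE, BUFFER_POOL_MAX_SINGLE_SIZE] get a
 *      malloc'd buffer with a fallback header.
 *   2. Otherwise, try to pop a large-enough node from the lock-free free list.
 *   3. Otherwise, reserve space against max_bytes and allocate a new node.
 *   4. If the pool is at capacity, fall back to a headered malloc. */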
void *buffer_pool_alloc(buffer_pool_t *pool, size_t size) {
  // Use global pool if none specified
  if (!pool) {
    pool = buffer_pool_get_global();
  }

  // Size out of range - use malloc with node header for consistent cleanup
  if (!pool || size < BUFFER_POOL_MIN_SIZE || size > BUFFER_POOL_MAX_SINGLE_SIZE) {
    size_t total_size = sizeof(buffer_node_t) + size;
    buffer_node_t *node = SAFE_MALLOC(total_size, buffer_node_t *);
    node->magic = BUFFER_POOL_MAGIC_FALLBACK; // Different magic for fallbacks
    node->_pad = 0;
    node->size = size;
    atomic_init(&node->next, NULL);
    atomic_init(&node->returned_at_ms, 0);
    node->pool = NULL; // No pool for fallbacks

    if (pool) {
      atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
    }
    return data_from_node(node);
  }

  // Try to pop from lock-free stack (LIFO)
  buffer_node_t *node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
  while (node) {
    buffer_node_t *next = atomic_load_explicit(&node->next, memory_order_relaxed);
    if (atomic_compare_exchange_weak_explicit(&pool->free_list, &node, next, memory_order_release,
                                              memory_order_acquire)) {
      // Successfully popped - check if it's big enough
      if (node->size >= size) {
        // Reuse this buffer
        atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
        size_t node_size = node->size;
        atomic_fetch_add_explicit(&pool->used_bytes, node_size, memory_order_relaxed);
        atomic_fetch_add_explicit(&pool->hits, 1, memory_order_relaxed);
        update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
        return data_from_node(node);
      } else {
        // Too small - push it back and allocate new
        // (This is rare with LIFO - usually we get similar sizes)
        buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
        do {
          atomic_store_explicit(&node->next, head, memory_order_relaxed);
        } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
                                                        memory_order_relaxed));
        break; // Fall through to allocate new
      }
    }
    // CAS failed - reload and retry
    node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
  }

  // Check if we can allocate more
  size_t total_size = sizeof(buffer_node_t) + size;
  size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);

  // Atomically try to reserve space (on CAS failure `current` is refreshed,
  // so the loop re-checks the byte cap against the latest value)
  while (current + total_size <= pool->max_bytes) {
    if (atomic_compare_exchange_weak_explicit(&pool->current_bytes, &current, current + total_size,
                                              memory_order_relaxed, memory_order_relaxed)) {
      // Reserved space - now allocate
      // Allocate node + data in one chunk for cache efficiency
      node = SAFE_MALLOC_ALIGNED(total_size, 64, buffer_node_t *);
      if (!node) {
        // Undo reservation and drop to the headered malloc fallback below
        atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
        break;
      }

      node->magic = BUFFER_POOL_MAGIC;
      node->_pad = 0;
      node->size = size;
      atomic_init(&node->next, NULL);
      atomic_init(&node->returned_at_ms, 0);
      node->pool = pool;

      atomic_fetch_add_explicit(&pool->used_bytes, size, memory_order_relaxed);
      atomic_fetch_add_explicit(&pool->allocs, 1, memory_order_relaxed);
      update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
      update_peak(&pool->peak_pool_bytes, atomic_load(&pool->current_bytes));

      return data_from_node(node);
    }
    // CAS failed - someone else allocated, reload and check again
  }

  // Pool at capacity (or aligned allocation failed) - fall back to malloc,
  // keeping the node header so buffer_pool_free() can identify the buffer
  atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
  node = SAFE_MALLOC(total_size, buffer_node_t *);
  node->magic = BUFFER_POOL_MAGIC_FALLBACK;
  node->_pad = 0;
  node->size = size;
  atomic_init(&node->next, NULL);
  atomic_init(&node->returned_at_ms, 0);
  node->pool = NULL;
  return data_from_node(node);
}
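/* Free strategy: the node header in front of the data identifies the buffer.
 * Fallback buffers are released directly, pointers without a known magic are
 * passed to free(), and pooled buffers are pushed back onto the lock-free
 * free list; every 100th return triggers an opportunistic shrink. */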
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size) {
  (void)size; // Size parameter not needed with header-based detection

  if (!data)
    return;

  // All buffer_pool allocations have headers, safe to check magic
  buffer_node_t *node = node_from_data(data);

  // If it's a malloc fallback (has fallback magic), free the node directly
  if (node->magic == BUFFER_POOL_MAGIC_FALLBACK) {
    SAFE_FREE(node); // Free the node (includes header + data)
    return;
  }

  // If it's not a pooled buffer (no valid magic), it's external - use platform free
  if (node->magic != BUFFER_POOL_MAGIC) {
    free(data); // Unknown allocation, just free the data pointer
    return;
  }

  // It's a pooled buffer - return to pool
  // Use the pool stored in the node if none provided
  if (!pool) {
    pool = node->pool;
  }

  if (!pool) {
    // Shouldn't happen, but safety
    log_error("Pooled buffer has no pool reference!");
    return;
  }

  // Update stats
  atomic_fetch_sub_explicit(&pool->used_bytes, node->size, memory_order_relaxed);
  atomic_fetch_add_explicit(&pool->returns, 1, memory_order_relaxed);

  // Set return timestamp
  atomic_store_explicit(&node->returned_at_ms, get_time_ms(), memory_order_relaxed);

  // Push to lock-free stack
  buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
  do {
    atomic_store_explicit(&node->next, head, memory_order_relaxed);
  } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
                                                  memory_order_relaxed));

  // Periodically trigger shrink (every 100 returns)
  uint64_t returns = atomic_load_explicit(&pool->returns, memory_order_relaxed);
  if (returns % 100 == 0) {
    buffer_pool_shrink(pool);
  }
}
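/* Shrink strategy: detach the whole free list with an atomic exchange,
 * partition nodes by how long ago they were returned, push recent ones back,
 * and free the stale ones while giving their bytes back to the cap. The mutex
 * only serializes concurrent shrinkers; alloc/free stay lock-free. */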
void buffer_pool_shrink(buffer_pool_t *pool) {
  if (!pool || pool->shrink_delay_ms == 0)
    return;

  // Only one thread can shrink at a time
  if (mutex_trylock(&pool->shrink_mutex) != 0) {
    return; // Another thread is shrinking
  }

  uint64_t now = get_time_ms();
  uint64_t cutoff = (now > pool->shrink_delay_ms) ? (now - pool->shrink_delay_ms) : 0;

  // Atomically swap out the entire free list
  buffer_node_t *list = atomic_exchange_explicit(&pool->free_list, NULL, memory_order_acquire);

  // Partition into keep and free lists
  buffer_node_t *keep_list = NULL;
  buffer_node_t *free_list = NULL;

  while (list) {
    buffer_node_t *next = atomic_load_explicit(&list->next, memory_order_relaxed);
    uint64_t returned_at = atomic_load_explicit(&list->returned_at_ms, memory_order_relaxed);

    if (returned_at < cutoff) {
      // Old buffer - add to free list
      atomic_store_explicit(&list->next, free_list, memory_order_relaxed);
      free_list = list;
    } else {
      // Recent buffer - keep it
      atomic_store_explicit(&list->next, keep_list, memory_order_relaxed);
      keep_list = list;
    }
    list = next;
  }

  // Push kept buffers back to free list
  if (keep_list) {
    // Find tail
    buffer_node_t *tail = keep_list;
    while (atomic_load(&tail->next)) {
      tail = atomic_load(&tail->next);
    }

    // Atomically prepend to current free list
    buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
    do {
      atomic_store_explicit(&tail->next, head, memory_order_relaxed);
    } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, keep_list, memory_order_release,
                                                    memory_order_relaxed));
  }

  // Free old buffers
  while (free_list) {
    buffer_node_t *next = atomic_load_explicit(&free_list->next, memory_order_relaxed);
    size_t total_size = sizeof(buffer_node_t) + free_list->size;
    atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
    atomic_fetch_add_explicit(&pool->shrink_freed, 1, memory_order_relaxed);
    SAFE_FREE(free_list);
    free_list = next;
  }

  mutex_unlock(&pool->shrink_mutex);
}

void buffer_pool_get_stats(buffer_pool_t *pool, size_t *current_bytes, size_t *used_bytes, size_t *free_bytes) {
  if (!pool) {
    if (current_bytes)
      *current_bytes = 0;
    if (used_bytes)
      *used_bytes = 0;
    if (free_bytes)
      *free_bytes = 0;
    return;
  }

  size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);
  size_t used = atomic_load_explicit(&pool->used_bytes, memory_order_relaxed);

  if (current_bytes)
    *current_bytes = current;
  if (used_bytes)
    *used_bytes = used;
  if (free_bytes)
    *free_bytes = (current > used) ? (current - used) : 0;
}

void buffer_pool_log_stats(buffer_pool_t *pool, const char *name) {
  if (!pool)
    return;

  size_t current = atomic_load(&pool->current_bytes);
  size_t used = atomic_load(&pool->used_bytes);
  size_t peak = atomic_load(&pool->peak_bytes);
  size_t peak_pool = atomic_load(&pool->peak_pool_bytes);
  uint64_t hits = atomic_load(&pool->hits);
  uint64_t allocs = atomic_load(&pool->allocs);
  uint64_t returns = atomic_load(&pool->returns);
  uint64_t shrink_freed = atomic_load(&pool->shrink_freed);
  uint64_t fallbacks = atomic_load(&pool->malloc_fallbacks);

  char pretty_current[64], pretty_used[64], pretty_free[64];
  char pretty_peak[64], pretty_peak_pool[64], pretty_max[64];

  format_bytes_pretty(current, pretty_current, sizeof(pretty_current));
  format_bytes_pretty(used, pretty_used, sizeof(pretty_used));
  format_bytes_pretty(current > used ? current - used : 0, pretty_free, sizeof(pretty_free));
  format_bytes_pretty(peak, pretty_peak, sizeof(pretty_peak));
  format_bytes_pretty(peak_pool, pretty_peak_pool, sizeof(pretty_peak_pool));
  format_bytes_pretty(pool->max_bytes, pretty_max, sizeof(pretty_max));

  uint64_t total_requests = hits + allocs + fallbacks;
  double hit_rate = total_requests > 0 ? (double)hits * 100.0 / (double)total_requests : 0;

  log_info("=== Buffer Pool: %s ===", name ? name : "unnamed");
  log_info("  Pool: %s / %s (peak: %s)", pretty_current, pretty_max, pretty_peak_pool);
  log_info("  Used: %s, Free: %s (peak used: %s)", pretty_used, pretty_free, pretty_peak);
  log_info("  Hits: %llu (%.1f%%), Allocs: %llu, Fallbacks: %llu", (unsigned long long)hits, hit_rate,
           (unsigned long long)allocs, (unsigned long long)fallbacks);
  log_info("  Returns: %llu, Shrink freed: %llu", (unsigned long long)returns, (unsigned long long)shrink_freed);
}

/* ============================================================================
 * Global Buffer Pool
 * ============================================================================
 */

static buffer_pool_t *g_global_pool = NULL;
static static_mutex_t g_global_pool_mutex = STATIC_MUTEX_INIT;

void buffer_pool_init_global(void) {
  static_mutex_lock(&g_global_pool_mutex);
  if (!g_global_pool) {
    g_global_pool = buffer_pool_create(0, 0);
    if (g_global_pool) {
      log_info("Initialized global buffer pool");
    }
  }
  static_mutex_unlock(&g_global_pool_mutex);
}

void buffer_pool_cleanup_global(void) {
  static_mutex_lock(&g_global_pool_mutex);
  if (g_global_pool) {
    buffer_pool_log_stats(g_global_pool, "Global (final)");
    buffer_pool_destroy(g_global_pool);
    g_global_pool = NULL;
  }
  static_mutex_unlock(&g_global_pool_mutex);
}

buffer_pool_t *buffer_pool_get_global(void) {
  return g_global_pool;
}
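
For orientation, here is a minimal usage sketch of the API defined in this file. It is not part of buffer_pool.c: the wrapper function name is illustrative, it only calls functions and constants shown above, and application-level error handling is elided.

#include "buffer_pool.h"

static void buffer_pool_example(void) {
  // Dedicated pool: passing 0/0 selects the BUFFER_POOL_MAX_BYTES and
  // BUFFER_POOL_SHRINK_DELAY_MS defaults.
  buffer_pool_t *pool = buffer_pool_create(0, 0);
  if (!pool)
    return;

  void *frame = buffer_pool_alloc(pool, 64 * 1024); // pooled, or malloc fallback
  if (frame) {
    // ... fill and use the buffer ...
    buffer_pool_free(pool, frame, 64 * 1024); // goes back onto the free list
  }

  buffer_pool_log_stats(pool, "example");
  buffer_pool_destroy(pool);

  // Process-wide pool: alloc/free with a NULL pool route here once initialized.
  buffer_pool_init_global();
  void *scratch = buffer_pool_alloc(NULL, 4096);
  buffer_pool_free(NULL, scratch, 4096);
  buffer_pool_cleanup_global();
}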
Referenced headers:
  buffer_pool.h - Lock-Free Unified Memory Buffer Pool with Lazy Allocation
  asciichat_errno.h - Error and/or exit() when things go bad
  util/format.h - String Formatting Utilities
  platform/init.h - Platform initialization and static synchronization helpers
  platform/system.h - Cross-platform system functions interface for ascii-chat
  common.h - Common SIMD utilities and structures

Referenced symbols:
  BUFFER_POOL_MAGIC (buffer_pool.h:58) - Magic value to identify pooled buffers
  BUFFER_POOL_MAGIC_FALLBACK (buffer_pool.h:60) - Magic value for malloc fallback buffers (not in pool)
  BUFFER_POOL_MAX_BYTES (buffer_pool.h:46) - Maximum total bytes the pool can hold (337 MB)
  BUFFER_POOL_MAX_SINGLE_SIZE (buffer_pool.h:55) - Maximum single buffer size to pool (larger allocations use malloc directly)
  BUFFER_POOL_SHRINK_DELAY_MS (buffer_pool.h:49) - Time in milliseconds before unused buffers are freed (5 seconds)
  buffer_pool_t *buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ms) (buffer_pool.c:70) - Create a new buffer pool
  void buffer_pool_destroy(buffer_pool_t *pool) - Destroy a buffer pool and free all memory
  void *buffer_pool_alloc(buffer_pool_t *pool, size_t size) - Allocate a buffer from the pool (lock-free fast path)
  void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size) - Free a buffer back to the pool (lock-free)
  void buffer_pool_shrink(buffer_pool_t *pool) - Force shrink the pool (free old unused buffers)
  void buffer_pool_get_stats(buffer_pool_t *pool, size_t *current_bytes, size_t *used_bytes, size_t *free_bytes) - Get pool statistics (atomic reads)
  void buffer_pool_log_stats(buffer_pool_t *pool, const char *name) - Log pool statistics
  void buffer_pool_init_global(void), void buffer_pool_cleanup_global(void), buffer_pool_t *buffer_pool_get_global(void) - Global pool management
  buffer_node_t / struct buffer_node (buffer_pool.h:76) - Node header embedded before user data
    uint32_t magic (buffer_pool.h:77) - Magic to identify pooled buffers
    uint32_t _pad (buffer_pool.h:78) - Padding for alignment
    size_t size (buffer_pool.h:79) - Size of user data portion
    struct buffer_pool *pool (buffer_pool.h:82) - Owning pool (for free)
  struct buffer_pool (buffer_pool.h:90) - Unified buffer pool with lock-free fast path
    mutex_t shrink_mutex (buffer_pool.h:92) - Only used for shrinking
    size_t max_bytes (buffer_pool.h:94) - Maximum total bytes allowed
    uint64_t shrink_delay_ms (buffer_pool.h:95) - Time before unused buffers freed
  static_mutex_t (init.h:40) - Static mutex structure for global mutexes requiring static initialization
  STATIC_MUTEX_INIT (init.h:107)
  SAFE_MALLOC(size, cast) (common.h:208), SAFE_MALLOC_ALIGNED(size, alignment, cast) (common.h:293), SAFE_FREE(ptr) (common.h:320)
  uint64_t - unsigned long long (common.h:59)
  SET_ERRNO(code, context_msg, ...) - Set error code with custom context message and log it
  ERROR_MEMORY (error_codes.h:53), ERROR_THREAD (error_codes.h:95)
  log_info(...) - Log an INFO message; log_error(...) - Log an ERROR message
  int mutex_init(mutex_t *mutex) - Initialize a mutex; int mutex_destroy(mutex_t *mutex) - Destroy a mutex
  mutex_trylock(mutex) (mutex.h:157) - Try to lock a mutex without blocking (with debug tracking in debug builds)
  mutex_unlock(mutex) (mutex.h:175) - Unlock a mutex (with debug tracking in debug builds)
  void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity) (format.c:10) - Format byte count into human-readable string