ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
buffer_pool.c
Go to the documentation of this file.
1
7#include <ascii-chat/buffer_pool.h>
8#include <ascii-chat/common.h>
9#include <ascii-chat/asciichat_errno.h>
10#include <ascii-chat/platform/system.h>
11#include <ascii-chat/platform/init.h>
12#include <ascii-chat/util/format.h>
13#include <ascii-chat/util/time.h>
14#include <ascii-chat/util/magic.h>
15#include <stdlib.h>
16#include <string.h>
17
18/* ============================================================================
19 * Internal Helpers
20 * ============================================================================
21 */
22
24static inline buffer_node_t *node_from_data(void *data) {
25 return (buffer_node_t *)((char *)data - sizeof(buffer_node_t));
26}
27
29static inline void *data_from_node(buffer_node_t *node) {
30 return (void *)((char *)node + sizeof(buffer_node_t));
31}
32
/*
 * Monotonically raise *peak to at least `value`. Lock-free: a weak CAS loop
 * with relaxed ordering (the peak counters are statistics only, so no
 * synchronization with other data is required).
 */
static inline void update_peak(atomic_size_t *peak, size_t value) {
  size_t observed = atomic_load_explicit(peak, memory_order_relaxed);
  for (;;) {
    if (value <= observed) {
      return; /* Peak is already at or above value - nothing to do. */
    }
    if (atomic_compare_exchange_weak_explicit(peak, &observed, value, memory_order_relaxed, memory_order_relaxed)) {
      return; /* We published the new peak. */
    }
    /* CAS failure reloaded `observed`; loop and re-check. */
  }
}
42
43/* ============================================================================
44 * Buffer Pool Implementation
45 * ============================================================================
46 */
47
48buffer_pool_t *buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ns) {
49 buffer_pool_t *pool = SAFE_MALLOC(sizeof(buffer_pool_t), buffer_pool_t *);
50 if (!pool) {
51 SET_ERRNO(ERROR_MEMORY, "Failed to allocate buffer pool");
52 return NULL;
53 }
54
55 if (mutex_init(&pool->shrink_mutex) != 0) {
56 SET_ERRNO(ERROR_THREAD, "Failed to initialize shrink mutex");
57 SAFE_FREE(pool);
58 return NULL;
59 }
60
61 atomic_init(&pool->free_list, NULL);
62 pool->max_bytes = max_bytes > 0 ? max_bytes : BUFFER_POOL_MAX_BYTES;
63 pool->shrink_delay_ns = shrink_delay_ns > 0 ? shrink_delay_ns : BUFFER_POOL_SHRINK_DELAY_NS;
64
65 atomic_init(&pool->current_bytes, 0);
66 atomic_init(&pool->used_bytes, 0);
67 atomic_init(&pool->peak_bytes, 0);
68 atomic_init(&pool->peak_pool_bytes, 0);
69 atomic_init(&pool->hits, 0);
70 atomic_init(&pool->allocs, 0);
71 atomic_init(&pool->returns, 0);
72 atomic_init(&pool->shrink_freed, 0);
73 atomic_init(&pool->malloc_fallbacks, 0);
74
75 char pretty_max[64];
76 format_bytes_pretty(pool->max_bytes, pretty_max, sizeof(pretty_max));
77 log_dev("Created buffer pool (max: %s, shrink: %llu ns, lock-free)", pretty_max,
78 (unsigned long long)pool->shrink_delay_ns);
79
80 return pool;
81}
82
83void buffer_pool_destroy(buffer_pool_t *pool) {
84 if (!pool)
85 return;
86
87 // Drain the free list
88 buffer_node_t *node = atomic_load(&pool->free_list);
89 while (node) {
90 buffer_node_t *next = atomic_load(&node->next);
91 SAFE_FREE(node); // Node and data are one allocation
92 node = next;
93 }
94
95 mutex_destroy(&pool->shrink_mutex);
96 SAFE_FREE(pool);
97}
98
99void *buffer_pool_alloc(buffer_pool_t *pool, size_t size) {
100 // Use global pool if none specified
101 if (!pool) {
102 pool = buffer_pool_get_global();
103 }
104
105 // Size out of range - use malloc with node header for consistent cleanup
106 if (!pool || size < BUFFER_POOL_MIN_SIZE || size > BUFFER_POOL_MAX_SINGLE_SIZE) {
107 size_t total_size = sizeof(buffer_node_t) + size;
108 buffer_node_t *node = SAFE_MALLOC(total_size, buffer_node_t *);
109 node->magic = MAGIC_BUFFER_POOL_FALLBACK; // Different magic for fallbacks
110 node->_pad = 0;
111 node->size = size;
112 atomic_init(&node->next, NULL);
113 atomic_init(&node->returned_at_ns, 0);
114 node->pool = NULL; // No pool for fallbacks
115
116 if (pool) {
117 atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
118 }
119 return data_from_node(node);
120 }
121
122 // Try to pop from lock-free stack (LIFO)
123 buffer_node_t *node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
124 while (node) {
125 buffer_node_t *next = atomic_load_explicit(&node->next, memory_order_relaxed);
126 if (atomic_compare_exchange_weak_explicit(&pool->free_list, &node, next, memory_order_release,
127 memory_order_acquire)) {
128 // Successfully popped - check if it's big enough
129 if (node->size >= size) {
130 // Reuse this buffer
131 atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
132 size_t node_size = node->size;
133 atomic_fetch_add_explicit(&pool->used_bytes, node_size, memory_order_relaxed);
134 atomic_fetch_add_explicit(&pool->hits, 1, memory_order_relaxed);
135 update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
136 return data_from_node(node);
137 } else {
138 // Too small - push it back and allocate new
139 // (This is rare with LIFO - usually we get similar sizes)
140 buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
141 do {
142 atomic_store_explicit(&node->next, head, memory_order_relaxed);
143 } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
144 memory_order_relaxed));
145 break; // Fall through to allocate new
146 }
147 }
148 // CAS failed - reload and retry
149 node = atomic_load_explicit(&pool->free_list, memory_order_acquire);
150 }
151
152 // Check if we can allocate more
153 size_t total_size = sizeof(buffer_node_t) + size;
154 size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);
155
156 // Atomically try to reserve space
157 while (current + total_size <= pool->max_bytes) {
158 if (atomic_compare_exchange_weak_explicit(&pool->current_bytes, &current, current + total_size,
159 memory_order_relaxed, memory_order_relaxed)) {
160 // Reserved space - now allocate
161 // Allocate node + data in one chunk for cache efficiency
162 node = SAFE_MALLOC_ALIGNED(total_size, 64, buffer_node_t *);
163 if (!node) {
164 // Undo reservation
165 atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
166 atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
167 return SAFE_MALLOC(size, void *);
168 }
169
170 node->magic = MAGIC_BUFFER_POOL_VALID;
171 node->_pad = 0;
172 node->size = size;
173 atomic_init(&node->next, NULL);
174 atomic_init(&node->returned_at_ns, 0);
175 node->pool = pool;
176
177 atomic_fetch_add_explicit(&pool->used_bytes, size, memory_order_relaxed);
178 atomic_fetch_add_explicit(&pool->allocs, 1, memory_order_relaxed);
179 update_peak(&pool->peak_bytes, atomic_load(&pool->used_bytes));
180 update_peak(&pool->peak_pool_bytes, atomic_load(&pool->current_bytes));
181
182 return data_from_node(node);
183 }
184 // CAS failed - someone else allocated, reload and check again
185 }
186
187 // Pool at capacity - fall back to malloc with node header for consistent cleanup
188 total_size = sizeof(buffer_node_t) + size;
189 node = SAFE_MALLOC(total_size, buffer_node_t *);
190 if (!node) {
191 return NULL;
192 }
193 node->magic = MAGIC_BUFFER_POOL_FALLBACK;
194 node->_pad = 0;
195 node->size = size;
196 atomic_init(&node->next, NULL);
197 atomic_init(&node->returned_at_ns, 0);
198 node->pool = pool;
199
200 atomic_fetch_add_explicit(&pool->malloc_fallbacks, 1, memory_order_relaxed);
201 return data_from_node(node);
202}
203
/*
 * Release a buffer previously returned by buffer_pool_alloc().
 *
 * `size` is ignored: every allocation carries a buffer_node_t header whose
 * magic value identifies how it was allocated. `pool` may be NULL, in which
 * case the pool pointer recorded in the header is used.
 */
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size) {
  (void)size; // Size parameter not needed with header-based detection

  if (!data)
    return;

  // All buffer_pool allocations have headers, safe to check magic
  buffer_node_t *node = node_from_data(data);

  // If it's a malloc fallback (has fallback magic), free the node directly
  if (IS_MAGIC_VALID(node->magic, MAGIC_BUFFER_POOL_FALLBACK)) {
    SAFE_FREE(node); // Free the node (includes header + data)
    return;
  }

  // If it's not a pooled buffer (no valid magic), it's external - use platform free
  // NOTE(review): for a truly external pointer this magic read inspects memory
  // *before* the allocation — safe only if all callers route allocations
  // through this allocator's APIs; confirm against call sites.
  if (!IS_MAGIC_VALID(node->magic, MAGIC_BUFFER_POOL_VALID)) {
    free(data); // Unknown allocation, just free the data pointer
    return;
  }

  // It's a pooled buffer - return to pool
  // Use the pool stored in the node if none provided
  if (!pool) {
    pool = node->pool;
  }

  if (!pool) {
    // Shouldn't happen, but safety
    log_error("Pooled buffer has no pool reference!");
    return;
  }

  // Update stats: this buffer is no longer "in use"
  atomic_fetch_sub_explicit(&pool->used_bytes, node->size, memory_order_relaxed);
  atomic_fetch_add_explicit(&pool->returns, 1, memory_order_relaxed);

  // Set return timestamp (read by buffer_pool_shrink() to age out idle buffers)
  atomic_store_explicit(&node->returned_at_ns, time_get_ns(), memory_order_relaxed);

  // Push to lock-free stack: point node->next at the observed head, then try
  // to swing the head to this node; CAS failure reloads `head` and retries.
  buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
  do {
    atomic_store_explicit(&node->next, head, memory_order_relaxed);
  } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, node, memory_order_release,
                                                  memory_order_relaxed));

  // Periodically trigger shrink (every 100 returns). The counter is re-read
  // here, so under contention some multiples of 100 may be skipped or hit
  // twice — acceptable for a best-effort trigger.
  uint64_t returns = atomic_load_explicit(&pool->returns, memory_order_relaxed);
  if (returns % 100 == 0) {
    buffer_pool_shrink(pool);
  }
}
257
/*
 * Free buffers that have sat unused on the free list longer than
 * pool->shrink_delay_ns. Best-effort: if another thread already holds the
 * shrink mutex, this call returns immediately. A shrink_delay_ns of 0
 * disables shrinking entirely.
 */
void buffer_pool_shrink(buffer_pool_t *pool) {
  if (!pool || pool->shrink_delay_ns == 0)
    return;

  // Only one thread can shrink at a time
  if (mutex_trylock(&pool->shrink_mutex) != 0) {
    return; // Another thread is shrinking
  }

  // Buffers returned before `cutoff` are idle long enough to free.
  uint64_t now = time_get_ns();
  uint64_t cutoff = (now > pool->shrink_delay_ns) ? (now - pool->shrink_delay_ns) : 0;

  // Atomically swap out the entire free list so we can walk it privately
  // while allocators keep pushing/popping a fresh (initially empty) list.
  buffer_node_t *list = atomic_exchange_explicit(&pool->free_list, NULL, memory_order_acquire);

  // Partition into keep and free lists (both built in reverse order, which
  // is fine — free-list order carries no meaning).
  buffer_node_t *keep_list = NULL;
  buffer_node_t *free_list = NULL;

  while (list) {
    buffer_node_t *next = atomic_load_explicit(&list->next, memory_order_relaxed);
    uint64_t returned_at = atomic_load_explicit(&list->returned_at_ns, memory_order_relaxed);

    if (returned_at < cutoff) {
      // Old buffer - add to free list
      atomic_store_explicit(&list->next, free_list, memory_order_relaxed);
      free_list = list;
    } else {
      // Recent buffer - keep it
      atomic_store_explicit(&list->next, keep_list, memory_order_relaxed);
      keep_list = list;
    }
    list = next;
  }

  // Push kept buffers back to free list
  if (keep_list) {
    // Find tail — safe to walk non-atomically in spirit: this list is private
    // to us until the CAS below publishes it.
    buffer_node_t *tail = keep_list;
    while (atomic_load(&tail->next)) {
      tail = atomic_load(&tail->next);
    }

    // Atomically prepend to current free list (which may have grown again
    // while we were partitioning).
    buffer_node_t *head = atomic_load_explicit(&pool->free_list, memory_order_relaxed);
    do {
      atomic_store_explicit(&tail->next, head, memory_order_relaxed);
    } while (!atomic_compare_exchange_weak_explicit(&pool->free_list, &head, keep_list, memory_order_release,
                                                    memory_order_relaxed));
  }

  // Free old buffers, returning their bytes to the pool budget.
  while (free_list) {
    buffer_node_t *next = atomic_load_explicit(&free_list->next, memory_order_relaxed);
    size_t total_size = sizeof(buffer_node_t) + free_list->size;
    atomic_fetch_sub_explicit(&pool->current_bytes, total_size, memory_order_relaxed);
    atomic_fetch_add_explicit(&pool->shrink_freed, 1, memory_order_relaxed);
    SAFE_FREE(free_list);
    free_list = next;
  }

  mutex_unlock(&pool->shrink_mutex);
}
321
322void buffer_pool_get_stats(buffer_pool_t *pool, size_t *current_bytes, size_t *used_bytes, size_t *free_bytes) {
323 if (!pool) {
324 if (current_bytes)
325 *current_bytes = 0;
326 if (used_bytes)
327 *used_bytes = 0;
328 if (free_bytes)
329 *free_bytes = 0;
330 return;
331 }
332
333 size_t current = atomic_load_explicit(&pool->current_bytes, memory_order_relaxed);
334 size_t used = atomic_load_explicit(&pool->used_bytes, memory_order_relaxed);
335
336 if (current_bytes)
337 *current_bytes = current;
338 if (used_bytes)
339 *used_bytes = used;
340 if (free_bytes)
341 *free_bytes = (current > used) ? (current - used) : 0;
342}
343
344void buffer_pool_log_stats(buffer_pool_t *pool, const char *name) {
345 if (!pool)
346 return;
347
348 size_t current = atomic_load(&pool->current_bytes);
349 size_t used = atomic_load(&pool->used_bytes);
350 size_t peak = atomic_load(&pool->peak_bytes);
351 size_t peak_pool = atomic_load(&pool->peak_pool_bytes);
352 uint64_t hits = atomic_load(&pool->hits);
353 uint64_t allocs = atomic_load(&pool->allocs);
354 uint64_t returns = atomic_load(&pool->returns);
355 uint64_t shrink_freed = atomic_load(&pool->shrink_freed);
356 uint64_t fallbacks = atomic_load(&pool->malloc_fallbacks);
357
358 char pretty_current[64], pretty_used[64], pretty_free[64];
359 char pretty_peak[64], pretty_peak_pool[64], pretty_max[64];
360
361 format_bytes_pretty(current, pretty_current, sizeof(pretty_current));
362 format_bytes_pretty(used, pretty_used, sizeof(pretty_used));
363 format_bytes_pretty(current > used ? current - used : 0, pretty_free, sizeof(pretty_free));
364 format_bytes_pretty(peak, pretty_peak, sizeof(pretty_peak));
365 format_bytes_pretty(peak_pool, pretty_peak_pool, sizeof(pretty_peak_pool));
366 format_bytes_pretty(pool->max_bytes, pretty_max, sizeof(pretty_max));
367
368 uint64_t total_requests = hits + allocs + fallbacks;
369 double hit_rate = total_requests > 0 ? (double)hits * 100.0 / (double)total_requests : 0;
370
371 log_debug("=== Buffer Pool: %s ===", name ? name : "unnamed");
372 log_debug(" Current: %s / %s capacity (peak used: %s)", pretty_current, pretty_max, pretty_peak_pool);
373 log_debug(" Buffers: %s used, %s free (peak: %s)", pretty_used, pretty_free, pretty_peak);
374 log_debug(" Hits: %llu (%.1f%%), Allocs: %llu, Fallbacks: %llu", (unsigned long long)hits, hit_rate,
375 (unsigned long long)allocs, (unsigned long long)fallbacks);
376 log_debug(" Returns: %llu, Shrink freed: %llu", (unsigned long long)returns, (unsigned long long)shrink_freed);
377}
378
379/* ============================================================================
380 * Global Buffer Pool
381 * ============================================================================
382 */
383
/* Lazily-created process-wide pool; writes are guarded by g_global_pool_mutex. */
static buffer_pool_t *g_global_pool = NULL;
static static_mutex_t g_global_pool_mutex = STATIC_MUTEX_INIT;
386
388 static_mutex_lock(&g_global_pool_mutex);
389 if (!g_global_pool) {
390 g_global_pool = buffer_pool_create(0, 0);
391 if (g_global_pool) {
392 log_dev("Initialized global buffer pool");
393 }
394 }
395 static_mutex_unlock(&g_global_pool_mutex);
396}
397
399 static_mutex_lock(&g_global_pool_mutex);
400 if (g_global_pool) {
401 buffer_pool_log_stats(g_global_pool, "Global (final)");
402 buffer_pool_destroy(g_global_pool);
403 g_global_pool = NULL;
404 }
405 static_mutex_unlock(&g_global_pool_mutex);
406}
407
/*
 * Return the global pool, or NULL if buffer_pool_init_global() has not run.
 * NOTE(review): this read takes no lock; callers racing with init/cleanup
 * may observe a stale pointer — confirm init happens-before concurrent use.
 */
buffer_pool_t *buffer_pool_get_global(void) {
  return g_global_pool;
}
buffer_pool_t * buffer_pool_get_global(void)
void buffer_pool_shrink(buffer_pool_t *pool)
void buffer_pool_destroy(buffer_pool_t *pool)
Definition buffer_pool.c:83
void buffer_pool_cleanup_global(void)
void buffer_pool_log_stats(buffer_pool_t *pool, const char *name)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void buffer_pool_get_stats(buffer_pool_t *pool, size_t *current_bytes, size_t *used_bytes, size_t *free_bytes)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
void buffer_pool_init_global(void)
buffer_pool_t * buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ns)
Definition buffer_pool.c:48
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21
void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity)
Definition util/format.c:10
uint64_t time_get_ns(void)
Definition util/time.c:48