ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
ringbuffer.c
Go to the documentation of this file.
1
7#include "ringbuffer.h"
8#include "common.h"
9#include "asciichat_errno.h" // For asciichat_errno system
10#include "buffer_pool.h"
11#include "util/math.h" // For power-of-two utilities
12#include "util/bits.h" // For is_power_of_two, next_power_of_two
13#include <stdatomic.h>
14#include <stdlib.h>
15#include <string.h>
16#include <assert.h>
17
18/* ============================================================================
19 * Ring Buffer Implementation
20 * ============================================================================
21 *
22 * THREAD SAFETY NOTE: This ring buffer is designed for single-producer,
23 * single-consumer (SPSC) use only. The atomic operations provide memory
24 * ordering guarantees but do NOT support concurrent writes from multiple
25 * producers. For multi-writer scenarios, external synchronization is required.
26 */
27
28ringbuffer_t *ringbuffer_create(size_t element_size, size_t capacity) {
29 if (element_size == 0 || capacity == 0) {
30 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid ring buffer parameters: element_size=%zu, capacity=%zu", element_size,
31 capacity);
32 return NULL;
33 }
34
35 ringbuffer_t *rb;
36 rb = SAFE_CALLOC(1, sizeof(ringbuffer_t), ringbuffer_t *);
37
38 /* Round capacity up to power of 2 for optimization */
39 size_t actual_capacity = math_next_power_of_two(capacity);
40
41 rb->buffer = SAFE_CALLOC(actual_capacity, element_size, char *);
42
43 rb->element_size = element_size;
44 rb->capacity = actual_capacity;
45 rb->is_power_of_two = true;
46 rb->capacity_mask = actual_capacity - 1;
47 atomic_init(&rb->head, 0);
48 atomic_init(&rb->tail, 0);
49 atomic_init(&rb->size, 0);
50
51 return rb;
52}
53
55 if (rb) {
56 SAFE_FREE(rb->buffer);
57 SAFE_FREE(rb);
58 }
59}
60
61bool ringbuffer_write(ringbuffer_t *rb, const void *data) {
62 if (!rb || !data)
63 return false;
64
65 size_t current_size = atomic_load(&rb->size);
66 if (current_size >= rb->capacity) {
67 return false; /* Buffer full */
68 }
69
70 size_t head = atomic_load(&rb->head);
71 size_t next_head = (head + 1) & rb->capacity_mask;
72
73 /* Copy data */
74 SAFE_MEMCPY(rb->buffer + (head * rb->element_size), rb->element_size, data, rb->element_size);
75
76 /* Update head and size atomically */
77 atomic_store(&rb->head, next_head);
78 atomic_fetch_add(&rb->size, 1);
79
80 return true;
81}
82
83bool ringbuffer_read(ringbuffer_t *rb, void *data) {
84 if (!rb || !data)
85 return false;
86
87 size_t current_size = atomic_load(&rb->size);
88 if (current_size == 0) {
89 return false; /* Buffer empty */
90 }
91
92 size_t tail = atomic_load(&rb->tail);
93 size_t next_tail = (tail + 1) & rb->capacity_mask;
94
95 /* Copy data */
96 SAFE_MEMCPY(data, rb->element_size, rb->buffer + (tail * rb->element_size), rb->element_size);
97
98 /* Update tail and size atomically */
99 atomic_store(&rb->tail, next_tail);
100 atomic_fetch_sub(&rb->size, 1);
101
102 return true;
103}
104
105bool ringbuffer_peek(ringbuffer_t *rb, void *data) {
106 if (!rb || !data)
107 return false;
108
109 size_t current_size = atomic_load(&rb->size);
110 if (current_size == 0) {
111 return false; /* Buffer empty */
112 }
113
114 size_t tail = atomic_load(&rb->tail);
115
116 /* Copy data without updating tail */
117 SAFE_MEMCPY(data, rb->element_size, rb->buffer + (tail * rb->element_size), rb->element_size);
118
119 return true;
120}
121
122size_t ringbuffer_size(const ringbuffer_t *rb) {
123 return rb ? atomic_load(&rb->size) : 0;
124}
125
127 return ringbuffer_size(rb) == 0;
128}
129
131 return rb ? ringbuffer_size(rb) >= rb->capacity : true;
132}
133
135 if (rb) {
136 atomic_store(&rb->head, 0);
137 atomic_store(&rb->tail, 0);
138 atomic_store(&rb->size, 0);
139 }
140}
141
142/* ============================================================================
143 * Frame Buffer Implementation
144 * ============================================================================
145 */
146
148 if (capacity == 0) {
149 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid frame buffer parameters");
150 return NULL;
151 }
152
153 framebuffer_t *fb;
154 fb = SAFE_CALLOC(1, sizeof(framebuffer_t), framebuffer_t *);
155
156 // Initialize mutex for thread-safe access
157 if (mutex_init(&fb->mutex) != 0) {
158 SET_ERRNO(ERROR_THREAD, "Failed to initialize framebuffer mutex");
159 SAFE_FREE(fb);
160 return NULL;
161 }
162
163 // Create ringbuffer to store frame_t structs
164 fb->rb = ringbuffer_create(sizeof(frame_t), capacity);
165 if (!fb->rb) {
166 SET_ERRNO(ERROR_MEMORY, "Failed to allocate frame buffer");
167 mutex_destroy(&fb->mutex);
168 SAFE_FREE(fb);
169 return NULL;
170 }
171
172 return fb;
173}
174
176 if (capacity == 0) {
177 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid capacity: %zu", capacity);
178 return NULL;
179 }
180
181 framebuffer_t *fb;
182 fb = SAFE_CALLOC(1, sizeof(framebuffer_t), framebuffer_t *);
183
184 // Initialize mutex for thread-safe access
185 if (mutex_init(&fb->mutex) != 0) {
186 SET_ERRNO(ERROR_THREAD, "Failed to initialize framebuffer mutex");
187 SAFE_FREE(fb);
188 return NULL;
189 }
190
191 // Create ringbuffer to store multi_source_frame_t structs
192 fb->rb = ringbuffer_create(sizeof(multi_source_frame_t), capacity);
193
194 if (!fb->rb) {
195 mutex_destroy(&fb->mutex);
196 SAFE_FREE(fb);
197 return NULL;
198 }
199
200 return fb;
201}
202
204 if (!fb)
205 return;
206
207 // Add magic number check to detect double-free using rb pointer
208 if (fb->rb == (ringbuffer_t *)0xDEADBEEF) {
209 SET_ERRNO(ERROR_INVALID_STATE, "DOUBLE-FREE DETECTED: framebuffer %p already destroyed!", fb);
210 return;
211 }
212
215 mutex_destroy(&fb->mutex);
216
217 // Mark as destroyed before freeing
218 fb->rb = (ringbuffer_t *)0xDEADBEEF;
219 SAFE_FREE(fb);
220}
221
222bool framebuffer_write_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size) {
223 if (!fb || !frame_data || frame_size == 0)
224 return false;
225
226 // Validate frame size to prevent overflow
227 if (frame_size > 10 * 1024 * 1024) { // 10MB max for ASCII frames
228 SET_ERRNO(ERROR_INVALID_PARAM, "Rejecting oversized frame: %zu bytes", frame_size);
229 return false;
230 }
231
232 // Thread-safe access to framebuffer (was missing, causing race conditions)
233 mutex_lock(&fb->mutex);
234
235 // Check if buffer is full - if so, we need to drop the oldest frame
236 if (ringbuffer_size(fb->rb) >= fb->rb->capacity) {
237 // Buffer is full, read and free the oldest frame before writing new one
238 frame_t old_frame;
239 if (ringbuffer_read(fb->rb, &old_frame)) {
240 if (old_frame.magic == FRAME_MAGIC && old_frame.data) {
241 old_frame.magic = FRAME_FREED;
242 // Use buffer_pool_free since data was allocated with buffer_pool_alloc
243 buffer_pool_free(NULL, old_frame.data, old_frame.size);
244 } else if (old_frame.magic != FRAME_MAGIC) {
245 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid old frame magic 0x%x when dropping", old_frame.magic);
246 }
247 }
248 }
249
250 // Allocate a copy of the frame data using buffer pool for better performance
251 char *frame_copy = (char *)buffer_pool_alloc(NULL, frame_size + 1);
252 if (!frame_copy) {
253 mutex_unlock(&fb->mutex);
254 SET_ERRNO(ERROR_MEMORY, "Failed to allocate %zu bytes from buffer pool for frame", frame_size + 1);
255 return false;
256 }
257
258 SAFE_MEMCPY(frame_copy, frame_size, frame_data, frame_size);
259 frame_copy[frame_size] = '\0'; // Ensure null termination
260
261 // Create a frame_t struct with the copy (store allocated size for proper cleanup)
262 frame_t frame = {.magic = FRAME_MAGIC, .size = frame_size + 1, .data = frame_copy};
263
264 bool result = ringbuffer_write(fb->rb, &frame);
265
266 if (!result) {
267 // If we still couldn't write to ringbuffer, return the buffer to pool
268 buffer_pool_free(NULL, frame_copy, frame_size + 1);
269 SET_ERRNO(ERROR_INVALID_STATE, "Failed to write frame to ringbuffer even after dropping oldest");
270 }
271
272 mutex_unlock(&fb->mutex);
273 return result;
274}
275
277 if (!fb || !frame) {
278 return false;
279 }
280
281 // Initialize frame to safe values
282 frame->magic = 0;
283 frame->data = NULL;
284 frame->size = 0;
285
286 // Thread-safe access to framebuffer (fixes TOCTOU race condition)
287 mutex_lock(&fb->mutex);
288
289 bool result = ringbuffer_read(fb->rb, frame);
290
291 // Validate the frame we just read
292 if (result) {
293 if (frame->magic != FRAME_MAGIC) {
294 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid frame magic 0x%x (expected 0x%x)", frame->magic, FRAME_MAGIC);
295 frame->data = NULL;
296 frame->size = 0;
297 mutex_unlock(&fb->mutex);
298 return false;
299 }
300
301 if (frame->magic == FRAME_FREED) {
302 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Reading already-freed frame!");
303 frame->data = NULL;
304 frame->size = 0;
305 mutex_unlock(&fb->mutex);
306 return false;
307 }
308
309 if (frame->size > 10 * 1024 * 1024) {
310 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Frame size too large: %zu", frame->size);
311 SAFE_FREE(frame->data);
312 frame->data = NULL;
313 frame->size = 0;
314 mutex_unlock(&fb->mutex);
315 return false;
316 }
317 }
318
319 mutex_unlock(&fb->mutex);
320 return result;
321}
322
324 if (!fb || !fb->rb)
325 return;
326
327 // Thread-safe access to framebuffer (was missing, causing race conditions)
328 mutex_lock(&fb->mutex);
329
330 // Check the element size to determine frame type
331 if (fb->rb->element_size == sizeof(multi_source_frame_t)) {
332 // Multi-source frame buffer - read and free multi_source_frame_t
333 multi_source_frame_t multi_frame;
334 while (ringbuffer_read(fb->rb, &multi_frame)) {
335 if (multi_frame.magic == FRAME_MAGIC && multi_frame.data) {
336 multi_frame.magic = FRAME_FREED; // Mark as freed to detect use-after-free
337 buffer_pool_free(NULL, multi_frame.data, multi_frame.size);
338 } else if (multi_frame.magic != FRAME_MAGIC && multi_frame.magic != 0) {
339 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame magic 0x%x during clear",
340 multi_frame.magic);
341 }
342 }
343 } else if (fb->rb->element_size == sizeof(frame_t)) {
344 // Single-source frame buffer - read and free frame_t
345 frame_t frame;
346 while (ringbuffer_read(fb->rb, &frame)) {
347 if (frame.magic == FRAME_MAGIC && frame.data) {
348 frame.magic = FRAME_FREED; // Mark as freed to detect use-after-free
349 buffer_pool_free(NULL, frame.data, frame.size);
350 } else if (frame.magic != FRAME_MAGIC && frame.magic != 0) {
351 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid frame magic 0x%x during clear", frame.magic);
352 }
353 }
354 } else {
355 SET_ERRNO(ERROR_INVALID_STATE, "Unknown frame buffer type with element size %zu", fb->rb->element_size);
356 }
357
358 // Clear the ringbuffer indices
359 ringbuffer_clear(fb->rb);
360
361 // Zero out the entire buffer to prevent any dangling pointers
362 if (fb->rb->buffer) {
363 // Check for integer overflow before multiplication
364 if (fb->rb->capacity > SIZE_MAX / fb->rb->element_size) {
365 SET_ERRNO(ERROR_INVALID_PARAM, "Buffer size would overflow: capacity=%zu, element_size=%zu", fb->rb->capacity,
366 fb->rb->element_size);
367 mutex_unlock(&fb->mutex);
368 return;
369 }
370 size_t buffer_size = fb->rb->capacity * fb->rb->element_size;
371 SAFE_MEMSET(fb->rb->buffer, buffer_size, 0, buffer_size);
372 }
373
374 mutex_unlock(&fb->mutex);
375}
376
377// Multi-source frame functions for multi-user support
378
379bool framebuffer_write_multi_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size,
380 uint32_t source_client_id, uint32_t frame_sequence, uint32_t timestamp) {
381 if (!fb || !fb->rb || !frame_data || frame_size == 0) {
382 return false;
383 }
384
385 // Allocate memory for frame data using buffer pool for better performance
386 char *data_copy = (char *)buffer_pool_alloc(NULL, frame_size);
387 if (!data_copy) {
388 SET_ERRNO(ERROR_MEMORY, "Failed to allocate %zu bytes from buffer pool for multi-source frame", frame_size);
389 return false;
390 }
391
392 // Copy frame data
393 SAFE_MEMCPY(data_copy, frame_size, frame_data, frame_size);
394
395 // Create multi-source frame
396 multi_source_frame_t multi_frame = {.magic = FRAME_MAGIC,
397 .source_client_id = source_client_id,
398 .frame_sequence = frame_sequence,
399 .timestamp = timestamp,
400 .size = frame_size,
401 .data = data_copy};
402
403 // Thread-safe access to framebuffer
404 mutex_lock(&fb->mutex);
405
406 // Try to write to ring buffer
407 bool success = ringbuffer_write(fb->rb, &multi_frame);
408 if (!success) {
409 // Buffer full, return the buffer to pool
410 buffer_pool_free(NULL, data_copy, frame_size);
411 log_debug("Frame buffer full, dropping multi-source frame from client %u", source_client_id);
412 }
413
414 mutex_unlock(&fb->mutex);
415 return success;
416}
417
419 if (!fb || !fb->rb || !frame) {
420 return false;
421 }
422
423 // Thread-safe access to framebuffer
424 mutex_lock(&fb->mutex);
425
426 bool result = ringbuffer_read(fb->rb, frame);
427
428 if (result) {
429 // Validate frame magic
430 if (frame->magic != FRAME_MAGIC) {
431 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame magic 0x%x (expected 0x%x)", frame->magic,
433 frame->data = NULL;
434 frame->size = 0;
435 frame->source_client_id = 0;
436 mutex_unlock(&fb->mutex);
437 return false;
438 }
439
440 // Additional validation
441 if (frame->size == 0 || !frame->data) {
442 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame data (size=%zu, data=%p)", frame->size,
443 frame->data);
444 mutex_unlock(&fb->mutex);
445 return false;
446 }
447 }
448
449 mutex_unlock(&fb->mutex);
450 return result;
451}
452
454 if (!fb || !fb->rb || !frame) {
455 return false;
456 }
457
458 // Thread-safe access to framebuffer
459 mutex_lock(&fb->mutex);
460
461 // Use ringbuffer_peek to get the frame without consuming it
462 bool result = ringbuffer_peek(fb->rb, frame);
463
464 if (result) {
465 // Validate frame magic
466 if (frame->magic != FRAME_MAGIC) {
467 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame magic 0x%x (expected 0x%x) in peek",
468 frame->magic, FRAME_MAGIC);
469 frame->data = NULL;
470 frame->size = 0;
471 frame->source_client_id = 0;
472 mutex_unlock(&fb->mutex);
473 return false;
474 }
475
476 // Additional validation
477 if (frame->size == 0 || !frame->data) {
478 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame data (size=%zu, data=%p) in peek",
479 frame->size, frame->data);
480 mutex_unlock(&fb->mutex);
481 return false;
482 }
483
484 // IMPORTANT: We need to make a copy of the data since we're not consuming the frame
485 // The original data pointer will remain valid in the ring buffer
486 // Caller is responsible for freeing this copy with buffer_pool_free()
487 char *data_copy;
488 data_copy = (char *)buffer_pool_alloc(NULL, frame->size);
489 if (!data_copy) {
490 SET_ERRNO(ERROR_MEMORY, "Failed to allocate memory for frame data copy in peek");
491 mutex_unlock(&fb->mutex);
492 return false;
493 }
494 SAFE_MEMCPY(data_copy, frame->size, frame->data, frame->size);
495 frame->data = data_copy;
496 }
497
498 mutex_unlock(&fb->mutex);
499 return result;
500}
⚠️‼️ Error and/or exit() when things go bad.
🔢 Bit Manipulation Utilities
🗃️ Lock-Free Unified Memory Buffer Pool with Lazy Allocation
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Allocate a buffer from the pool (lock-free fast path)
unsigned int uint32_t
Definition common.h:58
#define SAFE_FREE(ptr)
Definition common.h:320
#define SAFE_MEMSET(dest, dest_size, ch, count)
Definition common.h:389
#define SAFE_CALLOC(count, size, cast)
Definition common.h:218
#define SAFE_MEMCPY(dest, dest_size, src, count)
Definition common.h:388
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
@ ERROR_INVALID_STATE
@ ERROR_MEMORY
Definition error_codes.h:53
@ ERROR_INVALID_PARAM
@ ERROR_THREAD
Definition error_codes.h:95
#define log_debug(...)
Log a DEBUG message.
int mutex_init(mutex_t *mutex)
Initialize a mutex.
#define mutex_lock(mutex)
Lock a mutex (with debug tracking in debug builds)
Definition mutex.h:140
#define mutex_unlock(mutex)
Unlock a mutex (with debug tracking in debug builds)
Definition mutex.h:175
int mutex_destroy(mutex_t *mutex)
Destroy a mutex.
void ringbuffer_clear(ringbuffer_t *rb)
Clear all elements from the buffer.
Definition ringbuffer.c:134
void framebuffer_destroy(framebuffer_t *fb)
Destroy a frame buffer.
Definition ringbuffer.c:203
void framebuffer_clear(framebuffer_t *fb)
Clear all frames from the buffer, freeing their data.
Definition ringbuffer.c:323
bool framebuffer_read_multi_frame(framebuffer_t *fb, multi_source_frame_t *frame)
Read a multi-source frame from the buffer.
Definition ringbuffer.c:418
framebuffer_t * framebuffer_create_multi(size_t capacity)
Create a multi-source frame buffer for multi-user support.
Definition ringbuffer.c:175
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
Create a new ring buffer.
Definition ringbuffer.c:28
framebuffer_t * framebuffer_create(size_t capacity)
Create a frame buffer for ASCII frames.
Definition ringbuffer.c:147
size_t ringbuffer_size(const ringbuffer_t *rb)
Get current number of elements in the buffer.
Definition ringbuffer.c:122
bool framebuffer_read_frame(framebuffer_t *fb, frame_t *frame)
Read a frame from the buffer.
Definition ringbuffer.c:276
bool ringbuffer_is_full(const ringbuffer_t *rb)
Check if buffer is full.
Definition ringbuffer.c:130
void ringbuffer_destroy(ringbuffer_t *rb)
Destroy a ring buffer and free its memory.
Definition ringbuffer.c:54
#define FRAME_MAGIC
Magic number indicating a valid frame (0xDEADBEEF)
Definition ringbuffer.h:421
bool ringbuffer_is_empty(const ringbuffer_t *rb)
Check if buffer is empty.
Definition ringbuffer.c:126
bool framebuffer_write_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size)
Write a frame to the buffer.
Definition ringbuffer.c:222
#define FRAME_FREED
Magic number indicating a freed frame (0xFEEDFACE)
Definition ringbuffer.h:423
bool framebuffer_write_multi_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size, uint32_t source_client_id, uint32_t frame_sequence, uint32_t timestamp)
Write a multi-source frame to the buffer (for multi-user support)
Definition ringbuffer.c:379
bool framebuffer_peek_latest_multi_frame(framebuffer_t *fb, multi_source_frame_t *frame)
Peek at the latest multi-source frame without removing it.
Definition ringbuffer.c:453
bool ringbuffer_read(ringbuffer_t *rb, void *data)
Try to read an element from the ring buffer (non-blocking)
Definition ringbuffer.c:83
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
Try to write an element to the ring buffer (non-blocking)
Definition ringbuffer.c:61
bool ringbuffer_peek(ringbuffer_t *rb, void *data)
Peek at the next element without removing it.
Definition ringbuffer.c:105
🔢 Mathematical Utility Functions
Lock-Free Ring Buffer and Frame Buffer Management.
Frame structure that stores both data and actual size.
Definition ringbuffer.h:382
char * data
Pointer to frame data (not owned by this struct)
Definition ringbuffer.h:388
uint32_t magic
Magic number to detect corruption (FRAME_MAGIC when valid)
Definition ringbuffer.h:384
size_t size
Actual size of frame data in bytes.
Definition ringbuffer.h:386
Frame buffer structure for managing video frames.
Definition ringbuffer.h:435
mutex_t mutex
Mutex for thread-safe access to framebuffer operations.
Definition ringbuffer.h:439
ringbuffer_t * rb
Underlying ring buffer for frame storage.
Definition ringbuffer.h:437
Multi-source frame structure for multi-user support.
Definition ringbuffer.h:399
char * data
Pointer to frame data (not owned by this struct)
Definition ringbuffer.h:411
uint32_t magic
Magic number to detect corruption (FRAME_MAGIC when valid)
Definition ringbuffer.h:401
uint32_t source_client_id
Client ID that sent this frame.
Definition ringbuffer.h:403
size_t size
Actual size of frame data in bytes.
Definition ringbuffer.h:409
Lock-free ring buffer structure.
Definition ringbuffer.h:95
bool is_power_of_two
Whether capacity is power of 2 (enables bit masking optimization)
Definition ringbuffer.h:109
size_t capacity
Number of elements that can be stored.
Definition ringbuffer.h:101
size_t capacity_mask
Mask for fast modulo when capacity is power of 2.
Definition ringbuffer.h:111
_Atomic size_t tail
Read position (consumer) - atomic for lock-free operations.
Definition ringbuffer.h:105
_Atomic size_t size
Current number of elements - atomic for fast size checks.
Definition ringbuffer.h:107
size_t element_size
Size of each element in bytes.
Definition ringbuffer.h:99
char * buffer
The actual buffer memory.
Definition ringbuffer.h:97
_Atomic size_t head
Write position (producer) - atomic for lock-free operations.
Definition ringbuffer.h:103
Common SIMD utilities and structures.