ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
ringbuffer.c
Go to the documentation of this file.
1
7#include <ascii-chat/ringbuffer.h>
8#include <ascii-chat/common.h>
9#include <ascii-chat/asciichat_errno.h> // For asciichat_errno system
10#include <ascii-chat/buffer_pool.h>
11#include <ascii-chat/util/math.h> // For power-of-two utilities
12#include <ascii-chat/util/bits.h> // For is_power_of_two, next_power_of_two
13#include <stdatomic.h>
14#include <stdlib.h>
15#include <string.h>
16#include <assert.h>
17
18/* ============================================================================
19 * Ring Buffer Implementation
20 * ============================================================================
21 *
22 * THREAD SAFETY NOTE: This ring buffer is designed for single-producer,
23 * single-consumer (SPSC) use only. The atomic operations provide memory
24 * ordering guarantees but do NOT support concurrent writes from multiple
25 * producers. For multi-writer scenarios, external synchronization is required.
26 */
27
28ringbuffer_t *ringbuffer_create(size_t element_size, size_t capacity) {
29 if (element_size == 0 || capacity == 0) {
30 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid ring buffer parameters: element_size=%zu, capacity=%zu", element_size,
31 capacity);
32 return NULL;
33 }
34
35 ringbuffer_t *rb;
36 rb = SAFE_CALLOC(1, sizeof(ringbuffer_t), ringbuffer_t *);
37
38 /* Round capacity up to power of 2 for optimization */
39 size_t actual_capacity = math_next_power_of_two(capacity);
40
41 rb->buffer = SAFE_CALLOC(actual_capacity, element_size, char *);
42
43 rb->element_size = element_size;
44 rb->capacity = actual_capacity;
45 rb->is_power_of_two = true;
46 rb->capacity_mask = actual_capacity - 1;
47 atomic_init(&rb->head, 0);
48 atomic_init(&rb->tail, 0);
49 atomic_init(&rb->size, 0);
50
51 return rb;
52}
53
54void ringbuffer_destroy(ringbuffer_t *rb) {
55 if (rb) {
56 SAFE_FREE(rb->buffer);
57 SAFE_FREE(rb);
58 }
59}
60
61bool ringbuffer_write(ringbuffer_t *rb, const void *data) {
62 if (!rb || !data)
63 return false;
64
65 size_t current_size = atomic_load(&rb->size);
66 if (current_size >= rb->capacity) {
67 return false; /* Buffer full */
68 }
69
70 size_t head = atomic_load(&rb->head);
71 size_t next_head = (head + 1) & rb->capacity_mask;
72
73 /* Copy data */
74 SAFE_MEMCPY(rb->buffer + (head * rb->element_size), rb->element_size, data, rb->element_size);
75
76 /* Update head and size atomically */
77 atomic_store(&rb->head, next_head);
78 atomic_fetch_add(&rb->size, 1);
79
80 return true;
81}
82
83bool ringbuffer_read(ringbuffer_t *rb, void *data) {
84 if (!rb || !data)
85 return false;
86
87 size_t current_size = atomic_load(&rb->size);
88 if (current_size == 0) {
89 return false; /* Buffer empty */
90 }
91
92 size_t tail = atomic_load(&rb->tail);
93 size_t next_tail = (tail + 1) & rb->capacity_mask;
94
95 /* Copy data */
96 SAFE_MEMCPY(data, rb->element_size, rb->buffer + (tail * rb->element_size), rb->element_size);
97
98 /* Update tail and size atomically */
99 atomic_store(&rb->tail, next_tail);
100 atomic_fetch_sub(&rb->size, 1);
101
102 return true;
103}
104
105bool ringbuffer_peek(ringbuffer_t *rb, void *data) {
106 if (!rb || !data)
107 return false;
108
109 size_t current_size = atomic_load(&rb->size);
110 if (current_size == 0) {
111 return false; /* Buffer empty */
112 }
113
114 size_t tail = atomic_load(&rb->tail);
115
116 /* Copy data without updating tail */
117 SAFE_MEMCPY(data, rb->element_size, rb->buffer + (tail * rb->element_size), rb->element_size);
118
119 return true;
120}
121
122size_t ringbuffer_size(const ringbuffer_t *rb) {
123 return rb ? atomic_load(&rb->size) : 0;
124}
125
126bool ringbuffer_is_empty(const ringbuffer_t *rb) {
127 return ringbuffer_size(rb) == 0;
128}
129
130bool ringbuffer_is_full(const ringbuffer_t *rb) {
131 return rb ? ringbuffer_size(rb) >= rb->capacity : true;
132}
133
134void ringbuffer_clear(ringbuffer_t *rb) {
135 if (rb) {
136 atomic_store(&rb->head, 0);
137 atomic_store(&rb->tail, 0);
138 atomic_store(&rb->size, 0);
139 }
140}
141
142/* ============================================================================
143 * Frame Buffer Implementation
144 * ============================================================================
145 */
146
147framebuffer_t *framebuffer_create(size_t capacity) {
148 if (capacity == 0) {
149 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid frame buffer parameters");
150 return NULL;
151 }
152
153 framebuffer_t *fb;
154 fb = SAFE_CALLOC(1, sizeof(framebuffer_t), framebuffer_t *);
155
156 // Initialize mutex for thread-safe access
157 if (mutex_init(&fb->mutex) != 0) {
158 SET_ERRNO(ERROR_THREAD, "Failed to initialize framebuffer mutex");
159 SAFE_FREE(fb);
160 return NULL;
161 }
162
163 // Create ringbuffer to store frame_t structs
164 fb->rb = ringbuffer_create(sizeof(frame_t), capacity);
165 if (!fb->rb) {
166 SET_ERRNO(ERROR_MEMORY, "Failed to allocate frame buffer");
167 mutex_destroy(&fb->mutex);
168 SAFE_FREE(fb);
169 return NULL;
170 }
171
172 return fb;
173}
174
175framebuffer_t *framebuffer_create_multi(size_t capacity) {
176 if (capacity == 0) {
177 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid capacity: %zu", capacity);
178 return NULL;
179 }
180
181 framebuffer_t *fb;
182 fb = SAFE_CALLOC(1, sizeof(framebuffer_t), framebuffer_t *);
183
184 // Initialize mutex for thread-safe access
185 if (mutex_init(&fb->mutex) != 0) {
186 SET_ERRNO(ERROR_THREAD, "Failed to initialize framebuffer mutex");
187 SAFE_FREE(fb);
188 return NULL;
189 }
190
191 // Create ringbuffer to store multi_source_frame_t structs
192 fb->rb = ringbuffer_create(sizeof(multi_source_frame_t), capacity);
193
194 if (!fb->rb) {
195 mutex_destroy(&fb->mutex);
196 SAFE_FREE(fb);
197 return NULL;
198 }
199
200 return fb;
201}
202
203void framebuffer_destroy(framebuffer_t *fb) {
204 if (!fb)
205 return;
206
207 // Add magic number check to detect double-free using rb pointer
208 if (fb->rb == (ringbuffer_t *)0xDEADBEEF) {
209 SET_ERRNO(ERROR_INVALID_STATE, "DOUBLE-FREE DETECTED: framebuffer %p already destroyed!", fb);
210 return;
211 }
212
214 ringbuffer_destroy(fb->rb);
215 mutex_destroy(&fb->mutex);
216
217 // Mark as destroyed before freeing
218 fb->rb = (ringbuffer_t *)0xDEADBEEF;
219 SAFE_FREE(fb);
220}
221
222bool framebuffer_write_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size) {
223 if (!fb || !frame_data || frame_size == 0)
224 return false;
225
226 // Validate frame size to prevent overflow
227 if (frame_size > 10 * 1024 * 1024) { // 10MB max for ASCII frames
228 SET_ERRNO(ERROR_INVALID_PARAM, "Rejecting oversized frame: %zu bytes", frame_size);
229 return false;
230 }
231
232 // Thread-safe access to framebuffer (was missing, causing race conditions)
233 mutex_lock(&fb->mutex);
234
235 // Check if buffer is full - if so, we need to drop the oldest frame
236 if (ringbuffer_size(fb->rb) >= fb->rb->capacity) {
237 // Buffer is full, read and free the oldest frame before writing new one
238 frame_t old_frame;
239 if (ringbuffer_read(fb->rb, &old_frame)) {
240 if (IS_FRAME_VALID(&old_frame)) {
241 MARK_FRAME_FREED(&old_frame);
242 // Use buffer_pool_free since data was allocated with buffer_pool_alloc
243 buffer_pool_free(NULL, old_frame.data, old_frame.size);
244 } else if (!IS_MAGIC_VALID(old_frame.magic, FRAME_MAGIC)) {
245 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid old frame magic 0x%x when dropping", old_frame.magic);
246 }
247 }
248 }
249
250 // Allocate a copy of the frame data using buffer pool for better performance
251 char *frame_copy = (char *)buffer_pool_alloc(NULL, frame_size + 1);
252 if (!frame_copy) {
253 mutex_unlock(&fb->mutex);
254 SET_ERRNO(ERROR_MEMORY, "Failed to allocate %zu bytes from buffer pool for frame", frame_size + 1);
255 return false;
256 }
257
258 SAFE_MEMCPY(frame_copy, frame_size, frame_data, frame_size);
259 frame_copy[frame_size] = '\0'; // Ensure null termination
260
261 // Create a frame_t struct with the copy (store allocated size for proper cleanup)
262 frame_t frame = {.magic = MAGIC_FRAME_VALID, .size = frame_size + 1, .data = frame_copy};
263
264 bool result = ringbuffer_write(fb->rb, &frame);
265
266 if (!result) {
267 // If we still couldn't write to ringbuffer, return the buffer to pool
268 buffer_pool_free(NULL, frame_copy, frame_size + 1);
269 SET_ERRNO(ERROR_INVALID_STATE, "Failed to write frame to ringbuffer even after dropping oldest");
270 }
271
272 mutex_unlock(&fb->mutex);
273 return result;
274}
275
276bool framebuffer_read_frame(framebuffer_t *fb, frame_t *frame) {
277 if (!fb || !frame) {
278 return false;
279 }
280
281 // Initialize frame to safe values
282 frame->magic = 0;
283 frame->data = NULL;
284 frame->size = 0;
285
286 // Thread-safe access to framebuffer (fixes TOCTOU race condition)
287 mutex_lock(&fb->mutex);
288
289 bool result = ringbuffer_read(fb->rb, frame);
290
291 // Validate the frame we just read
292 if (result) {
293 if (!IS_MAGIC_VALID(frame->magic, FRAME_MAGIC)) {
294 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid frame magic 0x%x (expected 0x%x)", frame->magic, FRAME_MAGIC);
295 frame->data = NULL;
296 frame->size = 0;
297 mutex_unlock(&fb->mutex);
298 return false;
299 }
300
301 if (IS_FRAME_FREED(frame)) {
302 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Reading already-freed frame!");
303 frame->data = NULL;
304 frame->size = 0;
305 mutex_unlock(&fb->mutex);
306 return false;
307 }
308
309 if (frame->size > 10 * 1024 * 1024) {
310 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Frame size too large: %zu", frame->size);
311 SAFE_FREE(frame->data);
312 frame->data = NULL;
313 frame->size = 0;
314 mutex_unlock(&fb->mutex);
315 return false;
316 }
317 }
318
319 mutex_unlock(&fb->mutex);
320 return result;
321}
322
323void framebuffer_clear(framebuffer_t *fb) {
324 if (!fb || !fb->rb)
325 return;
326
327 // Thread-safe access to framebuffer (was missing, causing race conditions)
328 mutex_lock(&fb->mutex);
329
330 // Check the element size to determine frame type
331 if (fb->rb->element_size == sizeof(multi_source_frame_t)) {
332 // Multi-source frame buffer - read and free multi_source_frame_t
333 multi_source_frame_t multi_frame;
334 while (ringbuffer_read(fb->rb, &multi_frame)) {
335 if (IS_FRAME_VALID((frame_t *)&multi_frame)) {
336 MARK_FRAME_FREED((frame_t *)&multi_frame); // Mark as freed to detect use-after-free
337 buffer_pool_free(NULL, multi_frame.data, multi_frame.size);
338 } else if (!IS_MAGIC_VALID(multi_frame.magic, FRAME_MAGIC) && multi_frame.magic != 0) {
339 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame magic 0x%x during clear",
340 multi_frame.magic);
341 }
342 }
343 } else if (fb->rb->element_size == sizeof(frame_t)) {
344 // Single-source frame buffer - read and free frame_t
345 frame_t frame;
346 while (ringbuffer_read(fb->rb, &frame)) {
347 if (IS_FRAME_VALID(&frame)) {
348 MARK_FRAME_FREED(&frame); // Mark as freed to detect use-after-free
349 buffer_pool_free(NULL, frame.data, frame.size);
350 } else if (!IS_MAGIC_VALID(frame.magic, FRAME_MAGIC) && frame.magic != 0) {
351 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid frame magic 0x%x during clear", frame.magic);
352 }
353 }
354 } else {
355 SET_ERRNO(ERROR_INVALID_STATE, "Unknown frame buffer type with element size %zu", fb->rb->element_size);
356 }
357
358 // Clear the ringbuffer indices
359 ringbuffer_clear(fb->rb);
360
361 // Zero out the entire buffer to prevent any dangling pointers
362 if (fb->rb->buffer) {
363 // Check for integer overflow before multiplication
364 if (fb->rb->capacity > SIZE_MAX / fb->rb->element_size) {
365 SET_ERRNO(ERROR_INVALID_PARAM, "Buffer size would overflow: capacity=%zu, element_size=%zu", fb->rb->capacity,
366 fb->rb->element_size);
367 mutex_unlock(&fb->mutex);
368 return;
369 }
370 size_t buffer_size = fb->rb->capacity * fb->rb->element_size;
371 SAFE_MEMSET(fb->rb->buffer, buffer_size, 0, buffer_size);
372 }
373
374 mutex_unlock(&fb->mutex);
375}
376
377// Multi-source frame functions for multi-user support
378
379bool framebuffer_write_multi_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size,
380 uint32_t source_client_id, uint32_t frame_sequence, uint32_t timestamp) {
381 if (!fb || !fb->rb || !frame_data || frame_size == 0) {
382 return false;
383 }
384
385 // Allocate memory for frame data using buffer pool for better performance
386 char *data_copy = (char *)buffer_pool_alloc(NULL, frame_size);
387 if (!data_copy) {
388 SET_ERRNO(ERROR_MEMORY, "Failed to allocate %zu bytes from buffer pool for multi-source frame", frame_size);
389 return false;
390 }
391
392 // Copy frame data
393 SAFE_MEMCPY(data_copy, frame_size, frame_data, frame_size);
394
395 // Create multi-source frame
396 multi_source_frame_t multi_frame = {.magic = MAGIC_FRAME_VALID,
397 .source_client_id = source_client_id,
398 .frame_sequence = frame_sequence,
399 .timestamp = timestamp,
400 .size = frame_size,
401 .data = data_copy};
402
403 // Thread-safe access to framebuffer
404 mutex_lock(&fb->mutex);
405
406 // Try to write to ring buffer
407 bool success = ringbuffer_write(fb->rb, &multi_frame);
408 if (!success) {
409 // Buffer full, return the buffer to pool
410 buffer_pool_free(NULL, data_copy, frame_size);
411 log_debug("Frame buffer full, dropping multi-source frame from client %u", source_client_id);
412 }
413
414 mutex_unlock(&fb->mutex);
415 return success;
416}
417
418bool framebuffer_read_multi_frame(framebuffer_t *fb, multi_source_frame_t *frame) {
419 if (!fb || !fb->rb || !frame) {
420 return false;
421 }
422
423 // Thread-safe access to framebuffer
424 mutex_lock(&fb->mutex);
425
426 bool result = ringbuffer_read(fb->rb, frame);
427
428 if (result) {
429 // Validate frame magic
430 if (!IS_MAGIC_VALID(frame->magic, FRAME_MAGIC)) {
431 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame magic 0x%x (expected 0x%x)", frame->magic,
432 FRAME_MAGIC);
433 frame->data = NULL;
434 frame->size = 0;
435 frame->source_client_id = 0;
436 mutex_unlock(&fb->mutex);
437 return false;
438 }
439
440 // Additional validation
441 if (frame->size == 0 || !frame->data) {
442 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame data (size=%zu, data=%p)", frame->size,
443 frame->data);
444 mutex_unlock(&fb->mutex);
445 return false;
446 }
447 }
448
449 mutex_unlock(&fb->mutex);
450 return result;
451}
452
453bool framebuffer_peek_latest_multi_frame(framebuffer_t *fb, multi_source_frame_t *frame) {
454 if (!fb || !fb->rb || !frame) {
455 return false;
456 }
457
458 // Thread-safe access to framebuffer
459 mutex_lock(&fb->mutex);
460
461 // Use ringbuffer_peek to get the frame without consuming it
462 bool result = ringbuffer_peek(fb->rb, frame);
463
464 if (result) {
465 // Validate frame magic
466 if (!IS_MAGIC_VALID(frame->magic, FRAME_MAGIC)) {
467 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame magic 0x%x (expected 0x%x) in peek",
468 frame->magic, FRAME_MAGIC);
469 frame->data = NULL;
470 frame->size = 0;
471 frame->source_client_id = 0;
472 mutex_unlock(&fb->mutex);
473 return false;
474 }
475
476 // Additional validation
477 if (frame->size == 0 || !frame->data) {
478 SET_ERRNO(ERROR_INVALID_STATE, "CORRUPTION: Invalid multi-source frame data (size=%zu, data=%p) in peek",
479 frame->size, frame->data);
480 mutex_unlock(&fb->mutex);
481 return false;
482 }
483
484 // IMPORTANT: We need to make a copy of the data since we're not consuming the frame
485 // The original data pointer will remain valid in the ring buffer
486 // Caller is responsible for freeing this copy with buffer_pool_free()
487 char *data_copy;
488 data_copy = (char *)buffer_pool_alloc(NULL, frame->size);
489 if (!data_copy) {
490 SET_ERRNO(ERROR_MEMORY, "Failed to allocate memory for frame data copy in peek");
491 mutex_unlock(&fb->mutex);
492 return false;
493 }
494 SAFE_MEMCPY(data_copy, frame->size, frame->data, frame->size);
495 frame->data = data_copy;
496 }
497
498 mutex_unlock(&fb->mutex);
499 return result;
500}
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
int buffer_size
Size of circular buffer.
Definition grep.c:84
void ringbuffer_clear(ringbuffer_t *rb)
Definition ringbuffer.c:134
void framebuffer_destroy(framebuffer_t *fb)
Definition ringbuffer.c:203
void framebuffer_clear(framebuffer_t *fb)
Definition ringbuffer.c:323
bool framebuffer_read_multi_frame(framebuffer_t *fb, multi_source_frame_t *frame)
Definition ringbuffer.c:418
framebuffer_t * framebuffer_create_multi(size_t capacity)
Definition ringbuffer.c:175
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
Definition ringbuffer.c:28
framebuffer_t * framebuffer_create(size_t capacity)
Definition ringbuffer.c:147
size_t ringbuffer_size(const ringbuffer_t *rb)
Definition ringbuffer.c:122
bool framebuffer_read_frame(framebuffer_t *fb, frame_t *frame)
Definition ringbuffer.c:276
bool ringbuffer_is_full(const ringbuffer_t *rb)
Definition ringbuffer.c:130
void ringbuffer_destroy(ringbuffer_t *rb)
Definition ringbuffer.c:54
bool ringbuffer_is_empty(const ringbuffer_t *rb)
Definition ringbuffer.c:126
bool framebuffer_write_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size)
Definition ringbuffer.c:222
bool framebuffer_write_multi_frame(framebuffer_t *fb, const char *frame_data, size_t frame_size, uint32_t source_client_id, uint32_t frame_sequence, uint32_t timestamp)
Definition ringbuffer.c:379
bool framebuffer_peek_latest_multi_frame(framebuffer_t *fb, multi_source_frame_t *frame)
Definition ringbuffer.c:453
bool ringbuffer_read(ringbuffer_t *rb, void *data)
Definition ringbuffer.c:83
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
Definition ringbuffer.c:61
bool ringbuffer_peek(ringbuffer_t *rb, void *data)
Definition ringbuffer.c:105
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21