ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
packet_queue.c
#include "packet_queue.h"
#include "buffer_pool.h"
#include "common.h"
#include "util/endian.h"
#include "asciichat_errno.h"
#include "network/crc32.h"
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
#ifndef _WIN32
#include <unistd.h>
#endif

/* ============================================================================
 * Memory Pool Implementation
 * ============================================================================
 */

node_pool_t *node_pool_create(size_t pool_size) {
  if (pool_size == 0) {
    return NULL;
  }

  node_pool_t *pool;
  pool = SAFE_MALLOC(sizeof(node_pool_t), node_pool_t *);

  // Allocate all nodes at once
  pool->nodes = SAFE_MALLOC(sizeof(packet_node_t) * pool_size, packet_node_t *);

  // Link all nodes into free list (using atomic stores for consistency with atomic type)
  for (size_t i = 0; i < pool_size - 1; i++) {
    atomic_store_explicit(&pool->nodes[i].next, &pool->nodes[i + 1], memory_order_relaxed);
  }
  atomic_store_explicit(&pool->nodes[pool_size - 1].next, (packet_node_t *)NULL, memory_order_relaxed);

  pool->free_list = &pool->nodes[0];
  pool->pool_size = pool_size;
  pool->used_count = 0;

  if (mutex_init(&pool->pool_mutex) != 0) {
    SET_ERRNO(ERROR_PLATFORM_INIT, "Failed to initialize mutex for node pool");
    SAFE_FREE(pool->nodes);
    SAFE_FREE(pool);
    return NULL;
  }

  return pool;
}

void node_pool_destroy(node_pool_t *pool) {
  if (!pool) {
    return;
  }

  mutex_destroy(&pool->pool_mutex);
  SAFE_FREE(pool->nodes);
  SAFE_FREE(pool);
}

packet_node_t *node_pool_get(node_pool_t *pool) {
  if (!pool) {
    // No pool, fall back to malloc
    packet_node_t *node;
    node = SAFE_MALLOC(sizeof(packet_node_t), packet_node_t *);
    return node;
  }

  mutex_lock(&pool->pool_mutex);

  packet_node_t *node = pool->free_list;
  if (node) {
    pool->free_list = atomic_load_explicit(&node->next, memory_order_relaxed);
    pool->used_count++;
    atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed); // Clear next pointer
  }

  mutex_unlock(&pool->pool_mutex);

  if (!node) {
    // Pool exhausted, fall back to malloc
    node = SAFE_MALLOC(sizeof(packet_node_t), packet_node_t *);
    log_debug("Memory pool exhausted, falling back to SAFE_MALLOC (used: %zu/%zu)", pool->used_count, pool->pool_size);
  }

  return node;
}

void node_pool_put(node_pool_t *pool, packet_node_t *node) {
  if (!node) {
    return;
  }

  if (!pool) {
    // No pool, just free
    SAFE_FREE(node);
    return;
  }

  // Check if this node is from our pool
  bool is_pool_node = (node >= pool->nodes && node < pool->nodes + pool->pool_size);

  if (is_pool_node) {
    mutex_lock(&pool->pool_mutex);

    // Return to free list
    atomic_store_explicit(&node->next, pool->free_list, memory_order_relaxed);
    pool->free_list = node;
    pool->used_count--;

    mutex_unlock(&pool->pool_mutex);
  } else {
    // This was malloc'd, so free it
    SAFE_FREE(node);
  }
}
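
/*
 * Usage sketch (illustrative only, added for documentation; not part of the
 * original source): the node pool above is a fixed-size free list guarded by
 * a mutex.  node_pool_get() pops a pre-allocated node in O(1) and falls back
 * to SAFE_MALLOC when the pool is exhausted; node_pool_put() pushes pool-owned
 * nodes back onto the free list and simply frees nodes that came from malloc.
 *
 *   node_pool_t *pool = node_pool_create(128);   // pre-allocate 128 nodes
 *   packet_node_t *node = node_pool_get(pool);   // O(1) pop (or malloc fallback)
 *   // ... fill node->packet ...
 *   node_pool_put(pool, node);                   // O(1) push back (or free)
 *   node_pool_destroy(pool);
 */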

/* ============================================================================
 * Packet Queue Implementation
 * ============================================================================
 */

packet_queue_t *packet_queue_create(size_t max_size) {
  return packet_queue_create_with_pool(max_size, 0); // No pool by default
}

packet_queue_t *packet_queue_create_with_pool(size_t max_size, size_t pool_size) {
  return packet_queue_create_with_pools(max_size, pool_size, false);
}

packet_queue_t *packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool) {
  packet_queue_t *queue;
  queue = SAFE_MALLOC(sizeof(packet_queue_t), packet_queue_t *);

  // Initialize atomic fields
  // For atomic pointer types, use atomic_store with relaxed ordering for initialization
  atomic_store_explicit(&queue->head, (packet_node_t *)NULL, memory_order_relaxed);
  atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_relaxed);
  atomic_init(&queue->count, (size_t)0);
  queue->max_size = max_size;
  atomic_init(&queue->bytes_queued, (size_t)0);

  // Create memory pools if requested
  queue->node_pool = node_pool_size > 0 ? node_pool_create(node_pool_size) : NULL;
  queue->buffer_pool = use_buffer_pool ? buffer_pool_create(0, 0) : NULL;

  // Initialize atomic statistics
  atomic_init(&queue->packets_enqueued, (uint64_t)0);
  atomic_init(&queue->packets_dequeued, (uint64_t)0);
  atomic_init(&queue->packets_dropped, (uint64_t)0);
  atomic_init(&queue->shutdown, false);

  return queue;
}

void packet_queue_destroy(packet_queue_t *queue) {
  if (!queue)
    return;

  // Signal shutdown first
  packet_queue_shutdown(queue);

  // Clear any remaining packets
  packet_queue_clear(queue);

  // Destroy memory pools if present
  if (queue->node_pool) {
    node_pool_destroy(queue->node_pool);
  }
  if (queue->buffer_pool) {
    buffer_pool_log_stats(queue->buffer_pool, "packet_queue");
    buffer_pool_destroy(queue->buffer_pool);
  }

  // No mutex/cond to destroy (lock-free design)

  SAFE_FREE(queue);
}

int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len,
                         uint32_t client_id, bool copy_data) {
  if (!queue)
    return -1;

  // Check if shutdown (atomic read with acquire semantics)
  if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
    return -1;
  }

  // Check if queue is full and drop oldest packet if needed (lock-free)
  size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
  if (queue->max_size > 0 && current_count >= queue->max_size) {
    // Drop oldest packet (head) using atomic compare-and-swap
    packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
    if (head) {
      packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
      // Atomically update head pointer
      if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
        // Successfully claimed head node
        if (next == NULL) {
          // Queue became empty, also update tail
          atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
        }

        // Update counters atomically
        size_t bytes = head->packet.data_len;
        atomic_fetch_sub(&queue->bytes_queued, bytes);
        atomic_fetch_sub(&queue->count, (size_t)1);
        atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);

        // Free dropped packet data
        if (head->packet.owns_data && head->packet.data) {
          buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
        }
        node_pool_put(queue->node_pool, head);

        log_debug_every(LOG_RATE_FAST, "Dropped packet from queue (full): type=%d, client=%u", type, client_id);
      }
      // If CAS failed, another thread already dequeued - continue to enqueue
    }
  }

  // Create new node (use pool if available)
  packet_node_t *node = node_pool_get(queue->node_pool);
  if (!node) {
    SET_ERRNO(ERROR_MEMORY, "Failed to allocate packet node");
    return -1;
  }

  // Build packet header
  node->packet.header.magic = HOST_TO_NET_U32(PACKET_MAGIC);
  node->packet.header.type = HOST_TO_NET_U16((uint16_t)type);
  node->packet.header.length = HOST_TO_NET_U32((uint32_t)data_len);
  node->packet.header.client_id = HOST_TO_NET_U32(client_id);
  // Calculate CRC32 for the data (0 for empty packets)
  node->packet.header.crc32 = HOST_TO_NET_U32(data_len > 0 ? asciichat_crc32(data, data_len) : 0);

  // Handle data
  if (data_len > 0 && data) {
    if (copy_data) {
      // Try to allocate from buffer pool (local or global)
      if (queue->buffer_pool) {
        node->packet.data = buffer_pool_alloc(queue->buffer_pool, data_len);
        node->packet.buffer_pool = queue->buffer_pool;
      } else {
        // Use global pool if no local pool
        node->packet.data = buffer_pool_alloc(NULL, data_len);
        node->packet.buffer_pool = NULL;
      }
      SAFE_MEMCPY(node->packet.data, data_len, data, data_len);
      node->packet.owns_data = true;
    } else {
      // Use the data pointer directly (caller must ensure it stays valid)
      node->packet.data = (void *)data;
      node->packet.owns_data = false;
      node->packet.buffer_pool = NULL;
    }
  } else {
    node->packet.data = NULL;
    node->packet.owns_data = false;
    node->packet.buffer_pool = NULL;
  }

  node->packet.data_len = data_len;
  atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);

  // Add to queue using lock-free CAS-based enqueue (Michael-Scott algorithm)
  while (true) {
    packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);

    if (tail == NULL) {
      // Empty queue - atomically set both head and tail
      packet_node_t *expected = NULL;
      if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
                                                memory_order_acquire)) {
        // Successfully set head (queue was empty)
        atomic_store_explicit(&queue->tail, node, memory_order_release);
        break; // Enqueue successful
      }
      // CAS failed - another thread initialized queue, retry
      continue;
    }

    // Queue is non-empty - try to append to tail
    packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
    packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);

    // Verify tail hasn't changed (ABA problem mitigation)
    if (tail != current_tail) {
      // Tail was updated by another thread, retry
      continue;
    }

    if (next == NULL) {
      // Tail is actually the last node - try to link new node
      packet_node_t *expected_null = NULL;
      if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
                                                memory_order_acquire)) {
        // Successfully linked node - try to swing tail forward (best-effort, ignore failure)
        atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
        break; // Enqueue successful
      }
      // CAS failed - another thread appended to tail, retry
    } else {
      // Tail is lagging behind - help advance it
      atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
      // Retry with new tail
    }
  }

  // Update counters atomically
  atomic_fetch_add(&queue->count, (size_t)1);
  atomic_fetch_add(&queue->bytes_queued, data_len);
  atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);

  return 0;
}
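
/*
 * Illustrative note (added for documentation, not part of the original source;
 * variable names are placeholders): the copy_data flag controls payload
 * ownership.  With copy_data=true the queue copies the payload into a pooled
 * buffer and frees it when the packet is freed, so the caller's buffer can be
 * reused immediately.  With copy_data=false the queue stores the caller's
 * pointer (owns_data=false) and the caller must keep it valid until the packet
 * has been dequeued and processed.
 *
 *   // Copying enqueue: frame_buf may be overwritten right after the call.
 *   packet_queue_enqueue(q, PACKET_TYPE_ASCII_FRAME, frame_buf, frame_len, client_id, true);
 *
 *   // Zero-copy enqueue: static_frame must outlive the queued packet.
 *   packet_queue_enqueue(q, PACKET_TYPE_ASCII_FRAME, static_frame, frame_len, client_id, false);
 */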

int packet_queue_enqueue_packet(packet_queue_t *queue, const queued_packet_t *packet) {
  if (!queue || !packet) {
    SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: queue=%p, packet=%p", queue, packet);
    return -1;
  }

  // Validate packet before enqueueing
  if (!packet_queue_validate_packet(packet)) {
    SET_ERRNO(ERROR_INVALID_PARAM, "Refusing to enqueue invalid packet");
    return -1;
  }

  // Check if shutdown (atomic read with acquire semantics)
  if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
    return -1;
  }

  // Check if queue is full and drop oldest packet if needed (lock-free)
  size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
  if (queue->max_size > 0 && current_count >= queue->max_size) {
    // Drop oldest packet (head) using atomic compare-and-swap
    packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
    if (head) {
      packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
      // Atomically update head pointer
      if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
        // Successfully claimed head node
        if (next == NULL) {
          // Queue became empty, also update tail
          atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
        }

        // Update counters atomically
        size_t bytes = head->packet.data_len;
        atomic_fetch_sub(&queue->bytes_queued, bytes);
        atomic_fetch_sub(&queue->count, (size_t)1);
        atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);

        // Free dropped packet data
        if (head->packet.owns_data && head->packet.data) {
          buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
        }
        node_pool_put(queue->node_pool, head);
      }
      // If CAS failed, another thread already dequeued - continue to enqueue
    }
  }

  // Create new node (use pool if available)
  packet_node_t *node = node_pool_get(queue->node_pool);
  if (!node) {
    SET_ERRNO(ERROR_MEMORY, "Failed to allocate packet node");
    return -1;
  }

  // Copy the packet struct (header and metadata fields)
  SAFE_MEMCPY(&node->packet, sizeof(queued_packet_t), packet, sizeof(queued_packet_t));

  // Deep copy the data if needed
  if (packet->data && packet->data_len > 0 && packet->owns_data) {
    // If the packet owns its data, we need to make a copy
    // Try to allocate from buffer pool (local or global)
    void *data_copy;
    if (queue->buffer_pool) {
      data_copy = buffer_pool_alloc(queue->buffer_pool, packet->data_len);
      node->packet.buffer_pool = queue->buffer_pool;
    } else {
      // Use global pool if no local pool
      data_copy = buffer_pool_alloc(NULL, packet->data_len);
      node->packet.buffer_pool = NULL;
    }
    SAFE_MEMCPY(data_copy, packet->data_len, packet->data, packet->data_len);
    node->packet.data = data_copy;
    node->packet.owns_data = true;
  } else {
    // Either no data or packet doesn't own it (shared reference is OK)
    node->packet.data = packet->data;
    node->packet.owns_data = packet->owns_data;
    node->packet.buffer_pool = packet->buffer_pool; // Preserve original pool reference
  }

  atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);

  // Add to queue using lock-free CAS-based enqueue (Michael-Scott algorithm)
  while (true) {
    packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);

    if (tail == NULL) {
      // Empty queue - atomically set both head and tail
      packet_node_t *expected = NULL;
      if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
                                                memory_order_acquire)) {
        // Successfully set head (queue was empty)
        atomic_store_explicit(&queue->tail, node, memory_order_release);
        break; // Enqueue successful
      }
      // CAS failed - another thread initialized queue, retry
      continue;
    }

    // Queue is non-empty - try to append to tail
    packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
    packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);

    // Verify tail hasn't changed (ABA problem mitigation)
    if (tail != current_tail) {
      // Tail was updated by another thread, retry
      continue;
    }

    if (next == NULL) {
      // Tail is actually the last node - try to link new node
      packet_node_t *expected_null = NULL;
      if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
                                                memory_order_acquire)) {
        // Successfully linked node - try to swing tail forward (best-effort, ignore failure)
        atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
        break; // Enqueue successful
      }
      // CAS failed - another thread appended to tail, retry
    } else {
      // Tail is lagging behind - help advance it
      atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
      // Retry with new tail
    }
  }

  // Update counters atomically
  atomic_fetch_add(&queue->count, (size_t)1);
  atomic_fetch_add(&queue->bytes_queued, packet->data_len);
  atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);

  return 0;
}

queued_packet_t *packet_queue_dequeue(packet_queue_t *queue) {
  // Non-blocking dequeue (same as try_dequeue for lock-free design)
  return packet_queue_try_dequeue(queue);
}

queued_packet_t *packet_queue_try_dequeue(packet_queue_t *queue) {
  if (!queue)
    return NULL;

  // Check if shutdown (atomic read with acquire semantics)
  if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
    return NULL;
  }

  // Check if queue is empty (atomic read with acquire semantics)
  size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
  if (current_count == 0) {
    return NULL;
  }

  // Remove from head atomically (lock-free dequeue)
  packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
  if (!head) {
    return NULL;
  }

  // Atomically update head pointer
  packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
  if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
    // Successfully claimed head node
    if (next == NULL) {
      // Queue became empty, also update tail atomically
      atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
    }

    // Update counters atomically
    size_t bytes = head->packet.data_len;
    atomic_fetch_sub(&queue->bytes_queued, bytes);
    atomic_fetch_sub(&queue->count, (size_t)1);
    atomic_fetch_add(&queue->packets_dequeued, (uint64_t)1);

    // Verify packet magic number for corruption detection
    uint32_t magic = NET_TO_HOST_U32(head->packet.header.magic);
    if (magic != PACKET_MAGIC) {
      SET_ERRNO(ERROR_BUFFER, "CORRUPTION: Invalid magic in try_dequeued packet: 0x%x (expected 0x%x), type=%u", magic,
                PACKET_MAGIC, NET_TO_HOST_U16(head->packet.header.type));
      // Still return node to pool but don't return corrupted packet
      node_pool_put(queue->node_pool, head);
      return NULL;
    }

    // Validate CRC if there's data
    if (head->packet.data_len > 0 && head->packet.data) {
      uint32_t expected_crc = NET_TO_HOST_U32(head->packet.header.crc32);
      uint32_t actual_crc = asciichat_crc32(head->packet.data, head->packet.data_len);
      if (actual_crc != expected_crc) {
        SET_ERRNO(ERROR_BUFFER,
                  "CORRUPTION: CRC mismatch in try_dequeued packet: got 0x%x, expected 0x%x, type=%u, len=%zu",
                  actual_crc, expected_crc, NET_TO_HOST_U16(head->packet.header.type), head->packet.data_len);
        // Free data if packet owns it
        if (head->packet.owns_data && head->packet.data) {
          // Return the data to the pool that allocated it; buffer_pool_free(NULL, ...) handles
          // global-pool and malloc'd allocations
          if (head->packet.buffer_pool) {
            buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
          } else {
            // This was allocated from global pool or malloc, use buffer_pool_free which handles both
            buffer_pool_free(NULL, head->packet.data, head->packet.data_len);
          }
          // CRITICAL: Clear pointer to prevent double-free when packet is copied later
          head->packet.data = NULL;
          head->packet.owns_data = false;
        }
        node_pool_put(queue->node_pool, head);
        return NULL;
      }
    }

    // Extract packet and return node to pool
    queued_packet_t *packet;
    packet = SAFE_MALLOC(sizeof(queued_packet_t), queued_packet_t *);
    SAFE_MEMCPY(packet, sizeof(queued_packet_t), &head->packet, sizeof(queued_packet_t));
    node_pool_put(queue->node_pool, head);
    return packet;
  }

  // CAS failed - another thread dequeued, retry if needed (or return NULL for non-blocking)
  return NULL;
}

void packet_queue_free_packet(queued_packet_t *packet) {
  if (!packet)
    return;

  // Check if packet was already freed (detect double-free)
  if (packet->header.magic != HOST_TO_NET_U32(PACKET_MAGIC)) {
    log_warn("Attempted double-free of packet (magic=0x%x, expected=0x%x)", NET_TO_HOST_U32(packet->header.magic),
             PACKET_MAGIC);
    return;
  }

  if (packet->owns_data && packet->data) {
    // Return to appropriate pool or free
    if (packet->buffer_pool) {
      buffer_pool_free(packet->buffer_pool, packet->data, packet->data_len);
    } else {
      // This was allocated from global pool or malloc, use buffer_pool_free which handles both
      buffer_pool_free(NULL, packet->data, packet->data_len);
    }
  }

  // Mark as freed to detect future double-free attempts
  // Use network byte order for consistency on big-endian systems
  packet->header.magic = HOST_TO_NET_U32(0xBEEFDEAD); // Different magic in network byte order
  SAFE_FREE(packet);
}

size_t packet_queue_size(packet_queue_t *queue) {
  if (!queue)
    return 0;

  // Lock-free atomic read
  return atomic_load_explicit(&queue->count, memory_order_acquire);
}

bool packet_queue_is_empty(packet_queue_t *queue) {
  return packet_queue_size(queue) == 0;
}

bool packet_queue_is_full(packet_queue_t *queue) {
  if (!queue || queue->max_size == 0)
    return false;

  // Lock-free atomic read
  size_t count = atomic_load_explicit(&queue->count, memory_order_acquire);
  return (count >= queue->max_size);
}

void packet_queue_shutdown(packet_queue_t *queue) {
  if (!queue) {
    SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: queue=%p", queue);
    return;
  }

  // Lock-free atomic store (release semantics ensures visibility to other threads)
  atomic_store_explicit(&queue->shutdown, true, memory_order_release);
}

void packet_queue_clear(packet_queue_t *queue) {
  if (!queue)
    return;

  // Lock-free clear: drain queue by repeatedly dequeuing until empty
  queued_packet_t *packet;
  while ((packet = packet_queue_try_dequeue(queue)) != NULL) {
    packet_queue_free_packet(packet);
  }
}

void packet_queue_get_stats(packet_queue_t *queue, uint64_t *enqueued, uint64_t *dequeued, uint64_t *dropped) {
  if (!queue)
    return;

  // Lock-free atomic reads (acquire semantics for consistency)
  if (enqueued)
    *enqueued = atomic_load_explicit(&queue->packets_enqueued, memory_order_acquire);
  if (dequeued)
    *dequeued = atomic_load_explicit(&queue->packets_dequeued, memory_order_acquire);
  if (dropped)
    *dropped = atomic_load_explicit(&queue->packets_dropped, memory_order_acquire);
}

bool packet_queue_validate_packet(const queued_packet_t *packet) {
  if (!packet) {
    return false;
  }

  // Check magic number
  uint32_t magic = NET_TO_HOST_U32(packet->header.magic);
  if (magic != PACKET_MAGIC) {
    SET_ERRNO(ERROR_BUFFER, "Invalid packet magic: 0x%x (expected 0x%x)", magic, PACKET_MAGIC);
    return false;
  }

  // Check packet type is valid
  uint16_t type = NET_TO_HOST_U16(packet->header.type);
  if (type < PACKET_TYPE_ASCII_FRAME || type > PACKET_TYPE_AUDIO_BATCH) {
    SET_ERRNO(ERROR_BUFFER, "Invalid packet type: %u", type);
    return false;
  }

  // Check length matches data_len
  uint32_t length = NET_TO_HOST_U32(packet->header.length);
  if (length != packet->data_len) {
    SET_ERRNO(ERROR_BUFFER, "Packet length mismatch: header says %u, data_len is %zu", length, packet->data_len);
    return false;
  }

  // Check CRC if there's data
  if (packet->data_len > 0 && packet->data) {
    uint32_t expected_crc = NET_TO_HOST_U32(packet->header.crc32);
    uint32_t actual_crc = asciichat_crc32(packet->data, packet->data_len);
    if (actual_crc != expected_crc) {
      SET_ERRNO(ERROR_BUFFER, "Packet CRC mismatch: got 0x%x, expected 0x%x", actual_crc, expected_crc);
      return false;
    }
  }

  return true;
}
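
For reference, a minimal end-to-end usage sketch of the API implemented above (illustrative only; it assumes the declarations from packet_queue.h, and the helper function name is a placeholder; PACKET_TYPE_ASCII_FRAME is used as an example payload type):

#include "packet_queue.h"

// Illustrative producer/consumer sketch, not part of packet_queue.c.
static void example_packet_queue_usage(void) {
  // Bounded queue of 256 packets, 128 pre-allocated nodes, pooled data buffers.
  packet_queue_t *queue = packet_queue_create_with_pools(256, 128, true);
  if (!queue) {
    return;
  }

  // Producer side: copy_data=true lets the caller reuse `frame` immediately.
  const char frame[] = "ASCII frame payload";
  packet_queue_enqueue(queue, PACKET_TYPE_ASCII_FRAME, frame, sizeof(frame), 1 /* client_id */, true);

  // Consumer side: non-blocking dequeue returns NULL when the queue is empty.
  queued_packet_t *packet = packet_queue_try_dequeue(queue);
  if (packet) {
    // ... write packet->header and packet->data to the socket ...
    packet_queue_free_packet(packet); // frees the payload and the packet wrapper
  }

  uint64_t enqueued = 0, dequeued = 0, dropped = 0;
  packet_queue_get_stats(queue, &enqueued, &dequeued, &dropped);

  packet_queue_shutdown(queue); // subsequent enqueue/dequeue calls bail out immediately
  packet_queue_destroy(queue);  // drains remaining packets and releases the pools
}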