ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
packet_queue.c File Reference

📬 Lock-free packet queue with per-client isolation and memory pooling More...

Go to the source code of this file.

Functions

node_pool_t * node_pool_create (size_t pool_size)
 
void node_pool_destroy (node_pool_t *pool)
 
packet_node_t * node_pool_get (node_pool_t *pool)
 
void node_pool_put (node_pool_t *pool, packet_node_t *node)
 
packet_queue_t * packet_queue_create (size_t max_size)
 
packet_queue_t * packet_queue_create_with_pool (size_t max_size, size_t pool_size)
 
packet_queue_t * packet_queue_create_with_pools (size_t max_size, size_t node_pool_size, bool use_buffer_pool)
 
void packet_queue_destroy (packet_queue_t *queue)
 
int packet_queue_enqueue (packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
 
int packet_queue_enqueue_packet (packet_queue_t *queue, const queued_packet_t *packet)
 
queued_packet_t * packet_queue_dequeue (packet_queue_t *queue)
 
queued_packet_t * packet_queue_try_dequeue (packet_queue_t *queue)
 
void packet_queue_free_packet (queued_packet_t *packet)
 
size_t packet_queue_size (packet_queue_t *queue)
 
bool packet_queue_is_empty (packet_queue_t *queue)
 
bool packet_queue_is_full (packet_queue_t *queue)
 
void packet_queue_stop (packet_queue_t *queue)
 
void packet_queue_clear (packet_queue_t *queue)
 
void packet_queue_get_stats (packet_queue_t *queue, uint64_t *enqueued, uint64_t *dequeued, uint64_t *dropped)
 
bool packet_queue_validate_packet (const queued_packet_t *packet)
 

Detailed Description

📬 Lock-free packet queue with per-client isolation and memory pooling

Definition in file packet_queue.c.

Function Documentation

◆ node_pool_create()

node_pool_t * node_pool_create ( size_t  pool_size)

Definition at line 25 of file packet_queue.c.

25 {
26 if (pool_size == 0) {
27 return NULL;
28 }
29
30 node_pool_t *pool;
31 pool = SAFE_MALLOC(sizeof(node_pool_t), node_pool_t *);
32
33 // Allocate all nodes at once
34 pool->nodes = SAFE_MALLOC(sizeof(packet_node_t) * pool_size, packet_node_t *);
35
36 // Link all nodes into free list (using atomic stores for consistency with atomic type)
37 for (size_t i = 0; i < pool_size - 1; i++) {
38 atomic_store_explicit(&pool->nodes[i].next, &pool->nodes[i + 1], memory_order_relaxed);
39 }
40 atomic_store_explicit(&pool->nodes[pool_size - 1].next, (packet_node_t *)NULL, memory_order_relaxed);
41
42 pool->free_list = &pool->nodes[0];
43 pool->pool_size = pool_size;
44 pool->used_count = 0;
45
46 if (mutex_init(&pool->pool_mutex) != 0) {
47 SET_ERRNO(ERROR_PLATFORM_INIT, "Failed to initialize mutex for node pool");
48 node_pool_destroy(pool);
49 return NULL;
50 }
51
52 return pool;
53}
void node_pool_destroy(node_pool_t *pool)
int mutex_init(mutex_t *mutex)
Definition threading.c:16

References mutex_init(), and node_pool_destroy().

Referenced by packet_queue_create_with_pools().

◆ node_pool_destroy()

void node_pool_destroy ( node_pool_t *  pool)

Definition at line 55 of file packet_queue.c.

55 {
56 if (!pool) {
57 return;
58 }
59
60 mutex_destroy(&pool->pool_mutex);
61 SAFE_FREE(pool->nodes);
62 SAFE_FREE(pool);
63}
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21

References mutex_destroy().

Referenced by node_pool_create(), and packet_queue_destroy().

◆ node_pool_get()

packet_node_t * node_pool_get ( node_pool_t *  pool)

Definition at line 65 of file packet_queue.c.

65 {
66 if (!pool) {
67 // No pool, fallback to malloc
68 packet_node_t *node;
69 node = SAFE_MALLOC(sizeof(packet_node_t), packet_node_t *);
70 return node;
71 }
72
73 mutex_lock(&pool->pool_mutex);
74
75 packet_node_t *node = pool->free_list;
76 if (node) {
77 pool->free_list = atomic_load_explicit(&node->next, memory_order_relaxed);
78 pool->used_count++;
79 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed); // Clear next pointer
80 }
81
82 mutex_unlock(&pool->pool_mutex);
83
84 if (!node) {
85 // Pool exhausted, fallback to malloc
86 node = SAFE_MALLOC(sizeof(packet_node_t), packet_node_t *);
87 // Remove extra text from format string that had no matching argument
88 log_debug("Memory pool exhausted, falling back to SAFE_MALLOC (used: %zu/%zu)", pool->used_count, pool->pool_size);
89 }
90
91 return node;
92}

Referenced by packet_queue_enqueue(), and packet_queue_enqueue_packet().

◆ node_pool_put()

void node_pool_put ( node_pool_t *  pool,
packet_node_t *  node 
)

Definition at line 94 of file packet_queue.c.

94 {
95 if (!node) {
96 return;
97 }
98
99 if (!pool) {
100 // No pool, just free
101 SAFE_FREE(node);
102 return;
103 }
104
105 // Check if this node is from our pool
106 bool is_pool_node = (node >= pool->nodes && node < pool->nodes + pool->pool_size);
107
108 if (is_pool_node) {
109 mutex_lock(&pool->pool_mutex);
110
111 // Return to free list
112 atomic_store_explicit(&node->next, pool->free_list, memory_order_relaxed);
113 pool->free_list = node;
114 pool->used_count--;
115
116 mutex_unlock(&pool->pool_mutex);
117 } else {
118 // This was malloc'd, so free it
119 SAFE_FREE(node);
120 }
121}

Referenced by packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_try_dequeue().

◆ packet_queue_clear()

void packet_queue_clear ( packet_queue_t *  queue)

Definition at line 606 of file packet_queue.c.

606 {
607 if (!queue)
608 return;
609
610 // Lock-free clear: drain queue by repeatedly dequeuing until empty
611 queued_packet_t *packet;
612 while ((packet = packet_queue_try_dequeue(queue)) != NULL) {
613 packet_queue_free_packet(packet);
614 }
615}
void packet_queue_free_packet(queued_packet_t *packet)
queued_packet_t * packet_queue_try_dequeue(packet_queue_t *queue)

References packet_queue_free_packet(), and packet_queue_try_dequeue().

Referenced by packet_queue_destroy().

◆ packet_queue_create()

packet_queue_t * packet_queue_create ( size_t  max_size)

Definition at line 128 of file packet_queue.c.

128 {
129 return packet_queue_create_with_pool(max_size, 0); // No pool by default
130}
packet_queue_t * packet_queue_create_with_pool(size_t max_size, size_t pool_size)

References packet_queue_create_with_pool().

◆ packet_queue_create_with_pool()

packet_queue_t * packet_queue_create_with_pool ( size_t  max_size,
size_t  pool_size 
)

Definition at line 132 of file packet_queue.c.

132 {
133 return packet_queue_create_with_pools(max_size, pool_size, false);
134}
packet_queue_t * packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool)

References packet_queue_create_with_pools().

Referenced by packet_queue_create().

◆ packet_queue_create_with_pools()

packet_queue_t * packet_queue_create_with_pools ( size_t  max_size,
size_t  node_pool_size,
bool  use_buffer_pool 
)

Definition at line 136 of file packet_queue.c.

136 {
137 packet_queue_t *queue;
138 queue = SAFE_MALLOC(sizeof(packet_queue_t), packet_queue_t *);
139
140 // Initialize atomic fields
141 // For atomic pointer types, use atomic_store with relaxed ordering for initialization
142 atomic_store_explicit(&queue->head, (packet_node_t *)NULL, memory_order_relaxed);
143 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_relaxed);
144 atomic_init(&queue->count, (size_t)0);
145 queue->max_size = max_size;
146 atomic_init(&queue->bytes_queued, (size_t)0);
147
148 // Create memory pools if requested
149 queue->node_pool = node_pool_size > 0 ? node_pool_create(node_pool_size) : NULL;
150 queue->buffer_pool = use_buffer_pool ? buffer_pool_create(0, 0) : NULL;
151
152 // Initialize atomic statistics
153 atomic_init(&queue->packets_enqueued, (uint64_t)0);
154 atomic_init(&queue->packets_dequeued, (uint64_t)0);
155 atomic_init(&queue->packets_dropped, (uint64_t)0);
156 atomic_init(&queue->shutdown, false);
157
158 return queue;
159}
buffer_pool_t * buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ns)
Definition buffer_pool.c:48
node_pool_t * node_pool_create(size_t pool_size)

References buffer_pool_create(), and node_pool_create().

Referenced by add_client(), add_webrtc_client(), and packet_queue_create_with_pool().

◆ packet_queue_dequeue()

queued_packet_t * packet_queue_dequeue ( packet_queue_t *  queue)

Definition at line 459 of file packet_queue.c.

459 {
460 // Non-blocking dequeue (same as try_dequeue for lock-free design)
461 return packet_queue_try_dequeue(queue);
462}

References packet_queue_try_dequeue().

◆ packet_queue_destroy()

void packet_queue_destroy ( packet_queue_t *  queue)

Definition at line 161 of file packet_queue.c.

161 {
162 if (!queue)
163 return;
164
165 // Signal shutdown first
166 packet_queue_stop(queue);
167
168 // Clear any remaining packets
169 packet_queue_clear(queue);
170
171 // Destroy memory pools if present
172 if (queue->node_pool) {
173 node_pool_destroy(queue->node_pool);
174 }
175 if (queue->buffer_pool) {
176 buffer_pool_log_stats(queue->buffer_pool, "packet_queue");
177 buffer_pool_destroy(queue->buffer_pool);
178 }
179
180 // No mutex/cond to destroy (lock-free design)
181
182 SAFE_FREE(queue);
183}
void buffer_pool_destroy(buffer_pool_t *pool)
Definition buffer_pool.c:83
void buffer_pool_log_stats(buffer_pool_t *pool, const char *name)
void packet_queue_clear(packet_queue_t *queue)
void packet_queue_stop(packet_queue_t *queue)

References buffer_pool_destroy(), buffer_pool_log_stats(), node_pool_destroy(), packet_queue_clear(), and packet_queue_stop().

Referenced by add_client(), and cleanup_client_packet_queues().

◆ packet_queue_enqueue()

int packet_queue_enqueue ( packet_queue_t *  queue,
packet_type_t  type,
const void *  data,
size_t  data_len,
uint32_t  client_id,
bool  copy_data 
)

Definition at line 185 of file packet_queue.c.

186 {
187 if (!queue)
188 return -1;
189
190 // Check if shutdown (atomic read with acquire semantics)
191 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
192 return -1;
193 }
194
195 // Check if queue is full and drop oldest packet if needed (lock-free)
196 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
197 if (queue->max_size > 0 && current_count >= queue->max_size) {
198 // Drop oldest packet (head) using atomic compare-and-swap
199 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
200 if (head) {
201 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
202 // Atomically update head pointer
203 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
204 // Successfully claimed head node
205 if (next == NULL) {
206 // Queue became empty, also update tail
207 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
208 }
209
210 // Update counters atomically
211 size_t bytes = head->packet.data_len;
212 atomic_fetch_sub(&queue->bytes_queued, bytes);
213 atomic_fetch_sub(&queue->count, (size_t)1);
214 atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);
215
216 // Free dropped packet data
217 if (head->packet.owns_data && head->packet.data) {
218 buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
219 }
220 node_pool_put(queue->node_pool, head);
221
222 log_dev_every(4500 * US_PER_MS_INT, "Dropped packet from queue (full): type=%d, client=%u", type, client_id);
223 }
224 // If CAS failed, another thread already dequeued - continue to enqueue
225 }
226 }
227
228 // Create new node (use pool if available)
229 packet_node_t *node = node_pool_get(queue->node_pool);
230 if (!node) {
231 SET_ERRNO(ERROR_MEMORY, "Failed to allocate packet node");
232 return -1;
233 }
234
235 // Build packet header
236 node->packet.header.magic = HOST_TO_NET_U64(PACKET_MAGIC);
237 node->packet.header.type = HOST_TO_NET_U16((uint16_t)type);
238 node->packet.header.length = HOST_TO_NET_U32((uint32_t)data_len);
239 node->packet.header.client_id = HOST_TO_NET_U32(client_id);
240 // Calculate CRC32 for the data (0 for empty packets)
241 node->packet.header.crc32 = HOST_TO_NET_U32(data_len > 0 ? asciichat_crc32(data, data_len) : 0);
242
243 // Handle data
244 if (data_len > 0 && data) {
245 if (copy_data) {
246 // Try to allocate from buffer pool (local or global)
247 if (queue->buffer_pool) {
248 node->packet.data = buffer_pool_alloc(queue->buffer_pool, data_len);
249 node->packet.buffer_pool = queue->buffer_pool;
250 } else {
251 // Use global pool if no local pool
252 node->packet.data = buffer_pool_alloc(NULL, data_len);
253 node->packet.buffer_pool = buffer_pool_get_global();
254 }
255 SAFE_MEMCPY(node->packet.data, data_len, data, data_len);
256 node->packet.owns_data = true;
257 } else {
258 // Use the data pointer directly (caller must ensure it stays valid)
259 node->packet.data = (void *)data;
260 node->packet.owns_data = false;
261 node->packet.buffer_pool = NULL;
262 }
263 } else {
264 node->packet.data = NULL;
265 node->packet.owns_data = false;
266 node->packet.buffer_pool = NULL;
267 }
268
269 node->packet.data_len = data_len;
270 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
271
272 // Add to queue using lock-free CAS-based enqueue (Michael-Scott algorithm)
273 while (true) {
274 packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
275
276 if (tail == NULL) {
277 // Empty queue - atomically set both head and tail
278 packet_node_t *expected = NULL;
279 if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
280 memory_order_acquire)) {
281 // Successfully set head (queue was empty)
282 atomic_store_explicit(&queue->tail, node, memory_order_release);
283 break; // Enqueue successful
284 }
285 // CAS failed - another thread initialized queue, retry
286 continue;
287 }
288
289 // Queue is non-empty - try to append to tail
290 packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
291 packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
292
293 // Verify tail hasn't changed (ABA problem mitigation)
294 if (tail != current_tail) {
295 // Tail was updated by another thread, retry
296 continue;
297 }
298
299 if (next == NULL) {
300 // Tail is actually the last node - try to link new node
301 packet_node_t *expected_null = NULL;
302 if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
303 memory_order_acquire)) {
304 // Successfully linked node - try to swing tail forward (best-effort, ignore failure)
305 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
306 break; // Enqueue successful
307 }
308 // CAS failed - another thread appended to tail, retry
309 } else {
310 // Tail is lagging behind - help advance it
311 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
312 // Retry with new tail
313 }
314 }
315
316 // Update counters atomically
317 atomic_fetch_add(&queue->count, (size_t)1);
318 atomic_fetch_add(&queue->bytes_queued, data_len);
319 atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);
320
321 return 0;
322}
buffer_pool_t * buffer_pool_get_global(void)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
packet_node_t * node_pool_get(node_pool_t *pool)
void node_pool_put(node_pool_t *pool, packet_node_t *node)

References buffer_pool_alloc(), buffer_pool_free(), buffer_pool_get_global(), node_pool_get(), and node_pool_put().

Referenced by client_audio_render_thread(), client_receive_thread(), and queue_audio_for_client().

◆ packet_queue_enqueue_packet()

int packet_queue_enqueue_packet ( packet_queue_t *  queue,
const queued_packet_t *  packet 
)

Definition at line 324 of file packet_queue.c.

324 {
325 if (!queue || !packet) {
326 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: queue=%p, packet=%p", queue, packet);
327 return -1;
328 }
329
330 // Validate packet before enqueueing
331 if (!packet_queue_validate_packet(packet)) {
332 SET_ERRNO(ERROR_INVALID_PARAM, "Refusing to enqueue invalid packet");
333 return -1;
334 }
335
336 // Check if shutdown (atomic read with acquire semantics)
337 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
338 return -1;
339 }
340
341 // Check if queue is full and drop oldest packet if needed (lock-free)
342 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
343 if (queue->max_size > 0 && current_count >= queue->max_size) {
344 // Drop oldest packet (head) using atomic compare-and-swap
345 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
346 if (head) {
347 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
348 // Atomically update head pointer
349 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
350 // Successfully claimed head node
351 if (next == NULL) {
352 // Queue became empty, also update tail
353 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
354 }
355
356 // Update counters atomically
357 size_t bytes = head->packet.data_len;
358 atomic_fetch_sub(&queue->bytes_queued, bytes);
359 atomic_fetch_sub(&queue->count, (size_t)1);
360 atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);
361
362 // Free dropped packet data
363 if (head->packet.owns_data && head->packet.data) {
364 buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
365 }
366 node_pool_put(queue->node_pool, head);
367 }
368 // If CAS failed, another thread already dequeued - continue to enqueue
369 }
370 }
371
372 // Create new node (use pool if available)
373 packet_node_t *node = node_pool_get(queue->node_pool);
374 if (!node) {
375 SET_ERRNO(ERROR_MEMORY, "Failed to allocate packet node");
376 return -1;
377 }
378
379 // Copy the packet header
380 SAFE_MEMCPY(&node->packet, sizeof(queued_packet_t), packet, sizeof(queued_packet_t));
381
382 // Deep copy the data if needed
383 if (packet->data && packet->data_len > 0 && packet->owns_data) {
384 // If the packet owns its data, we need to make a copy
385 // Try to allocate from buffer pool (local or global)
386 void *data_copy;
387 if (queue->buffer_pool) {
388 data_copy = buffer_pool_alloc(queue->buffer_pool, packet->data_len);
389 node->packet.buffer_pool = queue->buffer_pool;
390 } else {
391 // Use global pool if no local pool
392 data_copy = buffer_pool_alloc(NULL, packet->data_len);
393 node->packet.buffer_pool = buffer_pool_get_global();
394 }
395 SAFE_MEMCPY(data_copy, packet->data_len, packet->data, packet->data_len);
396 node->packet.data = data_copy;
397 node->packet.owns_data = true;
398 } else {
399 // Either no data or packet doesn't own it (shared reference is OK)
400 node->packet.data = packet->data;
401 node->packet.owns_data = packet->owns_data;
402 node->packet.buffer_pool = packet->buffer_pool; // Preserve original pool reference
403 }
404
405 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
406
407 // Add to queue using lock-free CAS-based enqueue (Michael-Scott algorithm)
408 while (true) {
409 packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
410
411 if (tail == NULL) {
412 // Empty queue - atomically set both head and tail
413 packet_node_t *expected = NULL;
414 if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
415 memory_order_acquire)) {
416 // Successfully set head (queue was empty)
417 atomic_store_explicit(&queue->tail, node, memory_order_release);
418 break; // Enqueue successful
419 }
420 // CAS failed - another thread initialized queue, retry
421 continue;
422 }
423
424 // Queue is non-empty - try to append to tail
425 packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
426 packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
427
428 // Verify tail hasn't changed (ABA problem mitigation)
429 if (tail != current_tail) {
430 // Tail was updated by another thread, retry
431 continue;
432 }
433
434 if (next == NULL) {
435 // Tail is actually the last node - try to link new node
436 packet_node_t *expected_null = NULL;
437 if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
438 memory_order_acquire)) {
439 // Successfully linked node - try to swing tail forward (best-effort, ignore failure)
440 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
441 break; // Enqueue successful
442 }
443 // CAS failed - another thread appended to tail, retry
444 } else {
445 // Tail is lagging behind - help advance it
446 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
447 // Retry with new tail
448 }
449 }
450
451 // Update counters atomically
452 atomic_fetch_add(&queue->count, (size_t)1);
453 atomic_fetch_add(&queue->bytes_queued, packet->data_len);
454 atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);
455
456 return 0;
457}
bool packet_queue_validate_packet(const queued_packet_t *packet)

References buffer_pool_alloc(), buffer_pool_free(), buffer_pool_get_global(), node_pool_get(), node_pool_put(), and packet_queue_validate_packet().

◆ packet_queue_free_packet()

void packet_queue_free_packet ( queued_packet_t *  packet)

Definition at line 548 of file packet_queue.c.

548 {
549 if (!packet)
550 return;
551
552 // Check if packet was already freed (detect double-free)
553 if (packet->header.magic != HOST_TO_NET_U64(PACKET_MAGIC)) {
554 log_warn("Attempted double-free of packet (magic=0x%llx, expected=0x%llx)", NET_TO_HOST_U64(packet->header.magic),
555 PACKET_MAGIC);
556 return;
557 }
558
559 if (packet->owns_data && packet->data) {
560 // Return to appropriate pool or free
561 if (packet->buffer_pool) {
562 buffer_pool_free(packet->buffer_pool, packet->data, packet->data_len);
563 } else {
564 // This was allocated from global pool or malloc, use buffer_pool_free which handles both
565 buffer_pool_free(NULL, packet->data, packet->data_len);
566 }
567 }
568
569 // Mark as freed to detect future double-free attempts
570 // Use network byte order for consistency on big-endian systems
571 packet->header.magic = HOST_TO_NET_U64(0xBEEFDEADULL); // Different magic in network byte order
572 SAFE_FREE(packet);
573}

References buffer_pool_free().

Referenced by client_dispatch_thread(), client_send_thread_func(), and packet_queue_clear().

◆ packet_queue_get_stats()

void packet_queue_get_stats ( packet_queue_t *  queue,
uint64_t *  enqueued,
uint64_t *  dequeued,
uint64_t *  dropped 
)

Definition at line 617 of file packet_queue.c.

617 {
618 if (!queue)
619 return;
620
621 // Lock-free atomic reads (acquire semantics for consistency)
622 if (enqueued)
623 *enqueued = atomic_load_explicit(&queue->packets_enqueued, memory_order_acquire);
624 if (dequeued)
625 *dequeued = atomic_load_explicit(&queue->packets_dequeued, memory_order_acquire);
626 if (dropped)
627 *dropped = atomic_load_explicit(&queue->packets_dropped, memory_order_acquire);
628}

Referenced by stats_logger_thread().

◆ packet_queue_is_empty()

bool packet_queue_is_empty ( packet_queue_t *  queue)

Definition at line 583 of file packet_queue.c.

583 {
584 return packet_queue_size(queue) == 0;
585}
size_t packet_queue_size(packet_queue_t *queue)

References packet_queue_size().

◆ packet_queue_is_full()

bool packet_queue_is_full ( packet_queue_t *  queue)

Definition at line 587 of file packet_queue.c.

587 {
588 if (!queue || queue->max_size == 0)
589 return false;
590
591 // Lock-free atomic read
592 size_t count = atomic_load_explicit(&queue->count, memory_order_acquire);
593 return (count >= queue->max_size);
594}

◆ packet_queue_size()

size_t packet_queue_size ( packet_queue_t *  queue)

Definition at line 575 of file packet_queue.c.

575 {
576 if (!queue)
577 return 0;
578
579 // Lock-free atomic read
580 return atomic_load_explicit(&queue->count, memory_order_acquire);
581}

Referenced by client_audio_render_thread(), and packet_queue_is_empty().

◆ packet_queue_stop()

void packet_queue_stop ( packet_queue_t *  queue)

Definition at line 596 of file packet_queue.c.

596 {
597 if (!queue) {
598 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: queue=%p", queue);
599 return;
600 }
601
602 // Lock-free atomic store (release semantics ensures visibility to other threads)
603 atomic_store_explicit(&queue->shutdown, true, memory_order_release);
604}

Referenced by disconnect_client_for_bad_data(), packet_queue_destroy(), and remove_client().

◆ packet_queue_try_dequeue()

queued_packet_t * packet_queue_try_dequeue ( packet_queue_t *  queue)

Definition at line 464 of file packet_queue.c.

464 {
465 if (!queue)
466 return NULL;
467
468 // Check if shutdown (atomic read with acquire semantics)
469 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
470 return NULL;
471 }
472
473 // Check if queue is empty (atomic read with acquire semantics)
474 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
475 if (current_count == 0) {
476 return NULL;
477 }
478
479 // Remove from head atomically (lock-free dequeue)
480 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
481 if (!head) {
482 return NULL;
483 }
484
485 // Atomically update head pointer
486 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
487 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
488 // Successfully claimed head node
489 if (next == NULL) {
490 // Queue became empty, also update tail atomically
491 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
492 }
493
494 // Update counters atomically
495 size_t bytes = head->packet.data_len;
496 atomic_fetch_sub(&queue->bytes_queued, bytes);
497 atomic_fetch_sub(&queue->count, (size_t)1);
498 atomic_fetch_add(&queue->packets_dequeued, (uint64_t)1);
499
500 // Verify packet magic number for corruption detection
501 uint64_t magic = NET_TO_HOST_U64(head->packet.header.magic);
502 if (magic != PACKET_MAGIC) {
503 SET_ERRNO(ERROR_BUFFER, "CORRUPTION: Invalid magic in try_dequeued packet: 0x%llx (expected 0x%llx), type=%u",
504 magic, PACKET_MAGIC, NET_TO_HOST_U16(head->packet.header.type));
505 // Still return node to pool but don't return corrupted packet
506 node_pool_put(queue->node_pool, head);
507 return NULL;
508 }
509
510 // Validate CRC if there's data
511 if (head->packet.data_len > 0 && head->packet.data) {
512 uint32_t expected_crc = NET_TO_HOST_U32(head->packet.header.crc32);
513 uint32_t actual_crc = asciichat_crc32(head->packet.data, head->packet.data_len);
514 if (actual_crc != expected_crc) {
515 SET_ERRNO(ERROR_BUFFER,
516 "CORRUPTION: CRC mismatch in try_dequeued packet: got 0x%x, expected 0x%x, type=%u, len=%zu",
517 actual_crc, expected_crc, NET_TO_HOST_U16(head->packet.header.type), head->packet.data_len);
518 // Free data if packet owns it
519 if (head->packet.owns_data && head->packet.data) {
520 // Use buffer_pool_free for global pool allocations, buffer_pool_free for local pools
521 if (head->packet.buffer_pool) {
522 buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
523 } else {
524 // This was allocated from global pool or malloc, use buffer_pool_free which handles both
525 buffer_pool_free(NULL, head->packet.data, head->packet.data_len);
526 }
527 // Clear pointer to prevent double-free when packet is copied later.
528 head->packet.data = NULL;
529 head->packet.owns_data = false;
530 }
531 node_pool_put(queue->node_pool, head);
532 return NULL;
533 }
534 }
535
536 // Extract packet and return node to pool
537 queued_packet_t *packet;
538 packet = SAFE_MALLOC(sizeof(queued_packet_t), queued_packet_t *);
539 SAFE_MEMCPY(packet, sizeof(queued_packet_t), &head->packet, sizeof(queued_packet_t));
540 node_pool_put(queue->node_pool, head);
541 return packet;
542 }
543
544 // CAS failed - another thread dequeued, retry if needed (or return NULL for non-blocking)
545 return NULL;
546}

References buffer_pool_free(), and node_pool_put().

Referenced by client_dispatch_thread(), client_send_thread_func(), packet_queue_clear(), and packet_queue_dequeue().

◆ packet_queue_validate_packet()

bool packet_queue_validate_packet ( const queued_packet_t *  packet)

Definition at line 630 of file packet_queue.c.

630 {
631 if (!packet) {
632 return false;
633 }
634
635 // Check magic number
636 uint64_t magic = NET_TO_HOST_U64(packet->header.magic);
637 if (magic != PACKET_MAGIC) {
638 SET_ERRNO(ERROR_BUFFER, "Invalid packet magic: 0x%llx (expected 0x%llx)", magic, PACKET_MAGIC);
639 return false;
640 }
641
642 // Check packet type is valid (must be non-zero and within reasonable range)
643 uint16_t type = NET_TO_HOST_U16(packet->header.type);
644 if (type == 0 || type > 10000) {
645 SET_ERRNO(ERROR_BUFFER, "Invalid packet type: %u", type);
646 return false;
647 }
648
649 // Check length matches data_len
650 uint32_t length = NET_TO_HOST_U32(packet->header.length);
651 if (length != packet->data_len) {
652 SET_ERRNO(ERROR_BUFFER, "Packet length mismatch: header says %u, data_len is %zu", length, packet->data_len);
653 return false;
654 }
655
656 // Check CRC if there's data
657 if (packet->data_len > 0 && packet->data) {
658 uint32_t expected_crc = NET_TO_HOST_U32(packet->header.crc32);
659 uint32_t actual_crc = asciichat_crc32(packet->data, packet->data_len);
660 if (actual_crc != expected_crc) {
661 SET_ERRNO(ERROR_BUFFER, "Packet CRC mismatch: got 0x%x, expected 0x%x", actual_crc, expected_crc);
662 return false;
663 }
664 }
665
666 return true;
667}

Referenced by packet_queue_enqueue_packet().