ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
Packet Queue

📬 Thread-safe per-client packet queues

Files

file  packet_queue.c
 📬 Lock-free packet queue with per-client isolation and memory pooling
 
file  packet_queue.h
 📬 Thread-safe packet queue system for per-client send threads
 

Data Structures

struct  queued_packet_t
 Single packet ready to send (header already in network byte order)
 
struct  packet_node
 Node in the packet queue linked list.
 
struct  node_pool
 Memory pool for packet nodes to reduce malloc/free overhead.
 
struct  packet_queue_t
 Thread-safe packet queue for producer-consumer communication.
 

Typedefs

typedef struct packet_node packet_node_t
 Forward declaration for packet queue node.
 
typedef struct node_pool node_pool_t
 Memory pool for packet nodes to reduce malloc/free overhead.
 

Functions

 packet_node::_Atomic (packet_node_t *) next
 Pointer to next node in linked list (NULL for tail) - atomic for lock-free operations.
 
 packet_queue_t::_Atomic (packet_node_t *) head
 Front of queue (dequeue from here) - atomic for lock-free access.
 

Variables

packet_header_t queued_packet_t::header
 Complete packet header (already in network byte order)
 
void * queued_packet_t::data
 Packet payload data (can be NULL for header-only packets)
 
size_t queued_packet_t::data_len
 Length of payload data in bytes.
 
bool queued_packet_t::owns_data
 If true, free data when packet is freed.
 
buffer_pool_t * queued_packet_t::buffer_pool
 Pool that allocated the data (NULL if malloc'd)
 
queued_packet_t packet_node::packet
 The queued packet data.
 
packet_node_t * node_pool::free_list
 Stack of free nodes (LIFO for cache locality)
 
packet_node_t * node_pool::nodes
 Pre-allocated array of all nodes.
 
size_t node_pool::pool_size
 Total number of nodes in pool.
 
size_t node_pool::used_count
 Number of nodes currently in use.
 
mutex_t node_pool::pool_mutex
 Mutex protecting free list access.
 
_Atomic size_t packet_queue_t::count
 Number of packets currently in queue - atomic for lock-free access.
 
size_t packet_queue_t::max_size
 Maximum queue size (0 = unlimited)
 
_Atomic size_t packet_queue_t::bytes_queued
 Total bytes of data queued (for monitoring) - atomic for lock-free access.
 
node_pool_t * packet_queue_t::node_pool
 Optional memory pool for nodes (NULL = use malloc/free)
 
buffer_pool_t * packet_queue_t::buffer_pool
 Optional memory pool for data buffers (NULL = use malloc/free)
 
_Atomic uint64_t packet_queue_t::packets_enqueued
 Total packets enqueued (statistics) - atomic for lock-free access.
 
_Atomic uint64_t packet_queue_t::packets_dequeued
 Total packets dequeued (statistics) - atomic for lock-free access.
 
_Atomic uint64_t packet_queue_t::packets_dropped
 Total packets dropped due to queue full (statistics) - atomic for lock-free access.
 
_Atomic bool packet_queue_t::shutdown
 Shutdown flag (true = dequeue returns NULL) - atomic for lock-free access.
 

Node Pool Functions

node_pool_t * node_pool_create (size_t pool_size)
 Create a memory pool for packet queue nodes.
 
void node_pool_destroy (node_pool_t *pool)
 Destroy a node pool and free all memory.
 
packet_node_t * node_pool_get (node_pool_t *pool)
 Get a free node from the pool.
 
void node_pool_put (node_pool_t *pool, packet_node_t *node)
 Return a node to the pool.
 

Queue Management Functions

packet_queue_t * packet_queue_create (size_t max_size)
 Create a new packet queue.
 
packet_queue_t * packet_queue_create_with_pool (size_t max_size, size_t pool_size)
 Create a packet queue with node pool.
 
packet_queue_t * packet_queue_create_with_pools (size_t max_size, size_t node_pool_size, bool use_buffer_pool)
 Create a packet queue with both node and buffer pools.
 
void packet_queue_destroy (packet_queue_t *queue)
 Destroy a packet queue and free all resources.
 

Queue Operations

int packet_queue_enqueue (packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
 Enqueue a packet into the queue.
 
int packet_queue_enqueue_packet (packet_queue_t *queue, const queued_packet_t *packet)
 Enqueue a pre-built packet (for special cases like compressed frames)
 
queued_packet_t * packet_queue_dequeue (packet_queue_t *queue)
 Dequeue a packet from the queue (non-blocking)
 
queued_packet_t * packet_queue_try_dequeue (packet_queue_t *queue)
 Try to dequeue a packet without blocking.
 
void packet_queue_free_packet (queued_packet_t *packet)
 Free a dequeued packet.
 

Queue Status Functions

size_t packet_queue_size (packet_queue_t *queue)
 Get current number of packets in queue.
 
bool packet_queue_is_empty (packet_queue_t *queue)
 Check if queue is empty.
 
bool packet_queue_is_full (packet_queue_t *queue)
 Check if queue is full.
 

Queue Control Functions

void packet_queue_shutdown (packet_queue_t *queue)
 Signal queue shutdown (causes dequeue to return NULL)
 
void packet_queue_clear (packet_queue_t *queue)
 Clear all packets from queue.
 

Statistics Functions

void packet_queue_get_stats (packet_queue_t *queue, uint64_t *enqueued, uint64_t *dequeued, uint64_t *dropped)
 Get queue statistics.
 
bool packet_queue_validate_packet (const queued_packet_t *packet)
 Validate packet integrity.
 

Detailed Description

📬 Thread-safe per-client packet queues

This module implements a high-performance thread-safe queue for network packets, enabling producer threads (audio mixer, video broadcast) to enqueue packets while consumer threads (per-client send threads) dequeue and transmit them. This design eliminates shared bottlenecks and enables linear scaling across multiple clients.

CORE RESPONSIBILITIES:

  1. Thread-safe packet queuing for producer-consumer architecture
  2. Per-client queue isolation (each client has separate audio/video queues)
  3. Memory-efficient packet node pooling to reduce malloc/free overhead
  4. Queue size limits with automatic packet dropping under load
  5. Graceful shutdown handling for clean thread termination
  6. Comprehensive statistics collection for performance monitoring

ARCHITECTURAL OVERVIEW:

PRODUCER-CONSUMER MODEL:

QUEUE DESIGN:

MEMORY MANAGEMENT:

THREAD SAFETY:

SYNCHRONIZATION PRIMITIVES:

LOCK-FREE PATTERN:

INTEGRATION WITH OTHER MODULES:

PERFORMANCE CHARACTERISTICS:

Note
Each client should have separate queues for audio and video to enable prioritization and prevent one stream from blocking the other.
When max_size > 0, full queues drop oldest packets automatically. This prevents memory exhaustion but may cause frame drops under load.
Queue shutdown causes dequeue operations to return NULL, allowing consumer threads to exit gracefully without blocking forever.
Warning
Always free dequeued packets with packet_queue_free_packet() to properly release memory (either to pool or free()).
packet_queue_destroy() signals shutdown and calls packet_queue_clear() before freeing the queue. Make sure no other thread is still using the queue when it is destroyed.
Author
Zachary Fogg <me@zfo.gg>
Date
September 2025
Version
2.0 (Post-Modularization)

Packet Queue

Overview

Welcome! Let's talk about packet queues—the unsung heroes that keep ascii-chat's network communication smooth and organized.

Imagine you're at a busy restaurant. Orders are coming in from multiple tables, and the kitchen needs to process them one at a time. You don't want orders getting mixed up or lost, right? That's exactly what packet queues do for network data—they line up incoming packets so each client's data gets processed in order, without chaos.

Each client connected to the server gets their own dedicated packet queue. When packets arrive over the network, they get placed in the appropriate client's queue. Then, a separate thread can process packets at its own pace without blocking the network thread. It's like having a separate order ticket for each table—clean, organized, and efficient.

Implementation: lib/packet_queue.c/h

What makes packet queues useful?

  • Per-client queues: Each client has their own queue (no cross-contamination!)
  • Thread-safe: Multiple threads can safely interact with the queue
  • Flexible ownership: Choose whether the queue copies data or just references it
  • Buffer pool integration: Works with our buffer pool for efficient memory usage
  • Statistics tracking: See how well your queues are performing

Architecture

Queue Structure

// Single packet node in the queue
typedef struct packet_node {
  packet_header_t header;   // Type, length, CRC, client_id
  void *data;               // Payload
  struct packet_node *next; // Next packet in queue
} packet_node_t;

// Per-client packet queue
typedef struct packet_queue {
  packet_node_t *head; // Front of queue (dequeue here)
  packet_node_t *tail; // Back of queue (enqueue here)
  size_t size;         // Current packet count
  size_t max_size;     // Maximum allowed packets (0 = unlimited)
  bool owns_data;      // If true, queue frees packet data
  mutex_t mutex;       // Thread safety
  // Statistics
  uint64_t packets_enqueued;
  uint64_t packets_dequeued;
  uint64_t packets_dropped; // Dropped due to max_size limit
  size_t peak_size;         // Peak queue depth
} packet_queue_t;

Data Ownership

Queues can either copy packet data or reference it:

Deep copy mode (owns_data = true):

  • Queue allocates its own copy of packet data
  • Safe for producer to free original data immediately
  • The copy is freed when the dequeued packet is released with packet_queue_free_packet()
  • Higher memory usage, but safer

Reference mode (owns_data = false):

  • Queue stores pointer to original data
  • Producer must keep data alive until dequeued
  • Consumer responsible for freeing data
  • Lower memory usage, but requires careful lifetime management
// q and q2 are packet_queue_t pointers created earlier with packet_queue_create()

// Deep copy mode (safer) - copy_data=true
char *data = SAFE_MALLOC(1024, char *);
strcpy(data, "Hello");
packet_queue_enqueue(q, PACKET_TYPE_ASCII_FRAME, data, strlen(data), client_id, true);
SAFE_FREE(data); // Can free immediately since copy_data=true!

// Reference mode (lower overhead) - copy_data=false
char *data2 = SAFE_MALLOC(1024, char *);
strcpy(data2, "Hello");
packet_queue_enqueue(q2, PACKET_TYPE_ASCII_FRAME, data2, strlen(data2), client_id, false);
// data2 must stay alive until dequeued since copy_data=false!

// Dequeue returns a queued_packet_t pointer
queued_packet_t *packet = packet_queue_dequeue(q2);
if (packet) {
  process_packet(packet->data);
  packet_queue_free_packet(packet); // Takes only the packet pointer
  SAFE_FREE(data2);                 // Reference mode: the caller still owns the payload
}
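The two ownership modes can be illustrated in isolation. The following is a minimal standalone sketch, not the ascii-chat implementation: `demo_packet_t`, `demo_packet_make()` and `demo_packet_free()` are hypothetical names, and plain `malloc`/`free` stand in for the buffer pool. It only shows how an `owns_data` flag decides who releases the payload.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for queued_packet_t (illustration only). */
typedef struct {
  void *data;      /* Payload (NULL for header-only packets) */
  size_t data_len; /* Payload length in bytes */
  bool owns_data;  /* true: payload is freed together with the packet */
} demo_packet_t;

/* Build a packet that either copies the payload or references it. */
static demo_packet_t *demo_packet_make(const void *data, size_t len, bool copy_data) {
  demo_packet_t *p = calloc(1, sizeof(*p));
  if (!p)
    return NULL;
  p->data_len = len;
  if (len > 0 && data) {
    if (copy_data) {
      p->data = malloc(len);
      if (!p->data) {
        free(p);
        return NULL;
      }
      memcpy(p->data, data, len);
      p->owns_data = true; /* Deep copy: the packet owns its buffer */
    } else {
      p->data = (void *)data; /* Reference: caller keeps ownership */
      p->owns_data = false;
    }
  }
  return p;
}

/* Free the packet; the payload is freed only when the packet owns it. */
static void demo_packet_free(demo_packet_t *p) {
  if (!p)
    return;
  if (p->owns_data)
    free(p->data);
  free(p);
}
```

In reference mode a later change to the caller's buffer is visible through the packet, which is why the producer has to keep the buffer alive and unchanged until the consumer is done with it.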

API Reference

Creation/Destruction

// Create packet queue (max_size = 100 packets)
packet_queue_t *queue = packet_queue_create(100);
if (!queue) {
  log_error("Failed to create packet queue");
  return NULL;
}
// Use queue...
// Destroy queue
packet_queue_destroy(queue);

Integration

Buffer Pool Integration:

  • Packet queues use buffer pool for efficient memory allocation
  • Node pool pre-allocates packet nodes
  • Data pool pre-allocates packet data buffers
  • Reduces malloc/free overhead in high-throughput scenarios

Network Integration:

  • Each client has a dedicated packet queue
  • Network receive thread enqueues packets
  • Client processing thread dequeues packets
  • Decouples network I/O from packet processing

Performance

Throughput:

  • Enqueue: ~1M packets/second (lock-free atomic operations)
  • Dequeue: ~1M packets/second (lock-free atomic operations)
  • Memory overhead: ~40 bytes per queued packet

Scalability:

  • Per-client queues eliminate contention between clients
  • Statistics tracking has minimal overhead
  • Buffer pool reduces allocation overhead

Thread Safety

Synchronization:

  • Queue operations (enqueue, dequeue, clear) are lock-free and use atomic operations
  • Thread-safe for concurrent enqueue/dequeue
  • Only the node pool free list is protected by a mutex (pool_mutex)

Concurrent Access:

  • Multiple threads can enqueue concurrently
  • Only one thread should dequeue (per queue)
  • Statistics are updated atomically
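As an illustration of atomically updated statistics, here is a minimal sketch using C11 `<stdatomic.h>`. The `demo_stats_*` names are hypothetical and the relaxed memory ordering is an assumption made for this example; the actual ordering used by packet_queue.c for its counters may differ.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical counters mirroring packets_enqueued/dequeued/dropped. */
typedef struct {
  _Atomic uint64_t enqueued;
  _Atomic uint64_t dequeued;
  _Atomic uint64_t dropped;
} demo_stats_t;

static void demo_stats_init(demo_stats_t *s) {
  atomic_init(&s->enqueued, (uint64_t)0);
  atomic_init(&s->dequeued, (uint64_t)0);
  atomic_init(&s->dropped, (uint64_t)0);
}

/* Each update is a single atomic read-modify-write, so no lock is needed. */
static void demo_stats_on_enqueue(demo_stats_t *s) {
  atomic_fetch_add_explicit(&s->enqueued, (uint64_t)1, memory_order_relaxed);
}

static void demo_stats_on_dequeue(demo_stats_t *s) {
  atomic_fetch_add_explicit(&s->dequeued, (uint64_t)1, memory_order_relaxed);
}

static void demo_stats_on_drop(demo_stats_t *s) {
  atomic_fetch_add_explicit(&s->dropped, (uint64_t)1, memory_order_relaxed);
}

/* Snapshot the counters; any output pointer may be NULL to skip it. */
static void demo_stats_get(demo_stats_t *s, uint64_t *enqueued, uint64_t *dequeued, uint64_t *dropped) {
  if (enqueued)
    *enqueued = atomic_load_explicit(&s->enqueued, memory_order_relaxed);
  if (dequeued)
    *dequeued = atomic_load_explicit(&s->dequeued, memory_order_relaxed);
  if (dropped)
    *dropped = atomic_load_explicit(&s->dropped, memory_order_relaxed);
}
```

Each counter is read independently, so a snapshot taken while other threads are active is not guaranteed to be mutually consistent across the three values. That is usually acceptable for monitoring.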
See also
packet_queue.h
buffer_pool.h

Typedef Documentation

◆ node_pool_t

typedef struct node_pool node_pool_t

#include <packet_queue.h>

Memory pool for packet nodes to reduce malloc/free overhead.

Pre-allocates a fixed-size array of packet nodes and maintains a free list. This eliminates per-packet malloc/free calls, significantly improving performance for high-frequency packet queue operations.

Note
Pool operations are thread-safe via pool_mutex.
If pool is exhausted, operations fall back to malloc/free.
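The pooling scheme described above (pre-allocated array, LIFO free list, mutex, heap fallback) can be sketched as a standalone program. This is an illustrative sketch under assumptions, not the project's code: the `demo_*` names are hypothetical, a POSIX `pthread_mutex_t` stands in for `mutex_t`, and the ownership test compares addresses as `uintptr_t` values.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for packet_node_t and node_pool_t. */
typedef struct demo_node {
  struct demo_node *next;
  int payload;
} demo_node_t;

typedef struct {
  demo_node_t *nodes;     /* One contiguous pre-allocated array */
  demo_node_t *free_list; /* LIFO stack of free nodes */
  size_t pool_size;
  size_t used_count;
  pthread_mutex_t lock;   /* Protects free_list and used_count */
} demo_pool_t;

static bool demo_pool_init(demo_pool_t *p, size_t n) {
  if (n == 0)
    return false;
  p->nodes = calloc(n, sizeof(*p->nodes));
  if (!p->nodes)
    return false;
  for (size_t i = 0; i + 1 < n; i++)
    p->nodes[i].next = &p->nodes[i + 1]; /* Chain nodes into the free list */
  p->free_list = &p->nodes[0];
  p->pool_size = n;
  p->used_count = 0;
  if (pthread_mutex_init(&p->lock, NULL) != 0) {
    free(p->nodes);
    return false;
  }
  return true;
}

/* True if the node lives inside the pool's array (address range check). */
static bool demo_pool_owns(const demo_pool_t *p, const demo_node_t *n) {
  uintptr_t a = (uintptr_t)n;
  uintptr_t lo = (uintptr_t)p->nodes;
  uintptr_t hi = (uintptr_t)(p->nodes + p->pool_size);
  return a >= lo && a < hi;
}

static demo_node_t *demo_pool_get(demo_pool_t *p) {
  pthread_mutex_lock(&p->lock);
  demo_node_t *n = p->free_list;
  if (n) {
    p->free_list = n->next; /* Pop from the stack */
    n->next = NULL;
    p->used_count++;
  }
  pthread_mutex_unlock(&p->lock);
  if (!n)
    n = calloc(1, sizeof(*n)); /* Pool exhausted: fall back to the heap */
  return n;
}

static void demo_pool_put(demo_pool_t *p, demo_node_t *n) {
  if (!n)
    return;
  if (!demo_pool_owns(p, n)) {
    free(n); /* Heap fallback node: just free it */
    return;
  }
  pthread_mutex_lock(&p->lock);
  n->next = p->free_list; /* Push back onto the stack */
  p->free_list = n;
  p->used_count--;
  pthread_mutex_unlock(&p->lock);
}

static void demo_pool_destroy(demo_pool_t *p) {
  pthread_mutex_destroy(&p->lock);
  free(p->nodes);
  p->nodes = NULL;
  p->free_list = NULL;
}
```

Because the free list is a stack, the most recently returned node is handed out first, which is the cache-locality argument made for free_list in the member documentation.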

◆ packet_node_t

typedef struct packet_node packet_node_t

#include <packet_queue.h>

Forward declaration for packet queue node.

Definition at line 137 of file packet_queue.h.

Function Documentation

◆ _Atomic() [1/2]

packet_queue_t::_Atomic ( packet_node_t )

#include <packet_queue.h>

Front of queue (dequeue from here) - atomic for lock-free access.

◆ _Atomic() [2/2]

packet_node::_Atomic ( packet_node_t )

#include <packet_queue.h>

Pointer to next node in linked list (NULL for tail) - atomic for lock-free operations.

◆ node_pool_create()

node_pool_t * node_pool_create ( size_t  pool_size)

#include <packet_queue.h>

Create a memory pool for packet queue nodes.

Parameters
pool_size  Number of nodes to pre-allocate
Returns
Pointer to node pool, or NULL on failure

Pre-allocates a fixed-size array of packet nodes to eliminate per-packet malloc/free overhead. All nodes are initialized and added to the free list.

Note
Pool size should be chosen based on expected queue depth. Too small causes fallback to malloc, too large wastes memory.

Definition at line 25 of file packet_queue.c.

25 {
26 if (pool_size == 0) {
27 return NULL;
28 }
29
30 node_pool_t *pool;
31 pool = SAFE_MALLOC(sizeof(node_pool_t), node_pool_t *);
32
33 // Allocate all nodes at once
34 pool->nodes = SAFE_MALLOC(sizeof(packet_node_t) * pool_size, packet_node_t *);
35
36 // Link all nodes into free list (using atomic stores for consistency with atomic type)
37 for (size_t i = 0; i < pool_size - 1; i++) {
38 atomic_store_explicit(&pool->nodes[i].next, &pool->nodes[i + 1], memory_order_relaxed);
39 }
40 atomic_store_explicit(&pool->nodes[pool_size - 1].next, (packet_node_t *)NULL, memory_order_relaxed);
41
42 pool->free_list = &pool->nodes[0];
43 pool->pool_size = pool_size;
44 pool->used_count = 0;
45
46 if (mutex_init(&pool->pool_mutex) != 0) {
47 SET_ERRNO(ERROR_PLATFORM_INIT, "Failed to initialize mutex for node pool");
48 node_pool_destroy(pool);
49 return NULL;
50 }
51
52 return pool;
53}

References ERROR_PLATFORM_INIT, node_pool::free_list, mutex_init(), node_pool_destroy(), node_pool::nodes, node_pool::pool_mutex, node_pool::pool_size, SAFE_MALLOC, SET_ERRNO, and node_pool::used_count.

Referenced by packet_queue_create_with_pools().

◆ node_pool_destroy()

void node_pool_destroy ( node_pool_t *  pool)

#include <packet_queue.h>

Destroy a node pool and free all memory.

Parameters
pool  Node pool to destroy (can be NULL)

Frees all pre-allocated nodes and the pool structure itself.

Warning
Pool must not be in use by any queues when destroyed.

Definition at line 55 of file packet_queue.c.

55 {
56 if (!pool) {
57 return;
58 }
59
60 mutex_destroy(&pool->pool_mutex);
61 SAFE_FREE(pool->nodes);
62 SAFE_FREE(pool);
63}

References mutex_destroy(), node_pool::nodes, node_pool::pool_mutex, and SAFE_FREE.

Referenced by node_pool_create(), and packet_queue_destroy().

◆ node_pool_get()

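packet_node_t * node_pool_get ( node_pool_t *  pool)

#include <packet_queue.h>

Get a free node from the pool.

Parameters
pool  Node pool (NULL = allocate the node with SAFE_MALLOC)
Returns
Pointer to a node taken from the pool, or to a heap-allocated node if pool is NULL or exhausted

Removes a node from the free list. If pool is NULL or exhausted, the function itself falls back to SAFE_MALLOC, so the returned node may not belong to the pool. Return it with node_pool_put(), which frees nodes that lie outside the pool's array.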

Note
Thread-safe (protected by pool_mutex).

Definition at line 65 of file packet_queue.c.

65 {
66 if (!pool) {
67 // No pool, fallback to malloc
68 packet_node_t *node;
69 node = SAFE_MALLOC(sizeof(packet_node_t), packet_node_t *);
70 return node;
71 }
72
73 mutex_lock(&pool->pool_mutex);
74
75 packet_node_t *node = pool->free_list;
76 if (node) {
77 pool->free_list = atomic_load_explicit(&node->next, memory_order_relaxed);
78 pool->used_count++;
79 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed); // Clear next pointer
80 }
81
82 mutex_unlock(&pool->pool_mutex);
83
84 if (!node) {
85 // Pool exhausted, fallback to malloc
86 node = SAFE_MALLOC(sizeof(packet_node_t), packet_node_t *);
87 // Remove extra text from format string that had no matching argument
88 log_debug("Memory pool exhausted, falling back to SAFE_MALLOC (used: %zu/%zu)", pool->used_count, pool->pool_size);
89 }
90
91 return node;
92}

References node_pool::free_list, log_debug, mutex_lock, mutex_unlock, node_pool::pool_mutex, node_pool::pool_size, SAFE_MALLOC, and node_pool::used_count.

Referenced by packet_queue_enqueue(), and packet_queue_enqueue_packet().

◆ node_pool_put()

void node_pool_put ( node_pool_t *  pool,
packet_node_t *  node 
)

#include <packet_queue.h>

Return a node to the pool.

Parameters
pool  Node pool (NULL = free the node with SAFE_FREE)
node  Node to return, as obtained from node_pool_get() (nodes outside the pool's array are freed with SAFE_FREE)

Returns a node to the free list for reuse. Node should not be in use by any queue when returned.

Note
Thread-safe (protected by pool_mutex).
Warning
Do not return nodes that are still in use by queues.

Definition at line 94 of file packet_queue.c.

94 {
95 if (!node) {
96 return;
97 }
98
99 if (!pool) {
100 // No pool, just free
101 SAFE_FREE(node);
102 return;
103 }
104
105 // Check if this node is from our pool
106 bool is_pool_node = (node >= pool->nodes && node < pool->nodes + pool->pool_size);
107
108 if (is_pool_node) {
109 mutex_lock(&pool->pool_mutex);
110
111 // Return to free list
112 atomic_store_explicit(&node->next, pool->free_list, memory_order_relaxed);
113 pool->free_list = node;
114 pool->used_count--;
115
116 mutex_unlock(&pool->pool_mutex);
117 } else {
118 // This was malloc'd, so free it
119 SAFE_FREE(node);
120 }
121}

References node_pool::free_list, mutex_lock, mutex_unlock, node_pool::nodes, node_pool::pool_mutex, node_pool::pool_size, SAFE_FREE, and node_pool::used_count.

Referenced by packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_try_dequeue().

◆ packet_queue_clear()

void packet_queue_clear ( packet_queue_t *  queue)

#include <packet_queue.h>

Clear all packets from queue.

Parameters
queue  Packet queue

Removes and frees all packets currently in the queue. Useful for cleanup before queue destruction or when resetting queue state.

Note
Thread-safe (lock-free using atomic operations).
This function frees all queued packets, so ensure nothing is still referencing them.

Definition at line 606 of file packet_queue.c.

606 {
607 if (!queue)
608 return;
609
610 // Lock-free clear: drain queue by repeatedly dequeuing until empty
611 queued_packet_t *packet;
612 while ((packet = packet_queue_try_dequeue(queue)) != NULL) {
613 packet_queue_free_packet(packet);
614 }
615}

References packet_queue_free_packet(), and packet_queue_try_dequeue().

Referenced by packet_queue_destroy().

◆ packet_queue_create()

packet_queue_t * packet_queue_create ( size_t  max_size)

#include <packet_queue.h>

Create a new packet queue.

Parameters
max_size  Maximum queue size (0 = unlimited)
Returns
Pointer to new queue, or NULL on failure

Creates a packet queue without memory pools. Queue will use malloc/free for all node and data allocations.

Note
For high-performance scenarios, use packet_queue_create_with_pools() instead to enable zero-allocation operation.

Definition at line 128 of file packet_queue.c.

128 {
129 return packet_queue_create_with_pool(max_size, 0); // No pool by default
130}

References packet_queue_create_with_pool().

◆ packet_queue_create_with_pool()

packet_queue_t * packet_queue_create_with_pool ( size_t  max_size,
size_t  pool_size 
)

#include <packet_queue.h>

Create a packet queue with node pool.

Parameters
max_size  Maximum queue size (0 = unlimited)
pool_size  Number of nodes to pre-allocate in pool
Returns
Pointer to new queue, or NULL on failure

Creates a packet queue with optional node pool to reduce malloc overhead. Queue will still use malloc/free for packet data allocations.

Note
For zero-allocation operation, use packet_queue_create_with_pools() with use_buffer_pool = true.

Definition at line 132 of file packet_queue.c.

132 {
133 return packet_queue_create_with_pools(max_size, pool_size, false);
134}

References packet_queue_create_with_pools().

Referenced by packet_queue_create().

◆ packet_queue_create_with_pools()

packet_queue_t * packet_queue_create_with_pools ( size_t  max_size,
size_t  node_pool_size,
bool  use_buffer_pool 
)

#include <packet_queue.h>

Create a packet queue with both node and buffer pools.

Parameters
max_size  Maximum queue size (0 = unlimited)
node_pool_size  Number of nodes to pre-allocate
use_buffer_pool  If true, use global buffer pool for packet data
Returns
Pointer to new queue, or NULL on failure

Creates a high-performance packet queue with both node and buffer pools enabled. This enables zero-allocation operation when pools are configured.

Note
When use_buffer_pool is true, queue uses global buffer pool singleton. Buffer pool must be initialized before queue creation.

Definition at line 136 of file packet_queue.c.

136 {
137 packet_queue_t *queue;
138 queue = SAFE_MALLOC(sizeof(packet_queue_t), packet_queue_t *);
139
140 // Initialize atomic fields
141 // For atomic pointer types, use atomic_store with relaxed ordering for initialization
142 atomic_store_explicit(&queue->head, (packet_node_t *)NULL, memory_order_relaxed);
143 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_relaxed);
144 atomic_init(&queue->count, (size_t)0);
145 queue->max_size = max_size;
146 atomic_init(&queue->bytes_queued, (size_t)0);
147
148 // Create memory pools if requested
149 queue->node_pool = node_pool_size > 0 ? node_pool_create(node_pool_size) : NULL;
150 queue->buffer_pool = use_buffer_pool ? buffer_pool_create(0, 0) : NULL;
151
152 // Initialize atomic statistics
153 atomic_init(&queue->packets_enqueued, (uint64_t)0);
154 atomic_init(&queue->packets_dequeued, (uint64_t)0);
155 atomic_init(&queue->packets_dropped, (uint64_t)0);
156 atomic_init(&queue->shutdown, false);
157
158 return queue;
159}

References packet_queue_t::buffer_pool, buffer_pool_create(), packet_queue_t::bytes_queued, packet_queue_t::count, packet_queue_t::max_size, packet_queue_t::node_pool, node_pool_create(), packet_queue_t::packets_dequeued, packet_queue_t::packets_dropped, packet_queue_t::packets_enqueued, SAFE_MALLOC, and packet_queue_t::shutdown.

Referenced by packet_queue_create_with_pool().

◆ packet_queue_dequeue()

queued_packet_t * packet_queue_dequeue ( packet_queue_t *  queue)

#include <packet_queue.h>

Dequeue a packet from the queue (non-blocking)

Parameters
queue  Packet queue
Returns
Pointer to dequeued packet, or NULL if queue is empty or shutdown

Removes and returns the packet at the head of the queue. Returns NULL immediately if queue is empty or shutdown (non-blocking operation).

Note
Caller must free the returned packet with packet_queue_free_packet() to properly release memory (either to pool or free()).
Returns NULL when queue is empty or shutdown (allows consumer threads to exit). Caller should retry periodically if queue is expected to have data.
Thread-safe (lock-free using atomic operations).
Warning
Always free dequeued packets with packet_queue_free_packet().

Definition at line 459 of file packet_queue.c.

459 {
460 // Non-blocking dequeue (same as try_dequeue for lock-free design)
461 return packet_queue_try_dequeue(queue);
462}

References packet_queue_try_dequeue().

◆ packet_queue_destroy()

void packet_queue_destroy ( packet_queue_t *  queue)

#include <packet_queue.h>

Destroy a packet queue and free all resources.

Parameters
queue  Queue to destroy (can be NULL)

Destroys the queue and all associated resources (node pool, etc.).

Warning
The implementation signals shutdown and calls packet_queue_clear() before freeing the queue. Make sure no other thread is still using the queue when it is destroyed.

Definition at line 161 of file packet_queue.c.

161 {
162 if (!queue)
163 return;
164
165 // Signal shutdown first
166 packet_queue_shutdown(queue);
167
168 // Clear any remaining packets
169 packet_queue_clear(queue);
170
171 // Destroy memory pools if present
172 if (queue->node_pool) {
173 node_pool_destroy(queue->node_pool);
174 }
175 if (queue->buffer_pool) {
176 buffer_pool_log_stats(queue->buffer_pool, "packet_queue");
177 buffer_pool_destroy(queue->buffer_pool);
178 }
179
180 // No mutex/cond to destroy (lock-free design)
181
182 SAFE_FREE(queue);
183}

References packet_queue_t::buffer_pool, buffer_pool_destroy(), buffer_pool_log_stats(), packet_queue_t::node_pool, node_pool_destroy(), packet_queue_clear(), packet_queue_shutdown(), and SAFE_FREE.

Referenced by cleanup_client_packet_queues().

◆ packet_queue_enqueue()

int packet_queue_enqueue ( packet_queue_t *  queue,
packet_type_t  type,
const void *  data,
size_t  data_len,
uint32_t  client_id,
bool  copy_data 
)

#include <packet_queue.h>

Enqueue a packet into the queue.

Parameters
queue  Packet queue
type  Packet type (from packet_types.h)
data  Packet payload data (can be NULL)
data_len  Length of payload data in bytes
client_id  Client ID for packet header
copy_data  If true, copy data into pool/malloc buffer; if false, use data pointer directly
Returns
0 on success, -1 on error

Enqueues a new packet at the tail of the queue. If copy_data is true, packet data is copied into a buffer (from pool if available, otherwise malloc). If copy_data is false, the data pointer is used directly (caller must ensure data remains valid until packet is dequeued and freed).

Note
If queue is full (max_size > 0 and count >= max_size), oldest packet is automatically dropped to make room (packet count does not exceed max_size).
Thread-safe (lock-free using atomic operations).
Warning
When copy_data is false, caller must ensure data pointer remains valid until packet is freed.
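The drop-oldest policy in the note above can be shown on a deliberately simplified queue. The sketch below is single-threaded and uses no atomics, so it is not the lock-free algorithm in packet_queue.c. The `demo_*` names are hypothetical and an `int` stands in for a packet. It only demonstrates that a full queue evicts its head before appending, so the count never exceeds max_size.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct demo_item {
  struct demo_item *next;
  int value;
} demo_item_t;

typedef struct {
  demo_item_t *head; /* Oldest item (dequeue here) */
  demo_item_t *tail; /* Newest item (enqueue here) */
  size_t count;
  size_t max_size;   /* 0 = unlimited */
  uint64_t dropped;  /* Items evicted because the queue was full */
} demo_queue_t;

/* Remove the oldest item; returns false if the queue is empty. */
static bool demo_dequeue(demo_queue_t *q, int *out) {
  demo_item_t *n = q->head;
  if (!n)
    return false;
  q->head = n->next;
  if (!q->head)
    q->tail = NULL; /* Queue became empty */
  q->count--;
  *out = n->value;
  free(n);
  return true;
}

/* Append an item, evicting the oldest one first when the queue is full. */
static int demo_enqueue(demo_queue_t *q, int value) {
  demo_item_t *n = malloc(sizeof(*n));
  if (!n)
    return -1;
  n->value = value;
  n->next = NULL;

  if (q->max_size > 0 && q->count >= q->max_size) {
    int discarded;
    if (demo_dequeue(q, &discarded))
      q->dropped++;
  }

  if (q->tail)
    q->tail->next = n;
  else
    q->head = n;
  q->tail = n;
  q->count++;
  return 0;
}
```

For real-time audio and video, evicting the oldest entry rather than rejecting the newest keeps the queue biased toward fresh data, at the cost of occasional dropped frames under load.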

Definition at line 185 of file packet_queue.c.

186 {
187 if (!queue)
188 return -1;
189
190 // Check if shutdown (atomic read with acquire semantics)
191 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
192 return -1;
193 }
194
195 // Check if queue is full and drop oldest packet if needed (lock-free)
196 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
197 if (queue->max_size > 0 && current_count >= queue->max_size) {
198 // Drop oldest packet (head) using atomic compare-and-swap
199 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
200 if (head) {
201 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
202 // Atomically update head pointer
203 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
204 // Successfully claimed head node
205 if (next == NULL) {
206 // Queue became empty, also update tail
207 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
208 }
209
210 // Update counters atomically
211 size_t bytes = head->packet.data_len;
212 atomic_fetch_sub(&queue->bytes_queued, bytes);
213 atomic_fetch_sub(&queue->count, (size_t)1);
214 atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);
215
216 // Free dropped packet data
217 if (head->packet.owns_data && head->packet.data) {
219 }
220 node_pool_put(queue->node_pool, head);
221
222 log_debug_every(LOG_RATE_FAST, "Dropped packet from queue (full): type=%d, client=%u", type, client_id);
223 }
224 // If CAS failed, another thread already dequeued - continue to enqueue
225 }
226 }
227
228 // Create new node (use pool if available)
229 packet_node_t *node = node_pool_get(queue->node_pool);
230 if (!node) {
231 SET_ERRNO(ERROR_MEMORY, "Failed to allocate packet node");
232 return -1;
233 }
234
235 // Build packet header
238 node->packet.header.length = HOST_TO_NET_U32((uint32_t)data_len);
239 node->packet.header.client_id = HOST_TO_NET_U32(client_id);
240 // Calculate CRC32 for the data (0 for empty packets)
241 node->packet.header.crc32 = HOST_TO_NET_U32(data_len > 0 ? asciichat_crc32(data, data_len) : 0);
242
243 // Handle data
244 if (data_len > 0 && data) {
245 if (copy_data) {
246 // Try to allocate from buffer pool (local or global)
247 if (queue->buffer_pool) {
248 node->packet.data = buffer_pool_alloc(queue->buffer_pool, data_len);
249 node->packet.buffer_pool = queue->buffer_pool;
250 } else {
251 // Use global pool if no local pool
252 node->packet.data = buffer_pool_alloc(NULL, data_len);
254 }
255 SAFE_MEMCPY(node->packet.data, data_len, data, data_len);
256 node->packet.owns_data = true;
257 } else {
258 // Use the data pointer directly (caller must ensure it stays valid)
259 node->packet.data = (void *)data;
260 node->packet.owns_data = false;
261 node->packet.buffer_pool = NULL;
262 }
263 } else {
264 node->packet.data = NULL;
265 node->packet.owns_data = false;
266 node->packet.buffer_pool = NULL;
267 }
268
269 node->packet.data_len = data_len;
270 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
271
272 // Add to queue using lock-free CAS-based enqueue (Michael-Scott algorithm)
273 while (true) {
274 packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
275
276 if (tail == NULL) {
277 // Empty queue - atomically set both head and tail
278 packet_node_t *expected = NULL;
279 if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
280 memory_order_acquire)) {
281 // Successfully set head (queue was empty)
282 atomic_store_explicit(&queue->tail, node, memory_order_release);
283 break; // Enqueue successful
284 }
285 // CAS failed - another thread initialized queue, retry
286 continue;
287 }
288
289 // Queue is non-empty - try to append to tail
290 packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
291 packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
292
293 // Verify tail hasn't changed (ABA problem mitigation)
294 if (tail != current_tail) {
295 // Tail was updated by another thread, retry
296 continue;
297 }
298
299 if (next == NULL) {
300 // Tail is actually the last node - try to link new node
301 packet_node_t *expected_null = NULL;
302 if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
303 memory_order_acquire)) {
304 // Successfully linked node - try to swing tail forward (best-effort, ignore failure)
305 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
306 break; // Enqueue successful
307 }
308 // CAS failed - another thread appended to tail, retry
309 } else {
310 // Tail is lagging behind - help advance it
311 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
312 // Retry with new tail
313 }
314 }
315
316 // Update counters atomically
317 atomic_fetch_add(&queue->count, (size_t)1);
318 atomic_fetch_add(&queue->bytes_queued, data_len);
319 atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);
320
321 return 0;
322}
#define HOST_TO_NET_U16(val)
Definition endian.h:101
#define HOST_TO_NET_U32(val)
Definition endian.h:71
buffer_pool_t * buffer_pool_get_global(void)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Allocate a buffer from the pool (lock-free fast path)
unsigned short uint16_t
Definition common.h:57
unsigned int uint32_t
Definition common.h:58
#define SAFE_MEMCPY(dest, dest_size, src, count)
Definition common.h:388
@ ERROR_MEMORY
Definition error_codes.h:53
#define LOG_RATE_FAST
Log rate limit: 1 second (1,000,000 microseconds)
Definition log_rates.h:26
#define log_debug_every(interval_us, fmt,...)
Rate-limited DEBUG logging.
uint32_t magic
Magic number (PACKET_MAGIC) for packet validation.
Definition packet.h:492
uint32_t client_id
Client ID (0 = server, >0 = client identifier)
Definition packet.h:500
uint32_t length
Payload data length in bytes (0 for header-only packets)
Definition packet.h:496
uint16_t type
Packet type (packet_type_t enumeration)
Definition packet.h:494
uint32_t crc32
CRC32 checksum of payload data (0 if length == 0)
Definition packet.h:498
packet_node_t * node_pool_get(node_pool_t *pool)
Get a free node from the pool.
bool owns_data
If true, free data when packet is freed.
size_t data_len
Length of payload data in bytes.
packet_header_t header
Complete packet header (already in network byte order)
void node_pool_put(node_pool_t *pool, packet_node_t *node)
Return a node to the pool.
buffer_pool_t * buffer_pool
Pool that allocated the data (NULL if malloc'd)
queued_packet_t packet
The queued packet data.
#define PACKET_MAGIC
Packet magic number (0xDEADBEEF)
Definition packet.h:251
#define asciichat_crc32(data, len)
Main CRC32 dispatcher macro - use this in application code.
Definition crc32.h:144

References asciichat_crc32, queued_packet_t::buffer_pool, packet_queue_t::buffer_pool, buffer_pool_alloc(), buffer_pool_free(), buffer_pool_get_global(), packet_queue_t::bytes_queued, packet_header_t::client_id, packet_queue_t::count, packet_header_t::crc32, queued_packet_t::data, queued_packet_t::data_len, ERROR_MEMORY, queued_packet_t::header, HOST_TO_NET_U16, HOST_TO_NET_U32, packet_header_t::length, log_debug_every, LOG_RATE_FAST, packet_header_t::magic, packet_queue_t::max_size, packet_queue_t::node_pool, node_pool_get(), node_pool_put(), queued_packet_t::owns_data, packet_node::packet, PACKET_MAGIC, packet_queue_t::packets_dropped, packet_queue_t::packets_enqueued, SAFE_MEMCPY, SET_ERRNO, packet_queue_t::shutdown, and packet_header_t::type.

Referenced by client_audio_render_thread(), and queue_audio_for_client().
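
The packet_queue_enqueue() listing above drops the oldest packet when the queue has reached max_size, then appends the new one. The following is a minimal single-threaded sketch of that drop-oldest policy only. It is not the lock-free implementation, and every name in it (drop_queue_t, drop_queue_push, drop_queue_pop) is hypothetical and does not appear in packet_queue.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, single-threaded stand-in for a bounded FIFO. */
typedef struct drop_node {
  int value;
  struct drop_node *next;
} drop_node_t;

typedef struct {
  drop_node_t *head;
  drop_node_t *tail;
  size_t count;
  size_t max_size; /* 0 means unlimited, as in packet_queue_t::max_size */
  size_t dropped;  /* number of entries discarded because the queue was full */
} drop_queue_t;

/* Append value; if the queue is full, discard the oldest entry first. */
static int drop_queue_push(drop_queue_t *q, int value) {
  if (q->max_size > 0 && q->count >= q->max_size && q->head) {
    drop_node_t *old = q->head;
    q->head = old->next;
    if (!q->head)
      q->tail = NULL; /* queue became empty */
    free(old);
    q->count--;
    q->dropped++;
  }

  drop_node_t *n = malloc(sizeof *n);
  if (!n)
    return -1;
  n->value = value;
  n->next = NULL;
  if (q->tail)
    q->tail->next = n;
  else
    q->head = n;
  q->tail = n;
  q->count++;
  return 0;
}

/* Remove the oldest entry; returns -1 if the queue is empty. */
static int drop_queue_pop(drop_queue_t *q, int *out) {
  drop_node_t *n = q->head;
  if (!n)
    return -1;
  q->head = n->next;
  if (!q->head)
    q->tail = NULL;
  *out = n->value;
  free(n);
  q->count--;
  return 0;
}
```

With max_size set to 2, pushing 1, 2, 3 leaves 2 and 3 in the queue and records one drop, which is the behaviour a real-time stream usually wants: stale data is discarded in favour of fresh data.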

◆ packet_queue_enqueue_packet()

int packet_queue_enqueue_packet ( packet_queue_t * queue,
const queued_packet_t * packet 
)

#include <packet_queue.h>

Enqueue a pre-built packet (for special cases like compressed frames)

Parameters
queue  Packet queue
packet  Pre-built packet structure
Returns
0 on success, -1 on error

Enqueues a packet that has already been built (header, data, etc.). Useful for special cases like compressed frames where packet construction is done outside the normal enqueue path.

Note
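The packet structure is copied into the queue. If the packet's owns_data flag is true, the payload is deep-copied into a pool-allocated buffer owned by the queue; otherwise the queue stores the caller's data pointer as a shared reference, and the caller must keep that memory valid until the packet is sent.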
Thread-safe (lock-free using atomic operations).
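
The ownership rule in the note above can be sketched as follows. This is an illustration under simplifying assumptions, not the library code: own_pkt_t is a hypothetical stand-in for queued_packet_t with no header and no buffer pool, and plain malloc/free replace buffer_pool_alloc/buffer_pool_free.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified stand-in for queued_packet_t. */
typedef struct {
  void *data;
  size_t data_len;
  bool owns_data;
} own_pkt_t;

/* Copy src into dst: deep-copy the payload only when the source owns it,
 * otherwise share the pointer. Returns 0 on success, -1 on allocation failure. */
static int own_copy(own_pkt_t *dst, const own_pkt_t *src) {
  *dst = *src; /* struct copy, shares the data pointer by default */
  if (src->data && src->data_len > 0 && src->owns_data) {
    void *copy = malloc(src->data_len);
    if (!copy)
      return -1;
    memcpy(copy, src->data, src->data_len);
    dst->data = copy;
    dst->owns_data = true;
  }
  return 0;
}

/* Release the payload only if this packet owns it. */
static void own_release(own_pkt_t *p) {
  if (p->owns_data)
    free(p->data);
  p->data = NULL;
  p->owns_data = false;
}
```

In this sketch an owning source and its copy end up with two independent buffers, so each side releases its own; a non-owning source leaves both packets pointing at the caller's memory, and neither release call frees it.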

Definition at line 324 of file packet_queue.c.

324 {
325 if (!queue || !packet) {
326 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: queue=%p, packet=%p", queue, packet);
327 return -1;
328 }
329
330 // Validate packet before enqueueing
331 if (!packet_queue_validate_packet(packet)) {
332 SET_ERRNO(ERROR_INVALID_PARAM, "Refusing to enqueue invalid packet");
333 return -1;
334 }
335
336 // Check if shutdown (atomic read with acquire semantics)
337 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
338 return -1;
339 }
340
341 // Check if queue is full and drop oldest packet if needed (lock-free)
342 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
343 if (queue->max_size > 0 && current_count >= queue->max_size) {
344 // Drop oldest packet (head) using atomic compare-and-swap
345 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
346 if (head) {
347 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
348 // Atomically update head pointer
349 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
350 // Successfully claimed head node
351 if (next == NULL) {
352 // Queue became empty, also update tail
353 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
354 }
355
356 // Update counters atomically
357 size_t bytes = head->packet.data_len;
358 atomic_fetch_sub(&queue->bytes_queued, bytes);
359 atomic_fetch_sub(&queue->count, (size_t)1);
360 atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);
361
362 // Free dropped packet data
363 if (head->packet.owns_data && head->packet.data) {
365 }
366 node_pool_put(queue->node_pool, head);
367 }
368 // If CAS failed, another thread already dequeued - continue to enqueue
369 }
370 }
371
372 // Create new node (use pool if available)
373 packet_node_t *node = node_pool_get(queue->node_pool);
374 if (!node) {
375 SET_ERRNO(ERROR_MEMORY, "Failed to allocate packet node");
376 return -1;
377 }
378
379 // Copy the whole packet structure (header plus data pointer, length and ownership fields)
380 SAFE_MEMCPY(&node->packet, sizeof(queued_packet_t), packet, sizeof(queued_packet_t));
381
382 // Deep copy the data if needed
383 if (packet->data && packet->data_len > 0 && packet->owns_data) {
384 // If the packet owns its data, we need to make a copy
385 // Try to allocate from buffer pool (local or global)
386 void *data_copy;
387 if (queue->buffer_pool) {
388 data_copy = buffer_pool_alloc(queue->buffer_pool, packet->data_len);
389 node->packet.buffer_pool = queue->buffer_pool;
390 } else {
391 // Use global pool if no local pool
392 data_copy = buffer_pool_alloc(NULL, packet->data_len);
394 }
395 SAFE_MEMCPY(data_copy, packet->data_len, packet->data, packet->data_len);
396 node->packet.data = data_copy;
397 node->packet.owns_data = true;
398 } else {
399 // Either no data or packet doesn't own it (shared reference is OK)
400 node->packet.data = packet->data;
401 node->packet.owns_data = packet->owns_data;
402 node->packet.buffer_pool = packet->buffer_pool; // Preserve original pool reference
403 }
404
405 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
406
407 // Add to queue using lock-free CAS-based enqueue (Michael-Scott algorithm)
408 while (true) {
409 packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
410
411 if (tail == NULL) {
412 // Empty queue - atomically set both head and tail
413 packet_node_t *expected = NULL;
414 if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
415 memory_order_acquire)) {
416 // Successfully set head (queue was empty)
417 atomic_store_explicit(&queue->tail, node, memory_order_release);
418 break; // Enqueue successful
419 }
420 // CAS failed - another thread initialized queue, retry
421 continue;
422 }
423
424 // Queue is non-empty - try to append to tail
425 packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
426 packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
427
428 // Verify tail hasn't changed (ABA problem mitigation)
429 if (tail != current_tail) {
430 // Tail was updated by another thread, retry
431 continue;
432 }
433
434 if (next == NULL) {
435 // Tail is actually the last node - try to link new node
436 packet_node_t *expected_null = NULL;
437 if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
438 memory_order_acquire)) {
439 // Successfully linked node - try to swing tail forward (best-effort, ignore failure)
440 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
441 break; // Enqueue successful
442 }
443 // CAS failed - another thread appended to tail, retry
444 } else {
445 // Tail is lagging behind - help advance it
446 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
447 // Retry with new tail
448 }
449 }
450
451 // Update counters atomically
452 atomic_fetch_add(&queue->count, (size_t)1);
453 atomic_fetch_add(&queue->bytes_queued, packet->data_len);
454 atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);
455
456 return 0;
457}
@ ERROR_INVALID_PARAM
bool packet_queue_validate_packet(const queued_packet_t *packet)
Validate packet integrity.

References queued_packet_t::buffer_pool, packet_queue_t::buffer_pool, buffer_pool_alloc(), buffer_pool_free(), buffer_pool_get_global(), packet_queue_t::bytes_queued, packet_queue_t::count, queued_packet_t::data, queued_packet_t::data_len, ERROR_INVALID_PARAM, ERROR_MEMORY, packet_queue_t::max_size, packet_queue_t::node_pool, node_pool_get(), node_pool_put(), queued_packet_t::owns_data, packet_node::packet, packet_queue_validate_packet(), packet_queue_t::packets_dropped, packet_queue_t::packets_enqueued, SAFE_MEMCPY, SET_ERRNO, and packet_queue_t::shutdown.

◆ packet_queue_free_packet()

void packet_queue_free_packet ( queued_packet_t * packet)

#include <packet_queue.h>

Free a dequeued packet.

Parameters
packet  Packet to free (can be NULL)

Properly frees packet memory, returning data to buffer pool if applicable, or calling free() if allocated with malloc. Also frees packet structure itself.

Note
Safe to call with NULL (no-op).
Always call this for packets returned by dequeue functions.
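
The listing below guards against double frees by checking the header magic and overwriting it on release. A minimal sketch of that sentinel pattern follows. It makes several assumptions: guard_pkt_t and guard_release are hypothetical names, byte-order conversion is omitted, and the struct is deliberately kept alive (not freed) so that the second check reads valid memory.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define GUARD_MAGIC_LIVE 0xDEADBEEFu  /* PACKET_MAGIC value given in this reference */
#define GUARD_MAGIC_FREED 0xBEEFDEADu /* sentinel written on release in the listing */

/* Hypothetical stand-in for a packet; `releases` counts real releases. */
typedef struct {
  uint32_t magic;
  int releases;
} guard_pkt_t;

/* Release the packet once. Returns true if resources were released,
 * false for NULL or for a packet whose magic is not the live value. */
static bool guard_release(guard_pkt_t *p) {
  if (!p)
    return false; /* NULL is a no-op */
  if (p->magic != GUARD_MAGIC_LIVE)
    return false; /* already released, or corrupted */
  p->releases++;               /* stands in for returning the payload to its pool */
  p->magic = GUARD_MAGIC_FREED; /* mark so a second call is detected */
  return true;
}
```

The check is best-effort: once the structure's own memory has been returned to the allocator, reading its magic field is no longer well defined, so the sentinel mainly helps while the memory has not yet been reused.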

Definition at line 548 of file packet_queue.c.

548 {
549 if (!packet)
550 return;
551
552 // Check if packet was already freed (detect double-free)
553 if (packet->header.magic != HOST_TO_NET_U32(PACKET_MAGIC)) {
554 log_warn("Attempted double-free of packet (magic=0x%x, expected=0x%x)", NET_TO_HOST_U32(packet->header.magic),
556 return;
557 }
558
559 if (packet->owns_data && packet->data) {
560 // Return to appropriate pool or free
561 if (packet->buffer_pool) {
562 buffer_pool_free(packet->buffer_pool, packet->data, packet->data_len);
563 } else {
564 // This was allocated from global pool or malloc, use buffer_pool_free which handles both
565 buffer_pool_free(NULL, packet->data, packet->data_len);
566 }
567 }
568
569 // Mark as freed to detect future double-free attempts
570 // Use network byte order for consistency on big-endian systems
571 packet->header.magic = HOST_TO_NET_U32(0xBEEFDEAD); // Different magic in network byte order
572 SAFE_FREE(packet);
573}
#define NET_TO_HOST_U32(val)
Definition endian.h:86
#define log_warn(...)
Log a WARN message.

References queued_packet_t::buffer_pool, buffer_pool_free(), queued_packet_t::data, queued_packet_t::data_len, queued_packet_t::header, HOST_TO_NET_U32, log_warn, packet_header_t::magic, NET_TO_HOST_U32, queued_packet_t::owns_data, PACKET_MAGIC, and SAFE_FREE.

Referenced by client_send_thread_func(), and packet_queue_clear().

◆ packet_queue_get_stats()

void packet_queue_get_stats ( packet_queue_t * queue,
uint64_t * enqueued,
uint64_t * dequeued,
uint64_t * dropped 
)

#include <packet_queue.h>

Get queue statistics.

Parameters
queue  Packet queue
enqueued  Output: Total packets enqueued
dequeued  Output: Total packets dequeued
dropped  Output: Total packets dropped (due to queue full)

Retrieves cumulative statistics for the queue. Useful for performance monitoring and debugging.

Note
Thread-safe (lock-free using atomic operations).
Output parameters may be NULL; the implementation skips any NULL pointer and does nothing if queue is NULL.
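
As a rough illustration of how such a statistics getter can be written, the sketch below reads three atomic counters and writes only to the output pointers that are non-NULL, as the listing for this function does. stats_demo_t and stats_demo_get are hypothetical names introduced here; they are not part of packet_queue.h.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical container for the three cumulative counters. */
typedef struct {
  _Atomic uint64_t enqueued;
  _Atomic uint64_t dequeued;
  _Atomic uint64_t dropped;
} stats_demo_t;

/* Copy each counter into its output pointer, skipping NULL outputs. */
static void stats_demo_get(stats_demo_t *s, uint64_t *enq, uint64_t *deq, uint64_t *drop) {
  if (!s)
    return;
  if (enq)
    *enq = atomic_load_explicit(&s->enqueued, memory_order_acquire);
  if (deq)
    *deq = atomic_load_explicit(&s->dequeued, memory_order_acquire);
  if (drop)
    *drop = atomic_load_explicit(&s->dropped, memory_order_acquire);
}
```

Each counter is read separately, so the three values are individually consistent but do not form a single atomic snapshot of the queue.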

Definition at line 617 of file packet_queue.c.

617 {
618 if (!queue)
619 return;
620
621 // Lock-free atomic reads (acquire semantics for consistency)
622 if (enqueued)
623 *enqueued = atomic_load_explicit(&queue->packets_enqueued, memory_order_acquire);
624 if (dequeued)
625 *dequeued = atomic_load_explicit(&queue->packets_dequeued, memory_order_acquire);
626 if (dropped)
627 *dropped = atomic_load_explicit(&queue->packets_dropped, memory_order_acquire);
628}

References packet_queue_t::packets_dequeued, packet_queue_t::packets_dropped, and packet_queue_t::packets_enqueued.

Referenced by stats_logger_thread().

◆ packet_queue_is_empty()

bool packet_queue_is_empty ( packet_queue_t * queue)

#include <packet_queue.h>

Check if queue is empty.

Parameters
queue  Packet queue
Returns
true if queue is empty, false otherwise

Returns true if queue contains no packets. Snapshot value may change immediately after return due to concurrent operations.

Note
Thread-safe (lock-free using atomic operations).

Definition at line 583 of file packet_queue.c.

583 {
584 return packet_queue_size(queue) == 0;
585}
size_t packet_queue_size(packet_queue_t *queue)
Get current number of packets in queue.

References packet_queue_size().

◆ packet_queue_is_full()

bool packet_queue_is_full ( packet_queue_t * queue)

#include <packet_queue.h>

Check if queue is full.

Parameters
queue  Packet queue
Returns
true if queue is full (max_size > 0 and count >= max_size), false otherwise

Returns true if queue has reached its maximum size. Unlimited queues (max_size == 0) never return true from this function.

Note
Thread-safe (lock-free using atomic operations).

Definition at line 587 of file packet_queue.c.

587 {
588 if (!queue || queue->max_size == 0)
589 return false;
590
591 // Lock-free atomic read
592 size_t count = atomic_load_explicit(&queue->count, memory_order_acquire);
593 return (count >= queue->max_size);
594}

References packet_queue_t::count, and packet_queue_t::max_size.

◆ packet_queue_shutdown()

void packet_queue_shutdown ( packet_queue_t * queue)

#include <packet_queue.h>

Signal queue shutdown (causes dequeue to return NULL)

Parameters
queue  Packet queue

Sets the shutdown flag, causing all dequeue operations to return NULL. This allows consumer threads to exit gracefully without blocking forever.

Note
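After shutdown, enqueue operations are rejected (packet_queue_enqueue_packet() returns -1 once the flag is set) and dequeue returns NULL.
Thread-safe (a single atomic store with release semantics; consumers observe the flag on their next dequeue attempt).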
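
The shutdown flag protocol can be sketched in a few lines. This is a simplified model, not the library code: sd_queue_t, sd_shutdown, sd_try_put and sd_try_take are hypothetical names, and the "queue" is reduced to a plain item counter so that only the flag handling is shown.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical queue reduced to a shutdown flag and an item counter. */
typedef struct {
  _Atomic bool shutdown;
  int pending; /* number of items waiting (not thread-safe in this sketch) */
} sd_queue_t;

/* Publish the shutdown request with release semantics. */
static void sd_shutdown(sd_queue_t *q) {
  if (!q)
    return;
  atomic_store_explicit(&q->shutdown, true, memory_order_release);
}

/* Producer side: refuse new items once shutdown is set. */
static int sd_try_put(sd_queue_t *q) {
  if (atomic_load_explicit(&q->shutdown, memory_order_acquire))
    return -1;
  q->pending++;
  return 0;
}

/* Consumer side: returns 1 if an item was taken, 0 if empty or shut down. */
static int sd_try_take(sd_queue_t *q) {
  if (atomic_load_explicit(&q->shutdown, memory_order_acquire))
    return 0;
  if (q->pending == 0)
    return 0;
  q->pending--;
  return 1;
}
```

Because both sides check the flag before touching the queue, items still pending at shutdown stay where they are until the owner clears or destroys the queue.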

Definition at line 596 of file packet_queue.c.

596 {
597 if (!queue) {
598 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: queue=%p", queue);
599 return;
600 }
601
602 // Lock-free atomic store (release semantics ensures visibility to other threads)
603 atomic_store_explicit(&queue->shutdown, true, memory_order_release);
604}

References ERROR_INVALID_PARAM, SET_ERRNO, and packet_queue_t::shutdown.

Referenced by disconnect_client_for_bad_data(), and packet_queue_destroy().

◆ packet_queue_size()

size_t packet_queue_size ( packet_queue_t * queue)

#include <packet_queue.h>

Get current number of packets in queue.

Parameters
queue  Packet queue
Returns
Number of packets currently queued

Returns the current queue size (count field). This is a snapshot and may change immediately after return due to concurrent enqueue/dequeue operations.

Note
Thread-safe (lock-free using atomic operations).

Definition at line 575 of file packet_queue.c.

575 {
576 if (!queue)
577 return 0;
578
579 // Lock-free atomic read
580 return atomic_load_explicit(&queue->count, memory_order_acquire);
581}

References packet_queue_t::count.

Referenced by client_audio_render_thread(), and packet_queue_is_empty().

◆ packet_queue_try_dequeue()

queued_packet_t * packet_queue_try_dequeue ( packet_queue_t * queue)

#include <packet_queue.h>

Try to dequeue a packet without blocking.

Parameters
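queue  Packet queue
Returns
Pointer to the dequeued packet, or NULL if no packet could be returned

Non-blocking version of packet_queue_dequeue(). Returns immediately. The result is NULL if the queue is empty or shut down, if the compare-and-swap on the head pointer fails (for example because another thread dequeued first), or if the packet fails the magic-number or CRC check; otherwise the next packet is returned. A NULL result therefore does not by itself prove that the queue is empty.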

Note
Caller must free the returned packet with packet_queue_free_packet().
Thread-safe (lock-free using atomic operations).
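
The single-attempt claim of the head node can be sketched with C11 atomics as follows. This is an illustration, not the library implementation: tp_node_t, tp_queue_t and tp_try_pop are hypothetical names, the tail pointer and packet validation are left out, and the sketch uses the strong compare-exchange so that a failure always means the head really changed (the listing uses the weak variant, which may also fail spuriously). It does not address safe memory reclamation or the ABA problem.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical node and queue types for the sketch. */
typedef struct tp_node {
  int value;
  struct tp_node *_Atomic next;
} tp_node_t;

typedef struct {
  tp_node_t *_Atomic head;
  _Atomic size_t count;
} tp_queue_t;

/* Try once to detach the head node. Returns the node, or NULL if the
 * queue is empty or another thread changed the head in the meantime. */
static tp_node_t *tp_try_pop(tp_queue_t *q) {
  if (!q)
    return NULL;
  tp_node_t *head = atomic_load_explicit(&q->head, memory_order_acquire);
  if (!head)
    return NULL;
  tp_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
  /* Swing head to next only if head is still the node we loaded. */
  if (!atomic_compare_exchange_strong(&q->head, &head, next))
    return NULL;
  atomic_fetch_sub(&q->count, (size_t)1);
  return head;
}
```

A caller that needs a packet rather than a single attempt would wrap this in a retry loop and stop when the count reaches zero or shutdown is signalled.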

Definition at line 464 of file packet_queue.c.

464 {
465 if (!queue)
466 return NULL;
467
468 // Check if shutdown (atomic read with acquire semantics)
469 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
470 return NULL;
471 }
472
473 // Check if queue is empty (atomic read with acquire semantics)
474 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
475 if (current_count == 0) {
476 return NULL;
477 }
478
479 // Remove from head atomically (lock-free dequeue)
480 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
481 if (!head) {
482 return NULL;
483 }
484
485 // Atomically update head pointer
486 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
487 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
488 // Successfully claimed head node
489 if (next == NULL) {
490 // Queue became empty, also update tail atomically
491 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
492 }
493
494 // Update counters atomically
495 size_t bytes = head->packet.data_len;
496 atomic_fetch_sub(&queue->bytes_queued, bytes);
497 atomic_fetch_sub(&queue->count, (size_t)1);
498 atomic_fetch_add(&queue->packets_dequeued, (uint64_t)1);
499
500 // Verify packet magic number for corruption detection
502 if (magic != PACKET_MAGIC) {
503 SET_ERRNO(ERROR_BUFFER, "CORRUPTION: Invalid magic in try_dequeued packet: 0x%x (expected 0x%x), type=%u", magic,
505 // Still return node to pool but don't return corrupted packet
506 node_pool_put(queue->node_pool, head);
507 return NULL;
508 }
509
510 // Validate CRC if there's data
511 if (head->packet.data_len > 0 && head->packet.data) {
512 uint32_t expected_crc = NET_TO_HOST_U32(head->packet.header.crc32);
513 uint32_t actual_crc = asciichat_crc32(head->packet.data, head->packet.data_len);
514 if (actual_crc != expected_crc) {
516 "CORRUPTION: CRC mismatch in try_dequeued packet: got 0x%x, expected 0x%x, type=%u, len=%zu",
517 actual_crc, expected_crc, NET_TO_HOST_U16(head->packet.header.type), head->packet.data_len);
518 // Free data if packet owns it
519 if (head->packet.owns_data && head->packet.data) {
520 // Return the buffer to the pool that allocated it (local pool if set, otherwise global)
521 if (head->packet.buffer_pool) {
523 } else {
524 // This was allocated from global pool or malloc, use buffer_pool_free which handles both
525 buffer_pool_free(NULL, head->packet.data, head->packet.data_len);
526 }
527 // CRITICAL: Clear pointer to prevent double-free when packet is copied later
528 head->packet.data = NULL;
529 head->packet.owns_data = false;
530 }
531 node_pool_put(queue->node_pool, head);
532 return NULL;
533 }
534 }
535
536 // Extract packet and return node to pool
537 queued_packet_t *packet;
538 packet = SAFE_MALLOC(sizeof(queued_packet_t), queued_packet_t *);
539 SAFE_MEMCPY(packet, sizeof(queued_packet_t), &head->packet, sizeof(queued_packet_t));
540 node_pool_put(queue->node_pool, head);
541 return packet;
542 }
543
544 // CAS failed - another thread dequeued, retry if needed (or return NULL for non-blocking)
545 return NULL;
546}
#define NET_TO_HOST_U16(val)
Definition endian.h:116
@ ERROR_BUFFER
Definition error_codes.h:96

References asciichat_crc32, queued_packet_t::buffer_pool, buffer_pool_free(), packet_queue_t::bytes_queued, packet_queue_t::count, packet_header_t::crc32, queued_packet_t::data, queued_packet_t::data_len, ERROR_BUFFER, queued_packet_t::header, packet_header_t::magic, NET_TO_HOST_U16, NET_TO_HOST_U32, packet_queue_t::node_pool, node_pool_put(), queued_packet_t::owns_data, packet_node::packet, PACKET_MAGIC, packet_queue_t::packets_dequeued, SAFE_MALLOC, SAFE_MEMCPY, SET_ERRNO, packet_queue_t::shutdown, and packet_header_t::type.

Referenced by client_send_thread_func(), packet_queue_clear(), and packet_queue_dequeue().

◆ packet_queue_validate_packet()

bool packet_queue_validate_packet ( const queued_packet_t * packet)

#include <packet_queue.h>

Validate packet integrity.

Parameters
packet  Packet to validate
Returns
true if packet is valid, false otherwise

Validates packet structure and header integrity. Useful for debugging and detecting memory corruption.

Note
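This is a basic validation: it checks the magic number, the packet type range, the header length against data_len, and the payload CRC32, but does not otherwise interpret the payload contents.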
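
The checks described above can be sketched as a small self-contained example. Several parts are assumptions made for illustration: val_header_t is a hypothetical reduced header (no type or client_id field), htonl/ntohl from the POSIX header arpa/inet.h stand in for HOST_TO_NET_U32/NET_TO_HOST_U32, and val_crc32 is a plain bitwise IEEE CRC-32, which may not be the same checksum that asciichat_crc32 computes.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define VAL_MAGIC 0xDEADBEEFu /* PACKET_MAGIC value given in this reference */

/* Hypothetical reduced header; all fields are stored in network byte order. */
typedef struct {
  uint32_t magic;
  uint32_t length;
  uint32_t crc32;
} val_header_t;

/* Bitwise IEEE CRC-32 (reflected polynomial 0xEDB88320). */
static uint32_t val_crc32(const void *data, size_t len) {
  const uint8_t *p = data;
  uint32_t crc = 0xFFFFFFFFu;
  for (size_t i = 0; i < len; i++) {
    crc ^= p[i];
    for (int bit = 0; bit < 8; bit++)
      crc = (crc & 1u) ? (crc >> 1) ^ 0xEDB88320u : crc >> 1;
  }
  return ~crc;
}

/* Build a header for the payload (CRC is 0 for empty payloads). */
static void val_fill(val_header_t *h, const void *data, size_t len) {
  h->magic = htonl(VAL_MAGIC);
  h->length = htonl((uint32_t)len);
  h->crc32 = htonl(len > 0 ? val_crc32(data, len) : 0);
}

/* Check magic, length and payload CRC; returns true if all match. */
static bool val_check(const val_header_t *h, const void *data, size_t len) {
  if (!h)
    return false;
  if (ntohl(h->magic) != VAL_MAGIC)
    return false;
  if (ntohl(h->length) != len)
    return false;
  if (len > 0 && data && ntohl(h->crc32) != val_crc32(data, len))
    return false;
  return true;
}
```

Converting each field back to host order before comparing keeps the checks independent of the machine's endianness, which is why the header can be stored ready for the wire.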

Definition at line 630 of file packet_queue.c.

630 {
631 if (!packet) {
632 return false;
633 }
634
635 // Check magic number
636 uint32_t magic = NET_TO_HOST_U32(packet->header.magic);
637 if (magic != PACKET_MAGIC) {
638 SET_ERRNO(ERROR_BUFFER, "Invalid packet magic: 0x%x (expected 0x%x)", magic, PACKET_MAGIC);
639 return false;
640 }
641
642 // Check packet type is valid
643 uint16_t type = NET_TO_HOST_U16(packet->header.type);
644 if (type < PACKET_TYPE_ASCII_FRAME || type > PACKET_TYPE_AUDIO_BATCH) {
645 SET_ERRNO(ERROR_BUFFER, "Invalid packet type: %u", type);
646 return false;
647 }
648
649 // Check length matches data_len
650 uint32_t length = NET_TO_HOST_U32(packet->header.length);
651 if (length != packet->data_len) {
652 SET_ERRNO(ERROR_BUFFER, "Packet length mismatch: header says %u, data_len is %zu", length, packet->data_len);
653 return false;
654 }
655
656 // Check CRC if there's data
657 if (packet->data_len > 0 && packet->data) {
658 uint32_t expected_crc = NET_TO_HOST_U32(packet->header.crc32);
659 uint32_t actual_crc = asciichat_crc32(packet->data, packet->data_len);
660 if (actual_crc != expected_crc) {
661 SET_ERRNO(ERROR_BUFFER, "Packet CRC mismatch: got 0x%x, expected 0x%x", actual_crc, expected_crc);
662 return false;
663 }
664 }
665
666 return true;
667}
@ PACKET_TYPE_AUDIO_BATCH
Batched audio packets for efficiency.
Definition packet.h:343

References asciichat_crc32, packet_header_t::crc32, queued_packet_t::data, queued_packet_t::data_len, ERROR_BUFFER, queued_packet_t::header, packet_header_t::length, packet_header_t::magic, NET_TO_HOST_U16, NET_TO_HOST_U32, PACKET_MAGIC, PACKET_TYPE_AUDIO_BATCH, SET_ERRNO, and packet_header_t::type.

Referenced by packet_queue_enqueue_packet().

Variable Documentation

◆ buffer_pool [1/2]

buffer_pool_t* queued_packet_t::buffer_pool

Pool that allocated the data (NULL if malloc'd)

Definition at line 130 of file packet_queue.h.

Referenced by packet_queue_enqueue(), packet_queue_enqueue_packet(), packet_queue_free_packet(), and packet_queue_try_dequeue().

◆ buffer_pool [2/2]

buffer_pool_t* packet_queue_t::buffer_pool

Optional memory pool for data buffers (NULL = fall back to the global buffer pool)

Definition at line 228 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_destroy(), packet_queue_enqueue(), and packet_queue_enqueue_packet().

◆ bytes_queued

_Atomic size_t packet_queue_t::bytes_queued

Total bytes of data queued (for monitoring) - atomic for lock-free access.

Definition at line 223 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_try_dequeue().

◆ count

_Atomic size_t packet_queue_t::count

Number of packets currently in queue - atomic for lock-free access.

Definition at line 219 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_enqueue(), packet_queue_enqueue_packet(), packet_queue_is_full(), packet_queue_size(), and packet_queue_try_dequeue().

◆ data

void* queued_packet_t::data

Packet payload data (can be NULL for header-only packets)

Definition at line 124 of file packet_queue.h.

Referenced by client_send_thread_func(), packet_queue_enqueue(), packet_queue_enqueue_packet(), packet_queue_free_packet(), packet_queue_try_dequeue(), and packet_queue_validate_packet().

◆ data_len

size_t queued_packet_t::data_len

Length of payload data in bytes.

◆ free_list

packet_node_t* node_pool::free_list

Stack of free nodes (LIFO for cache locality)

Definition at line 169 of file packet_queue.h.

Referenced by node_pool_create(), node_pool_get(), and node_pool_put().

◆ header

packet_header_t queued_packet_t::header

Complete packet header (already in network byte order)

Definition at line 122 of file packet_queue.h.

Referenced by packet_queue_enqueue(), packet_queue_free_packet(), packet_queue_try_dequeue(), and packet_queue_validate_packet().

◆ max_size

size_t packet_queue_t::max_size

Maximum queue size (0 = unlimited)

Definition at line 221 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_is_full().

◆ node_pool

node_pool_t* packet_queue_t::node_pool

Optional memory pool for nodes (NULL = use malloc/free)

Definition at line 226 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_destroy(), packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_try_dequeue().

◆ nodes

packet_node_t* node_pool::nodes

Pre-allocated array of all nodes.

Definition at line 171 of file packet_queue.h.

Referenced by node_pool_create(), node_pool_destroy(), and node_pool_put().

◆ owns_data

bool queued_packet_t::owns_data

If true, free data when packet is freed.

Definition at line 128 of file packet_queue.h.

Referenced by packet_queue_enqueue(), packet_queue_enqueue_packet(), packet_queue_free_packet(), and packet_queue_try_dequeue().

◆ packet

queued_packet_t packet_node::packet

The queued packet data.

Definition at line 149 of file packet_queue.h.

Referenced by packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_try_dequeue().

◆ packets_dequeued

_Atomic uint64_t packet_queue_t::packets_dequeued

Total packets dequeued (statistics) - atomic for lock-free access.

Definition at line 233 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_get_stats(), and packet_queue_try_dequeue().

◆ packets_dropped

_Atomic uint64_t packet_queue_t::packets_dropped

Total packets dropped due to queue full (statistics) - atomic for lock-free access.

Definition at line 235 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_get_stats().

◆ packets_enqueued

_Atomic uint64_t packet_queue_t::packets_enqueued

Total packets enqueued (statistics) - atomic for lock-free access.

Definition at line 231 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_enqueue(), packet_queue_enqueue_packet(), and packet_queue_get_stats().

◆ pool_mutex

mutex_t node_pool::pool_mutex

Mutex protecting free list access.

Definition at line 177 of file packet_queue.h.

Referenced by node_pool_create(), node_pool_destroy(), node_pool_get(), and node_pool_put().

◆ pool_size

size_t node_pool::pool_size

Total number of nodes in pool.

Definition at line 173 of file packet_queue.h.

Referenced by node_pool_create(), node_pool_get(), and node_pool_put().

◆ shutdown

_Atomic bool packet_queue_t::shutdown

Shutdown flag (true = dequeue returns NULL) - atomic for lock-free access.

Definition at line 238 of file packet_queue.h.

Referenced by packet_queue_create_with_pools(), packet_queue_enqueue(), packet_queue_enqueue_packet(), packet_queue_shutdown(), and packet_queue_try_dequeue().

◆ used_count

size_t node_pool::used_count

Number of nodes currently in use.

Definition at line 175 of file packet_queue.h.

Referenced by node_pool_create(), node_pool_get(), and node_pool_put().