37    for (size_t i = 0; i < pool_size - 1; i++) {
38      atomic_store_explicit(&pool->nodes[i].next, &pool->nodes[i + 1], memory_order_relaxed);
40    atomic_store_explicit(&pool->nodes[pool_size - 1].next, (packet_node_t *)NULL, memory_order_relaxed);
77    pool->free_list = atomic_load_explicit(&node->next, memory_order_relaxed);
79    atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
112   atomic_store_explicit(&node->next, pool->free_list, memory_order_relaxed);
142   atomic_store_explicit(&queue->head, (packet_node_t *)NULL, memory_order_relaxed);
143   atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_relaxed);
144   atomic_init(&queue->count, (size_t)0);
156   atomic_init(&queue->shutdown, false);
186   uint32_t client_id, bool copy_data) {
191   if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
196   size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
199   packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
201   packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
203   if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
207   atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
213   atomic_fetch_sub(&queue->count, (size_t)1);
244   if (data_len > 0 && data) {
270   atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
274   packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
279   if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
280                                             memory_order_acquire)) {
282   atomic_store_explicit(&queue->tail, node, memory_order_release);
290   packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
291   packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
294   if (tail != current_tail) {
302   if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
303                                             memory_order_acquire)) {
305   atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
311   atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
317   atomic_fetch_add(&queue->count, (size_t)1);
325   if (!queue || !packet) {
337   if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
342   size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
345   packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
347   packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
349   if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
353   atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
359   atomic_fetch_sub(&queue->count, (size_t)1);
405   atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
409   packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
414   if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
415                                             memory_order_acquire)) {
417   atomic_store_explicit(&queue->tail, node, memory_order_release);
425   packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
426   packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
429   if (tail != current_tail) {
437   if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
438                                             memory_order_acquire)) {
440   atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
446   atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
452   atomic_fetch_add(&queue->count, (size_t)1);
469   if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
474   size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
475   if (current_count == 0) {
480   packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
486   packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
487   if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
491   atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
497   atomic_fetch_sub(&queue->count, (size_t)1);
503   SET_ERRNO(ERROR_BUFFER, "CORRUPTION: Invalid magic in try_dequeued packet: 0x%x (expected 0x%x), type=%u", magic,
514   if (actual_crc != expected_crc) {
516   "CORRUPTION: CRC mismatch in try_dequeued packet: got 0x%x, expected 0x%x, type=%u, len=%zu",
580   return atomic_load_explicit(&queue->count, memory_order_acquire);
592   size_t count = atomic_load_explicit(&queue->count, memory_order_acquire);
603   atomic_store_explicit(&queue->shutdown, true, memory_order_release);
623   *enqueued = atomic_load_explicit(&queue->packets_enqueued, memory_order_acquire);
625   *dequeued = atomic_load_explicit(&queue->packets_dequeued, memory_order_acquire);
627   *dropped = atomic_load_explicit(&queue->packets_dropped, memory_order_acquire);
660   if (actual_crc != expected_crc) {
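
Read together, the excerpts above trace the queue's lock-free structure: the node pool is wired up with relaxed stores (lines 37-40), the queue initializes head and tail to NULL, enqueue appends at the tail through a compare-and-swap retry loop (lines 270-317 and 405-452), and dequeue pops the head with the same CAS pattern while maintaining an atomic count and the enqueued/dequeued/dropped statistics. The sketch below is a minimal, self-contained rendering of that enqueue retry loop using hypothetical demo_* types so the control flow can be read in one place; it mirrors the shape of the excerpts but is not the library's implementation.

#include <stdatomic.h>
#include <stddef.h>

/* Simplified stand-ins for packet_node_t / packet_queue_t (illustration only). */
typedef struct demo_node {
  struct demo_node *_Atomic next;
  int value;
} demo_node_t;

typedef struct {
  demo_node_t *_Atomic head;
  demo_node_t *_Atomic tail;
} demo_queue_t;

/* CAS-based tail append, following the retry structure of the excerpts:
 * if the queue looks empty, install the node as head; otherwise link it
 * after the observed tail and swing tail forward; on any lost race,
 * re-read and try again. */
static void demo_enqueue(demo_queue_t *q, demo_node_t *node) {
  atomic_store_explicit(&node->next, (demo_node_t *)NULL, memory_order_relaxed);
  for (;;) {
    demo_node_t *tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (tail == NULL) {
      demo_node_t *expected = NULL;
      if (atomic_compare_exchange_weak_explicit(&q->head, &expected, node,
                                                memory_order_release, memory_order_acquire)) {
        atomic_store_explicit(&q->tail, node, memory_order_release);
        return;
      }
      continue; /* another producer installed head first; retry */
    }
    demo_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
    if (tail != atomic_load_explicit(&q->tail, memory_order_acquire)) {
      continue; /* tail moved underneath us; retry */
    }
    if (next == NULL) {
      demo_node_t *expected_null = NULL;
      if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node,
                                                memory_order_release, memory_order_acquire)) {
        /* Best-effort tail swing; a failure means another thread already helped. */
        atomic_compare_exchange_weak_explicit(&q->tail, &tail, node,
                                              memory_order_release, memory_order_relaxed);
        return;
      }
    } else {
      /* Tail is lagging behind the real last node; help advance it, then retry. */
      atomic_compare_exchange_weak_explicit(&q->tail, &tail, next,
                                            memory_order_release, memory_order_relaxed);
    }
  }
}

The weak CAS variants may fail spuriously, which is why every failure path simply loops; the relaxed/release/acquire orderings copied from the excerpts ensure the node's next link is visible before the tail pointer is published.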
⚠️‼️ Error and/or exit() when things go bad.
🗃️ Lock-Free Unified Memory Buffer Pool with Lazy Allocation
Hardware-Accelerated CRC32 Checksum Computation.
🔄 Network byte order conversion helpers
#define HOST_TO_NET_U16(val)
#define HOST_TO_NET_U32(val)
#define NET_TO_HOST_U16(val)
#define NET_TO_HOST_U32(val)
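
The four conversion helpers are listed here without their expansions. As a hedged sketch, assuming they simply wrap the standard byte-order functions (the real definitions are not shown in this listing):

#include <stdint.h>
#include <arpa/inet.h> /* htons / htonl / ntohs / ntohl */

/* Assumed expansions -- illustration only, not the project's actual macros. */
#define HOST_TO_NET_U16(val) htons((uint16_t)(val))
#define HOST_TO_NET_U32(val) htonl((uint32_t)(val))
#define NET_TO_HOST_U16(val) ntohs((uint16_t)(val))
#define NET_TO_HOST_U32(val) ntohl((uint32_t)(val))

Since queued packet headers are documented below as already being in network byte order, helpers like these would be applied once when a header is built and reversed once on receipt.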
buffer_pool_t * buffer_pool_get_global(void)
void buffer_pool_destroy(buffer_pool_t *pool)
Destroy a buffer pool and free all memory.
void buffer_pool_log_stats(buffer_pool_t *pool, const char *name)
Log pool statistics.
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free)
buffer_pool_t * buffer_pool_create(size_t max_bytes, uint64_t shrink_delay_ms)
Create a new buffer pool.
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Allocate a buffer from the pool (lock-free fast path)
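
Based only on the buffer pool signatures listed above, a minimal allocate/free round trip could look like the sketch below. The header name, the size constants, and the assumption that buffer_pool_free() must be given the same size that was allocated are illustrative, not confirmed by this listing.

#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include "buffer_pool.h" /* assumed header name for the API above */

static int demo_buffer_pool_roundtrip(void) {
  /* Illustrative limits: 4 MiB cap, shrink idle memory after 5 seconds. */
  buffer_pool_t *pool = buffer_pool_create(4u * 1024u * 1024u, 5000u);
  if (!pool) {
    return -1;
  }

  /* Lock-free fast path: grab a buffer, use it, hand it back. */
  size_t len = 1500;
  void *buf = buffer_pool_alloc(pool, len);
  if (buf) {
    memset(buf, 0, len);
    buffer_pool_free(pool, buf, len); /* assumed: size matches the allocation */
  }

  buffer_pool_log_stats(pool, "demo");
  buffer_pool_destroy(pool);
  return 0;
}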
#define SAFE_MALLOC(size, cast)
unsigned long long uint64_t
#define SAFE_MEMCPY(dest, dest_size, src, count)
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
#define LOG_RATE_FAST
Log rate limit: 1 second (1,000,000 microseconds)
#define log_warn(...)
Log a WARN message.
#define log_debug_every(interval_us, fmt,...)
Rate-limited DEBUG logging.
#define log_debug(...)
Log a DEBUG message.
uint32_t magic
Magic number (PACKET_MAGIC) for packet validation.
uint32_t client_id
Client ID (0 = server, >0 = client identifier)
uint32_t length
Payload data length in bytes (0 for header-only packets)
uint16_t type
Packet type (packet_type_t enumeration)
uint32_t crc32
CRC32 checksum of payload data (0 if length == 0)
_Atomic size_t count
Number of packets currently in queue - atomic for lock-free access.
void packet_queue_shutdown(packet_queue_t *queue)
Signal queue shutdown (causes dequeue to return NULL)
size_t max_size
Maximum queue size (0 = unlimited)
size_t used_count
Number of nodes currently in use.
void packet_queue_clear(packet_queue_t *queue)
Clear all packets from queue.
_Atomic uint64_t packets_dropped
Total packets dropped due to queue full (statistics) - atomic for lock-free access.
buffer_pool_t * buffer_pool
Optional memory pool for data buffers (NULL = use malloc/free)
size_t pool_size
Total number of nodes in pool.
packet_queue_t * packet_queue_create(size_t max_size)
Create a new packet queue.
packet_node_t * node_pool_get(node_pool_t *pool)
Get a free node from the pool.
_Atomic uint64_t packets_dequeued
Total packets dequeued (statistics) - atomic for lock-free access.
packet_queue_t * packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool)
Create a packet queue with both node and buffer pools.
void packet_queue_get_stats(packet_queue_t *queue, uint64_t *enqueued, uint64_t *dequeued, uint64_t *dropped)
Get queue statistics.
bool owns_data
If true, free data when packet is freed.
bool packet_queue_is_empty(packet_queue_t *queue)
Check if queue is empty.
size_t data_len
Length of payload data in bytes.
void packet_queue_free_packet(queued_packet_t *packet)
Free a dequeued packet.
_Atomic size_t bytes_queued
Total bytes of data queued (for monitoring) - atomic for lock-free access.
packet_node_t * nodes
Pre-allocated array of all nodes.
int packet_queue_enqueue_packet(packet_queue_t *queue, const queued_packet_t *packet)
Enqueue a pre-built packet (for special cases like compressed frames)
packet_queue_t * packet_queue_create_with_pool(size_t max_size, size_t pool_size)
Create a packet queue with node pool.
int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
Enqueue a packet into the queue.
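
Putting the queue functions in this listing together, a single-threaded producer/consumer round trip might look like the following sketch. The signatures are taken from the entries in this listing; the header name, the 0-on-success convention for packet_queue_enqueue(), and the meaning of copy_data (the queue copies the payload) are assumptions.

#include <stdbool.h>
#include <stdint.h>
#include "packet_queue.h" /* assumed to declare the queue API and packet_type_t */

static void demo_queue_roundtrip(void) {
  packet_queue_t *queue = packet_queue_create(1024); /* max_size; 0 = unlimited */
  if (!queue) {
    return;
  }

  uint8_t samples[960] = {0};
  (void)packet_queue_enqueue(queue, PACKET_TYPE_AUDIO_BATCH, samples, sizeof(samples),
                             /* client_id */ 42, /* copy_data */ true);

  queued_packet_t *pkt = packet_queue_try_dequeue(queue);
  if (pkt) {
    if (packet_queue_validate_packet(pkt)) {
      /* pkt->header is already in network byte order; pkt->data and
       * pkt->data_len hold the payload ready for the send thread. */
    }
    packet_queue_free_packet(pkt);
  }

  uint64_t enqueued = 0, dequeued = 0, dropped = 0;
  packet_queue_get_stats(queue, &enqueued, &dequeued, &dropped);

  packet_queue_shutdown(queue); /* later dequeues return NULL */
  packet_queue_destroy(queue);
}

In a real per-client send thread the dequeue side would run in a loop, exiting once packet_queue_try_dequeue() returns NULL after shutdown.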
_Atomic bool shutdown
Shutdown flag (true = dequeue returns NULL) - atomic for lock-free access.
packet_header_t header
Complete packet header (already in network byte order)
bool packet_queue_is_full(packet_queue_t *queue)
Check if queue is full.
queued_packet_t * packet_queue_try_dequeue(packet_queue_t *queue)
Try to dequeue a packet without blocking.
void node_pool_put(node_pool_t *pool, packet_node_t *node)
Return a node to the pool.
buffer_pool_t * buffer_pool
Pool that allocated the data (NULL if malloc'd)
mutex_t pool_mutex
Mutex protecting free list access.
void node_pool_destroy(node_pool_t *pool)
Destroy a node pool and free all memory.
void packet_queue_destroy(packet_queue_t *queue)
Destroy a packet queue and free all resources.
_Atomic uint64_t packets_enqueued
Total packets enqueued (statistics) - atomic for lock-free access.
queued_packet_t packet
The queued packet data.
bool packet_queue_validate_packet(const queued_packet_t *packet)
Validate packet integrity.
queued_packet_t * packet_queue_dequeue(packet_queue_t *queue)
Dequeue a packet from the queue (non-blocking)
size_t packet_queue_size(packet_queue_t *queue)
Get current number of packets in queue.
packet_node_t * free_list
Stack of free nodes (LIFO for cache locality)
void * data
Packet payload data (can be NULL for header-only packets)
node_pool_t * node_pool
Optional memory pool for nodes (NULL = use malloc/free)
node_pool_t * node_pool_create(size_t pool_size)
Create a memory pool for packet queue nodes.
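
The node pool entries in this listing (node_pool_create/get/put/destroy, a pre-allocated nodes array, and a LIFO free_list guarded by pool_mutex) describe a simple recycling allocator for queue nodes. A minimal round trip, with the header name and the NULL-on-exhaustion behaviour of node_pool_get() assumed:

#include <stddef.h>
#include "packet_queue.h" /* assumed to declare node_pool_t and packet_node_t */

static void demo_node_pool(void) {
  node_pool_t *pool = node_pool_create(256); /* pre-allocate 256 nodes */
  if (!pool) {
    return;
  }

  packet_node_t *node = node_pool_get(pool); /* assumed NULL when the pool is exhausted */
  if (node) {
    /* ... link the node into a queue, consume it, then recycle it ... */
    node_pool_put(pool, node);
  }

  node_pool_destroy(pool);
}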
packet_type_t
Network protocol packet type enumeration.
#define PACKET_MAGIC
Packet magic number (0xDEADBEEF)
PACKET_TYPE_AUDIO_BATCH
Batched audio packets for efficiency.
#define asciichat_crc32(data, len)
Main CRC32 dispatcher macro - use this in application code.
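
The header field documentation above says the crc32 member holds a CRC32 of the payload (0 when length == 0) and that the magic member carries PACKET_MAGIC for validation, which is where this dispatcher macro would be used when a header is built. The sketch below is a hedged illustration only: the header names, the exact layout of packet_header_t, the return type of asciichat_crc32(), and the use of the HOST_TO_NET_* helpers here are all assumptions.

#include <stddef.h>
#include <stdint.h>
#include "crc32.h"  /* assumed home of asciichat_crc32() */
#include "packet.h" /* assumed home of packet_header_t, PACKET_MAGIC, HOST_TO_NET_* */

/* Fill a header the way the field documentation describes: magic for
 * validation, payload length, and a payload CRC32 (0 for header-only
 * packets), all converted to network byte order. */
static void demo_fill_header(packet_header_t *hdr, uint16_t type, uint32_t client_id,
                             const void *payload, size_t payload_len) {
  hdr->magic = HOST_TO_NET_U32(PACKET_MAGIC);
  hdr->type = HOST_TO_NET_U16(type);
  hdr->client_id = HOST_TO_NET_U32(client_id);
  hdr->length = HOST_TO_NET_U32((uint32_t)payload_len);
  hdr->crc32 = HOST_TO_NET_U32(payload_len ? asciichat_crc32(payload, payload_len) : 0u);
}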
📬 Thread-safe packet queue system for per-client send threads
Memory pool for packet nodes to reduce malloc/free overhead.
Node in the packet queue linked list.
Thread-safe packet queue for producer-consumer communication.
Single packet ready to send (header already in network byte order)