// --- node pool creation (fragment: original lines 31-47; intervening lines not visible) ---
// Allocate the pool control structure, then one contiguous array of pool_size nodes.
31 pool = SAFE_MALLOC(
sizeof(node_pool_t), node_pool_t *);
34 pool->nodes = SAFE_MALLOC(
sizeof(packet_node_t) * pool_size, packet_node_t *);
// Thread each node's .next to its array successor, forming an intrusive singly
// linked free list (relaxed order: the pool is not yet visible to other threads).
// NOTE(review): assumes pool_size >= 1 -- with pool_size == 0 the expression
// "pool_size - 1" underflows (size_t); confirm callers guarantee a nonzero size.
37 for (
size_t i = 0; i < pool_size - 1; i++) {
38 atomic_store_explicit(&pool->nodes[i].next, &pool->nodes[i + 1], memory_order_relaxed);
// Terminate the free list at the last node.
40 atomic_store_explicit(&pool->nodes[pool_size - 1].next, (packet_node_t *)NULL, memory_order_relaxed);
42 pool->free_list = &pool->nodes[0];
43 pool->pool_size = pool_size;
// Error path for mutex initialization (the failing call itself is in lines not
// shown in this excerpt).
47 SET_ERRNO(ERROR_PLATFORM_INIT,
"Failed to initialize mutex for node pool");
// --- node allocation (fragment: original lines 69-88) ---
// Heap-allocation path (its guard condition is in lines not visible here).
69 node = SAFE_MALLOC(
sizeof(packet_node_t), packet_node_t *);
// Fast path: pop the head of the free list under pool_mutex.
73 mutex_lock(&pool->pool_mutex);
75 packet_node_t *node = pool->free_list;
// Advance free_list to the popped node's successor; relaxed ordering is
// sufficient because the free list is only touched while holding pool_mutex.
77 pool->free_list = atomic_load_explicit(&node->next, memory_order_relaxed);
// Detach the node so a stale .next pointer can never leak into a queue.
79 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
82 mutex_unlock(&pool->pool_mutex);
// Pool exhausted: fall back to a direct heap allocation and log it.
// NOTE(review): callers/free path must tolerate nodes not belonging to
// pool->nodes[] -- confirm against the (unseen) node release code.
86 node = SAFE_MALLOC(
sizeof(packet_node_t), packet_node_t *);
88 log_debug(
"Memory pool exhausted, falling back to SAFE_MALLOC (used: %zu/%zu)", pool->used_count, pool->pool_size);
// --- queue creation (fragment: original lines 137-156) ---
137 packet_queue_t *queue;
138 queue = SAFE_MALLOC(
sizeof(packet_queue_t), packet_queue_t *);
// Empty queue: head and tail both NULL (relaxed: not yet published to other threads).
142 atomic_store_explicit(&queue->head, (packet_node_t *)NULL, memory_order_relaxed);
143 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_relaxed);
144 atomic_init(&queue->count, (
size_t)0);
// max_size == 0 means "unbounded" (the enqueue capacity checks test max_size > 0).
145 queue->max_size = max_size;
146 atomic_init(&queue->bytes_queued, (
size_t)0);
// Optional node pool; NULL disables pooling so every node is heap-allocated.
149 queue->node_pool = node_pool_size > 0 ?
node_pool_create(node_pool_size) : NULL;
// Statistics counters and the shutdown flag start cleared.
153 atomic_init(&queue->packets_enqueued, (uint64_t)0);
154 atomic_init(&queue->packets_dequeued, (uint64_t)0);
155 atomic_init(&queue->packets_dropped, (uint64_t)0);
156 atomic_init(&queue->shutdown,
false);
// --- raw-data enqueue (fragment: original lines 186-319; signature partially visible) ---
186 uint32_t client_id,
bool copy_data) {
// Reject new work once shutdown was signalled (acquire pairs with the release
// store made by the shutdown path, which is not visible in this excerpt).
191 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
// Drop-oldest policy: when bounded (max_size > 0) and full, unlink the current
// head so the new packet can still be enqueued.
196 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
197 if (queue->max_size > 0 && current_count >= queue->max_size) {
199 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
201 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
// Weak CAS may fail spuriously; a retry presumably exists in the lines not shown.
203 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
// Removed the last node: clear tail as well.
207 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
// Account for the dropped packet.
211 size_t bytes = head->packet.data_len;
212 atomic_fetch_sub(&queue->bytes_queued, bytes);
213 atomic_fetch_sub(&queue->count, (
size_t)1);
214 atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);
// Release the dropped payload.
// NOTE(review): this passes head->packet.buffer_pool to buffer_pool_free even
// though it may be NULL (see line 261), while the dequeue path checks
// buffer_pool first -- confirm buffer_pool_free tolerates a NULL pool.
217 if (head->packet.owns_data && head->packet.data) {
218 buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
222 log_dev_every(4500 * US_PER_MS_INT,
"Dropped packet from queue (full): type=%d, client=%u", type, client_id);
// Node allocation failure path (the allocation itself is not visible here).
231 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate packet node");
// Build the wire header in network byte order; the CRC covers the payload only
// (zero when there is no payload).
236 node->packet.header.magic = HOST_TO_NET_U64(PACKET_MAGIC);
237 node->packet.header.type = HOST_TO_NET_U16((uint16_t)type);
238 node->packet.header.length = HOST_TO_NET_U32((uint32_t)data_len);
239 node->packet.header.client_id = HOST_TO_NET_U32(client_id);
241 node->packet.header.crc32 = HOST_TO_NET_U32(data_len > 0 ? asciichat_crc32(data, data_len) : 0);
// Payload ownership: copy path duplicates the bytes (into the queue's buffer
// pool when one exists) and takes ownership; borrow path keeps the caller's
// pointer with owns_data = false.
244 if (data_len > 0 && data) {
247 if (queue->buffer_pool) {
249 node->packet.buffer_pool = queue->buffer_pool;
255 SAFE_MEMCPY(node->packet.data, data_len, data, data_len);
256 node->packet.owns_data =
true;
259 node->packet.data = (
void *)data;
260 node->packet.owns_data =
false;
261 node->packet.buffer_pool = NULL;
// Empty payload: normalize every payload field.
264 node->packet.data = NULL;
265 node->packet.owns_data =
false;
266 node->packet.buffer_pool = NULL;
269 node->packet.data_len = data_len;
270 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
// Lock-free enqueue (Michael & Scott style; the surrounding retry loop is in
// lines not visible here). Empty-queue case: install the node as head via CAS,
// then publish it as tail.
274 packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
278 packet_node_t *expected = NULL;
279 if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
280 memory_order_acquire)) {
282 atomic_store_explicit(&queue->tail, node, memory_order_release);
// Non-empty case: snapshot tail->next, re-validate tail, then link after it.
290 packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
291 packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
// Tail moved under us -- retry.
294 if (tail != current_tail) {
301 packet_node_t *expected_null = NULL;
302 if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
303 memory_order_acquire)) {
// Linked: swing tail forward; failure here is benign (another thread helps).
305 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
// tail->next was already taken: help advance tail past the intermediate node.
311 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
// Success accounting.
317 atomic_fetch_add(&queue->count, (
size_t)1);
318 atomic_fetch_add(&queue->bytes_queued, data_len);
319 atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);
// --- enqueue of a pre-built queued_packet_t (fragment: original lines 325-454) ---
// Parameter validation.
325 if (!queue || !packet) {
326 SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters: queue=%p, packet=%p", queue, packet);
// Packet failed self-validation (the validating call is not visible here).
332 SET_ERRNO(ERROR_INVALID_PARAM,
"Refusing to enqueue invalid packet");
337 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
// Drop-oldest policy when bounded and full -- mirrors the raw-data enqueue path.
342 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
343 if (queue->max_size > 0 && current_count >= queue->max_size) {
345 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
347 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
// Weak CAS may fail spuriously; retry presumably handled in lines not shown.
349 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
// Removed the last node: clear tail too.
353 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
357 size_t bytes = head->packet.data_len;
358 atomic_fetch_sub(&queue->bytes_queued, bytes);
359 atomic_fetch_sub(&queue->count, (
size_t)1);
360 atomic_fetch_add(&queue->packets_dropped, (uint64_t)1);
// NOTE(review): unconditional buffer_pool_free with a possibly-NULL pool,
// unlike the dequeue path which checks buffer_pool first -- verify.
363 if (head->packet.owns_data && head->packet.data) {
364 buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
// Node allocation failure path (allocation not visible in this excerpt).
375 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate packet node");
// Copy the caller's packet struct wholesale, then fix up payload ownership below.
380 SAFE_MEMCPY(&node->packet,
sizeof(queued_packet_t), packet,
sizeof(queued_packet_t));
// Deep-copy the payload only when the source owns it; otherwise the borrowed
// pointer and its original ownership flags are carried over unchanged.
383 if (packet->data && packet->data_len > 0 && packet->owns_data) {
387 if (queue->buffer_pool) {
389 node->packet.buffer_pool = queue->buffer_pool;
395 SAFE_MEMCPY(data_copy, packet->data_len, packet->data, packet->data_len);
396 node->packet.data = data_copy;
397 node->packet.owns_data =
true;
400 node->packet.data = packet->data;
401 node->packet.owns_data = packet->owns_data;
402 node->packet.buffer_pool = packet->buffer_pool;
405 atomic_store_explicit(&node->next, (packet_node_t *)NULL, memory_order_relaxed);
// Lock-free enqueue, identical structure to the raw-data enqueue path:
// empty-queue CAS on head, else link at tail->next and swing/help tail.
409 packet_node_t *tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
413 packet_node_t *expected = NULL;
414 if (atomic_compare_exchange_weak_explicit(&queue->head, &expected, node, memory_order_release,
415 memory_order_acquire)) {
417 atomic_store_explicit(&queue->tail, node, memory_order_release);
425 packet_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
426 packet_node_t *current_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
// Tail moved under us -- retry.
429 if (tail != current_tail) {
436 packet_node_t *expected_null = NULL;
437 if (atomic_compare_exchange_weak_explicit(&tail->next, &expected_null, node, memory_order_release,
438 memory_order_acquire)) {
// Linked: swing tail forward (failure is benign; another thread helps).
440 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, node, memory_order_release, memory_order_relaxed);
// Help advance tail past the node another thread already linked.
446 atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next, memory_order_release, memory_order_relaxed);
// Success accounting.
452 atomic_fetch_add(&queue->count, (
size_t)1);
453 atomic_fetch_add(&queue->bytes_queued, packet->data_len);
454 atomic_fetch_add(&queue->packets_enqueued, (uint64_t)1);
// --- non-blocking dequeue (fragment: original lines 469-539) ---
// No dequeue after shutdown.
469 if (atomic_load_explicit(&queue->shutdown, memory_order_acquire)) {
// Fast empty check on the atomic count.
474 size_t current_count = atomic_load_explicit(&queue->count, memory_order_acquire);
475 if (current_count == 0) {
// Pop the head via CAS (weak -- spurious failure presumably retried by
// control flow outside this excerpt).
480 packet_node_t *head = atomic_load_explicit(&queue->head, memory_order_acquire);
486 packet_node_t *next = atomic_load_explicit(&head->next, memory_order_acquire);
487 if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
// Removed the last node: clear tail too.
491 atomic_store_explicit(&queue->tail, (packet_node_t *)NULL, memory_order_release);
// Accounting for the dequeued packet.
495 size_t bytes = head->packet.data_len;
496 atomic_fetch_sub(&queue->bytes_queued, bytes);
497 atomic_fetch_sub(&queue->count, (
size_t)1);
498 atomic_fetch_add(&queue->packets_dequeued, (uint64_t)1);
// Integrity check 1: header magic (stored in network byte order).
// NOTE(review): %llx with a uint64_t is only portable where uint64_t is
// unsigned long long -- PRIx64 from <inttypes.h> would be strictly correct.
501 uint64_t magic = NET_TO_HOST_U64(head->packet.header.magic);
502 if (magic != PACKET_MAGIC) {
503 SET_ERRNO(ERROR_BUFFER,
"CORRUPTION: Invalid magic in try_dequeued packet: 0x%llx (expected 0x%llx), type=%u",
504 magic, PACKET_MAGIC, NET_TO_HOST_U16(head->packet.header.type));
// Integrity check 2: payload CRC must match the header's CRC.
511 if (head->packet.data_len > 0 && head->packet.data) {
512 uint32_t expected_crc = NET_TO_HOST_U32(head->packet.header.crc32);
513 uint32_t actual_crc = asciichat_crc32(head->packet.data, head->packet.data_len);
514 if (actual_crc != expected_crc) {
515 SET_ERRNO(ERROR_BUFFER,
516 "CORRUPTION: CRC mismatch in try_dequeued packet: got 0x%x, expected 0x%x, type=%u, len=%zu",
517 actual_crc, expected_crc, NET_TO_HOST_U16(head->packet.header.type), head->packet.data_len);
// Release the owned payload (pool-aware) and clear ownership so later
// cleanup cannot double-free.
519 if (head->packet.owns_data && head->packet.data) {
521 if (head->packet.buffer_pool) {
522 buffer_pool_free(head->packet.buffer_pool, head->packet.data, head->packet.data_len);
528 head->packet.data = NULL;
529 head->packet.owns_data =
false;
// Copy the packet out to a heap allocation owned by the caller; the node
// itself is recycled/freed in lines not visible here.
537 queued_packet_t *packet;
538 packet = SAFE_MALLOC(
sizeof(queued_packet_t), queued_packet_t *);
539 SAFE_MEMCPY(packet,
sizeof(queued_packet_t), &head->packet,
sizeof(queued_packet_t));
// --- packet integrity validation (fragment: original lines 636-661) ---
// Header fields are stored in network byte order; convert before comparing.
// NOTE(review): %llx with uint64_t -- PRIx64 would be the portable specifier.
636 uint64_t magic = NET_TO_HOST_U64(packet->header.magic);
637 if (magic != PACKET_MAGIC) {
638 SET_ERRNO(ERROR_BUFFER,
"Invalid packet magic: 0x%llx (expected 0x%llx)", magic, PACKET_MAGIC);
// Type must be a positive code; 10000 is the sanity ceiling used here.
643 uint16_t type = NET_TO_HOST_U16(packet->header.type);
644 if (type == 0 || type > 10000) {
645 SET_ERRNO(ERROR_BUFFER,
"Invalid packet type: %u", type);
// Declared payload length must agree with the in-memory length.
650 uint32_t length = NET_TO_HOST_U32(packet->header.length);
651 if (length != packet->data_len) {
652 SET_ERRNO(ERROR_BUFFER,
"Packet length mismatch: header says %u, data_len is %zu", length, packet->data_len);
// Payload CRC must match the header CRC (checked only when a payload exists).
657 if (packet->data_len > 0 && packet->data) {
658 uint32_t expected_crc = NET_TO_HOST_U32(packet->header.crc32);
659 uint32_t actual_crc = asciichat_crc32(packet->data, packet->data_len);
660 if (actual_crc != expected_crc) {
661 SET_ERRNO(ERROR_BUFFER,
"Packet CRC mismatch: got 0x%x, expected 0x%x", actual_crc, expected_crc);