ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
src/server/client.c
Go to the documentation of this file.
1
103#include <stdatomic.h>
104#include <stdio.h>
105#include <string.h>
106#include <time.h>
107#include <errno.h>
108#ifndef _WIN32
109#include <netinet/tcp.h>
110#endif
111
112#include "client.h"
113#include "main.h"
114#include "protocol.h"
115#include "render.h"
116#include "stream.h"
117#include "crypto.h"
119#include "crypto/crypto.h"
120#include "common.h"
121#include "util/endian.h"
122#include "asciichat_errno.h"
123#include "options/options.h"
124#include "options/rcu.h" // For RCU-based options access
125#include "buffer_pool.h"
126#include "network/network.h"
127#include "network/packet.h"
128#include "network/packet_queue.h"
129#include "network/errors.h"
132#include "network/acip/send.h"
133#include "network/acip/server.h"
134#include "audio/audio.h"
135#include "audio/mixer.h"
136#include "audio/opus_codec.h"
137#include "video/video_frame.h"
138#include "util/uthash.h"
139#include "util/endian.h"
140#include "util/format.h"
141#include "util/time.h"
142#include "platform/abstraction.h"
143#include "platform/string.h"
144#include "platform/socket.h"
145#include "network/crc32.h"
146#include "network/logging.h"
147
148// Debug flags
149#define DEBUG_NETWORK 1
150#define DEBUG_THREADS 1
151#define DEBUG_MEMORY 1
152
153// Forward declarations for static helper functions
154static inline void cleanup_client_all_buffers(client_info_t *client);
155
156static void handle_client_error_packet(client_info_t *client, const void *data, size_t len) {
157 asciichat_error_t reported_error = ASCIICHAT_OK;
158 char message[MAX_ERROR_MESSAGE_LENGTH + 1] = {0};
159
160 asciichat_error_t parse_result =
161 packet_parse_error_message(data, len, &reported_error, message, sizeof(message), NULL);
162 uint32_t client_id = client ? atomic_load(&client->client_id) : 0;
163
164 if (parse_result != ASCIICHAT_OK) {
165 log_warn("Failed to parse error packet from client %u: %s", client_id, asciichat_error_string(parse_result));
166 return;
167 }
168
169 log_error("Client %u reported error %d (%s): %s", client_id, reported_error, asciichat_error_string(reported_error),
170 message);
171}
172
190
205
206// External globals from main.c
207extern atomic_bool g_server_should_exit;
208extern mixer_t *g_audio_mixer;
209
210// Forward declarations for internal functions
211// client_receive_thread is implemented below
212void *client_send_thread_func(void *arg);
214
215/* ============================================================================
216 * Client Lookup Functions
217 * ============================================================================
218 */
219
243// NOLINTNEXTLINE: uthash intentionally uses unsigned overflow for hash operations
244__attribute__((no_sanitize("integer"))) client_info_t *find_client_by_id(uint32_t client_id) {
245 if (client_id == 0) {
246 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid client ID");
247 return NULL;
248 }
249
250 // Protect uthash lookup with read lock to prevent concurrent access issues
252
253 client_info_t *result = NULL;
254 uint32_t search_id = client_id; // uthash needs an lvalue for the key
255 HASH_FIND_INT(g_client_manager.clients_by_id, &search_id, result);
256
258
259 if (!result) {
260 log_warn("Client not found for ID %u", client_id);
261 }
262
263 return result;
264}
265
291
292 for (int i = 0; i < MAX_CLIENTS; i++) {
293 if (g_client_manager.clients[i].socket == socket && atomic_load(&g_client_manager.clients[i].active)) {
296 return client;
297 }
298 }
299
301 return NULL;
302}
303
304/* ============================================================================
305 * Client Management Functions
306 * ============================================================================
307 */
308
315static void configure_client_socket(socket_t socket, uint32_t client_id) {
316 // Enable TCP keepalive to detect dead connections
317 asciichat_error_t keepalive_result = set_socket_keepalive(socket);
318 if (keepalive_result != ASCIICHAT_OK) {
319 log_warn("Failed to set socket keepalive for client %u: %s", client_id, asciichat_error_string(keepalive_result));
320 }
321
322 // Set socket buffer sizes for large data transmission
323 const int SOCKET_SEND_BUFFER_SIZE = 1024 * 1024; // 1MB send buffer
324 const int SOCKET_RECV_BUFFER_SIZE = 1024 * 1024; // 1MB receive buffer
325
326 if (socket_setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &SOCKET_SEND_BUFFER_SIZE, sizeof(SOCKET_SEND_BUFFER_SIZE)) < 0) {
327 log_warn("Failed to set send buffer size for client %u: %s", client_id, network_error_string());
328 }
329
330 if (socket_setsockopt(socket, SOL_SOCKET, SO_RCVBUF, &SOCKET_RECV_BUFFER_SIZE, sizeof(SOCKET_RECV_BUFFER_SIZE)) < 0) {
331 log_warn("Failed to set receive buffer size for client %u: %s", client_id, network_error_string());
332 }
333
334 // Enable TCP_NODELAY to reduce latency for large packets (disables Nagle algorithm)
335 const int TCP_NODELAY_VALUE = 1;
336 if (socket_setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &TCP_NODELAY_VALUE, sizeof(TCP_NODELAY_VALUE)) < 0) {
337 log_warn("Failed to set TCP_NODELAY for client %u: %s", client_id, network_error_string());
338 }
339}
340
341// NOLINTNEXTLINE: uthash intentionally uses unsigned overflow for hash operations
342__attribute__((no_sanitize("integer"))) int add_client(server_context_t *server_ctx, socket_t socket,
343 const char *client_ip, int port) {
345
346 // Find empty slot - this is the authoritative check
347 int slot = -1;
348 int existing_count = 0;
349 for (int i = 0; i < MAX_CLIENTS; i++) {
350 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
351 slot = i; // Take first available slot
352 }
353 // Count only active clients
354 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
355 existing_count++;
356 }
357 }
358
359 // Check if we've hit the configured max-clients limit (not the array size)
360 if (existing_count >= GET_OPTION(max_clients)) {
362 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "Maximum client limit reached (%d/%d active clients)", existing_count,
363 GET_OPTION(max_clients));
364 log_error("Maximum client limit reached (%d/%d active clients)", existing_count, GET_OPTION(max_clients));
365
366 // Send a rejection message to the client before closing
367 // Use platform-abstracted socket_send() instead of raw send() for Windows portability
368 const char *reject_msg = "SERVER_FULL: Maximum client limit reached\n";
369 ssize_t send_result = socket_send(socket, reject_msg, strlen(reject_msg), 0);
370 if (send_result < 0) {
371 log_warn("Failed to send rejection message to client: %s", SAFE_STRERROR(errno));
372 }
373
374 return -1;
375 }
376
377 if (slot == -1) {
379 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "No available client slots (all %d array slots are in use)", MAX_CLIENTS);
380 log_error("No available client slots (all %d array slots are in use)", MAX_CLIENTS);
381
382 // Send a rejection message to the client before closing
383 // Use platform-abstracted socket_send() instead of raw send() for Windows portability
384 const char *reject_msg = "SERVER_FULL: Maximum client limit reached\n";
385 ssize_t send_result = socket_send(socket, reject_msg, strlen(reject_msg), 0);
386 if (send_result < 0) {
387 log_warn("Failed to send rejection message to client: %s", SAFE_STRERROR(errno));
388 }
389
390 return -1;
391 }
392
393 // Update client_count to match actual count before adding new client
394 g_client_manager.client_count = existing_count;
395
396 // Initialize client
397 client_info_t *client = &g_client_manager.clients[slot];
398 memset(client, 0, sizeof(client_info_t));
399
400 client->socket = socket;
401 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
402 atomic_store(&client->client_id, new_client_id);
403 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
404 client->port = port;
405 atomic_store(&client->active, true);
406 log_info("Added new client ID=%u from %s:%d (socket=%d, slot=%d)", new_client_id, client_ip, port, socket, slot);
407 atomic_store(&client->shutting_down, false);
408 atomic_store(&client->last_rendered_grid_sources, 0); // Render thread updates this
409 atomic_store(&client->last_sent_grid_sources, 0); // Send thread updates this
410 log_debug("Client slot assigned: client_id=%u assigned to slot %d, socket=%d", atomic_load(&client->client_id), slot,
411 socket);
412 client->connected_at = time(NULL);
413
414 // Initialize crypto context for this client
415 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
416 client->crypto_initialized = false;
417
418 // Initialize pending packet storage (for --no-encrypt mode)
419 client->pending_packet_type = 0;
420 client->pending_packet_payload = NULL;
421 client->pending_packet_length = 0;
422
423 // Configure socket options for optimal performance
424 configure_client_socket(socket, atomic_load(&client->client_id));
425
426 // Register socket with tcp_server for thread pool management
427 // Must be done before spawning any threads
428 asciichat_error_t reg_result = tcp_server_add_client(server_ctx->tcp_server, socket, client);
429 if (reg_result != ASCIICHAT_OK) {
430 SET_ERRNO(ERROR_INTERNAL, "Failed to register client socket with tcp_server");
431 log_error("Failed to register client %u socket with tcp_server", atomic_load(&client->client_id));
432 goto error_cleanup;
433 }
434
435 safe_snprintf(client->display_name, sizeof(client->display_name), "Client%u", atomic_load(&client->client_id));
436
437 // Create individual video buffer for this client using modern double-buffering
438 client->incoming_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
439 if (!client->incoming_video_buffer) {
440 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for client %u", atomic_load(&client->client_id));
441 log_error("Failed to create video buffer for client %u", atomic_load(&client->client_id));
442 goto error_cleanup;
443 }
444
445 // Create individual audio buffer for this client
446 // NOTE: Use capture version (no jitter buffering) because incoming audio is from network decode,
447 // not from real-time microphone. Jitter buffering would cause buffer overflow since decoder
448 // outputs at constant rate (48kHz) but mixer needs time to process.
450 if (!client->incoming_audio_buffer) {
451 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for client %u", atomic_load(&client->client_id));
452 log_error("Failed to create audio buffer for client %u", atomic_load(&client->client_id));
453 goto error_cleanup;
454 }
455
456 // Create packet queues for outgoing data
457 // Use node pools but share the global buffer pool
458 // Audio queue needs larger capacity to handle jitter and render thread lag
459 // 500 packets @ 172fps = ~2.9 seconds of buffering (was 100 = 0.58s)
460 client->audio_queue =
461 packet_queue_create_with_pools(500, 1000, false); // Max 500 audio packets, 1000 nodes, NO local buffer pool
462 if (!client->audio_queue) {
463 LOG_ERRNO_IF_SET("Failed to create audio queue for client");
464 goto error_cleanup;
465 }
466
467 // Create outgoing video buffer for ASCII frames (double buffered, no dropping)
468 client->outgoing_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
469 if (!client->outgoing_video_buffer) {
470 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for client");
471 goto error_cleanup;
472 }
473
474 // Pre-allocate send buffer to avoid malloc/free in send thread (prevents deadlocks)
475 client->send_buffer_size = 2 * 1024 * 1024; // 2MB should handle largest frames
476 // 64-byte cache-line alignment improves performance for large network buffers
477 client->send_buffer = SAFE_MALLOC_ALIGNED(client->send_buffer_size, 64, void *);
478 if (!client->send_buffer) {
479 log_error("Failed to allocate send buffer for client %u", atomic_load(&client->client_id));
480 goto error_cleanup;
481 }
482
483 g_client_manager.client_count = existing_count + 1; // We just added a client
484 log_debug("Client count updated: now %d clients (added client_id=%u to slot %d)", g_client_manager.client_count,
485 atomic_load(&client->client_id), slot);
486
487 // Add client to uthash table for O(1) lookup
488 // Note: HASH_ADD_INT uses the client_id field directly from the client structure
489 uint32_t cid = atomic_load(&client->client_id);
490 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
491 log_debug("Added client %u to uthash table", cid);
492
493 // Register this client's audio buffer with the mixer
494 if (g_audio_mixer && client->incoming_audio_buffer) {
495 if (mixer_add_source(g_audio_mixer, atomic_load(&client->client_id), client->incoming_audio_buffer) < 0) {
496 log_warn("Failed to add client %u to audio mixer", atomic_load(&client->client_id));
497 } else {
498#ifdef DEBUG_AUDIO
499 log_debug("Added client %u to audio mixer", atomic_load(&client->client_id));
500#endif
501 }
502 }
503
504 // Initialize mutexes BEFORE creating any threads to prevent race conditions
505 // These mutexes might be accessed by receive thread which starts before render threads
506 if (mutex_init(&client->client_state_mutex) != 0) {
507 log_error("Failed to initialize client state mutex for client %u", atomic_load(&client->client_id));
508 goto error_cleanup;
509 }
510
511 // Initialize send mutex to protect concurrent socket writes
512 if (mutex_init(&client->send_mutex) != 0) {
513 log_error("Failed to initialize send mutex for client %u", atomic_load(&client->client_id));
514 goto error_cleanup;
515 }
516
518
519 // CRITICAL: Perform crypto handshake BEFORE starting threads
520 // This ensures the handshake uses the socket directly without interference from receive thread
521 if (server_crypto_init() == 0) {
522 // Set timeout for crypto handshake to prevent indefinite blocking
523 // This prevents clients from connecting but never completing the handshake
524 const int HANDSHAKE_TIMEOUT_SECONDS = 30;
525 asciichat_error_t timeout_result = set_socket_timeout(socket, HANDSHAKE_TIMEOUT_SECONDS);
526 if (timeout_result != ASCIICHAT_OK) {
527 log_warn("Failed to set handshake timeout for client %u: %s", atomic_load(&client->client_id),
528 asciichat_error_string(timeout_result));
529 // Continue anyway - timeout is a safety feature, not critical
530 }
531
532 int crypto_result = server_crypto_handshake(client);
533 if (crypto_result != 0) {
534 log_error("Crypto handshake failed for client %u: %s", atomic_load(&client->client_id), network_error_string());
535 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
536 log_error("Failed to remove client after crypto handshake failure");
537 }
538 return -1;
539 }
540
541 // Clear socket timeout after handshake completes successfully
542 // This allows normal operation without timeouts on data transfer
543 asciichat_error_t clear_timeout_result = set_socket_timeout(socket, 0);
544 if (clear_timeout_result != ASCIICHAT_OK) {
545 log_warn("Failed to clear handshake timeout for client %u: %s", atomic_load(&client->client_id),
546 asciichat_error_string(clear_timeout_result));
547 // Continue anyway - we can still communicate even with timeout set
548 }
549
550 log_debug("Crypto handshake completed successfully for client %u", atomic_load(&client->client_id));
551
552 // Create ACIP transport for protocol-agnostic packet sending
553 // The transport wraps the socket with encryption context from the handshake
554 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
555 client->transport = acip_tcp_transport_create(socket, (crypto_context_t *)crypto_ctx);
556 if (!client->transport) {
557 log_error("Failed to create ACIP transport for client %u", atomic_load(&client->client_id));
558 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
559 log_error("Failed to remove client after transport creation failure");
560 }
561 return -1;
562 }
563 log_debug("Created ACIP transport for client %u with crypto context", atomic_load(&client->client_id));
564
565 // After handshake completes, the client immediately sends PACKET_TYPE_CLIENT_CAPABILITIES
566 // We must read and process this packet BEFORE starting the receive thread to avoid a race condition
567 // where the packet arrives but no thread is listening for it.
568 //
569 // SPECIAL CASE: If client used --no-encrypt, we already received this packet during the handshake
570 // attempt and stored it in pending_packet_*. Use that instead of receiving a new one.
571 packet_envelope_t envelope;
572 bool used_pending_packet = false;
573
574 if (client->pending_packet_payload) {
575 // Client used --no-encrypt mode - use the packet we already received
576 log_info("Client %u using --no-encrypt mode - processing pending packet type %u", atomic_load(&client->client_id),
577 client->pending_packet_type);
578 envelope.type = client->pending_packet_type;
579 envelope.data = client->pending_packet_payload;
580 envelope.len = client->pending_packet_length;
581 envelope.allocated_buffer = client->pending_packet_payload; // Will be freed below
582 envelope.allocated_size = client->pending_packet_length;
583 used_pending_packet = true;
584
585 // Clear pending packet fields
586 client->pending_packet_type = 0;
587 client->pending_packet_payload = NULL;
588 client->pending_packet_length = 0;
589 } else {
590 // Normal encrypted mode - receive capabilities packet
591 log_debug("Waiting for initial capabilities packet from client %u", atomic_load(&client->client_id));
592
593 // Protect crypto context access with client state mutex
595 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
596
597 // Use per-client crypto state to determine enforcement
598 // At this point, handshake is complete, so crypto_initialized=true and handshake is ready
599 bool enforce_encryption = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
601
602 packet_recv_result_t result = receive_packet_secure(socket, (void *)crypto_ctx, enforce_encryption, &envelope);
604
605 if (result != PACKET_RECV_SUCCESS) {
606 log_error("Failed to receive initial capabilities packet from client %u: result=%d",
607 atomic_load(&client->client_id), result);
608 if (envelope.allocated_buffer) {
609 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
610 }
611 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
612 log_error("Failed to remove client after crypto handshake failure");
613 }
614 return -1;
615 }
616 }
617
618 if (envelope.type != PACKET_TYPE_CLIENT_CAPABILITIES) {
619 log_error("Expected PACKET_TYPE_CLIENT_CAPABILITIES but got packet type %d from client %u", envelope.type,
620 atomic_load(&client->client_id));
621 if (envelope.allocated_buffer) {
622 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
623 }
624 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
625 log_error("Failed to remove client after crypto handshake failure");
626 }
627 return -1;
628 }
629
630 // Process the capabilities packet directly
631 log_debug("Processing initial capabilities packet from client %u (from %s)", atomic_load(&client->client_id),
632 used_pending_packet ? "pending packet" : "network");
633 handle_client_capabilities_packet(client, envelope.data, envelope.len);
634
635 // Free the packet data
636 if (envelope.allocated_buffer) {
637 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
638 }
639 log_debug("Successfully received and processed initial capabilities for client %u",
640 atomic_load(&client->client_id));
641 }
642
643 // Start threads for this client (AFTER crypto handshake AND initial capabilities)
644 // Use tcp_server thread pool for managed cleanup with stop_id ordering:
645 // stop_id=1: receive thread (stop first to prevent new data)
646 // stop_id=2: render threads (stop after receive)
647 // stop_id=3: send thread (stop last after all processing done)
648 char thread_name[64];
649 snprintf(thread_name, sizeof(thread_name), "receive_%u", atomic_load(&client->client_id));
650 asciichat_error_t recv_result =
651 tcp_server_spawn_thread(server_ctx->tcp_server, client->socket, client_receive_thread, client, 1, thread_name);
652 if (recv_result != ASCIICHAT_OK) {
653 // Don't destroy mutexes here - remove_client() will handle it
654 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
655 log_error("Failed to remove client after receive thread creation failure");
656 }
657 return -1;
658 }
659
660 // Start send thread for this client (stop_id=3, stop last)
661 snprintf(thread_name, sizeof(thread_name), "send_%u", atomic_load(&client->client_id));
662 asciichat_error_t send_result =
663 tcp_server_spawn_thread(server_ctx->tcp_server, client->socket, client_send_thread_func, client, 3, thread_name);
664 if (send_result != ASCIICHAT_OK) {
665 // tcp_server_stop_client_threads() will be called by remove_client()
666 // to clean up the receive thread we just created
667 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
668 log_error("Failed to remove client after send thread creation failure");
669 }
670 return -1;
671 }
672
673 // Send initial server state to the new client
674 if (send_server_state_to_client(client) != 0) {
675 log_warn("Failed to send initial server state to client %u", atomic_load(&client->client_id));
676 } else {
677#ifdef DEBUG_NETWORK
678 log_info("Sent initial server state to client %u", atomic_load(&client->client_id));
679#endif
680 }
681
682 // Queue initial server state to the new client
683 // THREAD SAFETY: Protect read of client_count with rwlock
685 uint32_t connected_count = g_client_manager.client_count;
687
689 state.connected_client_count = connected_count;
690 state.active_client_count = 0; // Will be updated by broadcast thread
691 memset(state.reserved, 0, sizeof(state.reserved));
692
693 // Convert to network byte order
694 server_state_packet_t net_state;
697 memset(net_state.reserved, 0, sizeof(net_state.reserved));
698
699 // Send initial server state via ACIP transport
700 asciichat_error_t packet_send_result = acip_send_server_state(client->transport, &net_state);
701 if (packet_send_result != ASCIICHAT_OK) {
702 log_warn("Failed to send initial server state to client %u: %s", atomic_load(&client->client_id),
703 asciichat_error_string(packet_send_result));
704 } else {
705 log_debug("Sent initial server state to client %u: %u connected clients", atomic_load(&client->client_id),
707 }
708
709 // NEW: Create per-client rendering threads
710 // CRITICAL: Use atomic_load for client_id to prevent data races
711 uint32_t client_id_snapshot = atomic_load(&client->client_id);
712 log_debug("Creating render threads for client %u", client_id_snapshot);
713 if (create_client_render_threads(server_ctx, client) != 0) {
714 log_error("Failed to create render threads for client %u", client_id_snapshot);
715 if (remove_client(server_ctx, client_id_snapshot) != 0) {
716 log_error("Failed to remove client after render thread creation failure");
717 }
718 return -1;
719 }
720 log_debug("Successfully created render threads for client %u", client_id_snapshot);
721
722 // Broadcast server state to ALL clients AFTER the new client is fully set up
723 // This notifies all clients (including the new one) about the updated grid
725
726 return (int)client_id_snapshot;
727
728error_cleanup:
729 // Clean up all partially allocated resources
730 // NOTE: This label is reached when allocation or initialization fails
731 // Resources are cleaned up in reverse order of allocation
732}
733
757// NOLINTNEXTLINE: uthash intentionally uses unsigned overflow for hash operations
758__attribute__((no_sanitize("integer"))) int add_webrtc_client(server_context_t *server_ctx, acip_transport_t *transport,
759 const char *client_ip) {
760 if (!server_ctx || !transport || !client_ip) {
761 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters to add_webrtc_client");
762 return -1;
763 }
764
766
767 // Find empty slot - this is the authoritative check
768 int slot = -1;
769 int existing_count = 0;
770 for (int i = 0; i < MAX_CLIENTS; i++) {
771 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
772 slot = i; // Take first available slot
773 }
774 // Count only active clients
775 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
776 existing_count++;
777 }
778 }
779
780 // Check if we've hit the configured max-clients limit (not the array size)
781 if (existing_count >= GET_OPTION(max_clients)) {
783 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "Maximum client limit reached (%d/%d active clients)", existing_count,
784 GET_OPTION(max_clients));
785 log_error("Maximum client limit reached (%d/%d active clients)", existing_count, GET_OPTION(max_clients));
786 return -1;
787 }
788
789 if (slot == -1) {
791 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "No available client slots (all %d array slots are in use)", MAX_CLIENTS);
792 log_error("No available client slots (all %d array slots are in use)", MAX_CLIENTS);
793 return -1;
794 }
795
796 // Update client_count to match actual count before adding new client
797 g_client_manager.client_count = existing_count;
798
799 // Initialize client
800 client_info_t *client = &g_client_manager.clients[slot];
801 memset(client, 0, sizeof(client_info_t));
802
803 // Set up WebRTC-specific fields
804 client->socket = INVALID_SOCKET_VALUE; // WebRTC has no traditional socket
805 client->transport = transport; // Use provided transport
806 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
807 atomic_store(&client->client_id, new_client_id);
808 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
809 client->port = 0; // WebRTC doesn't use port numbers
810 atomic_store(&client->active, true);
811 log_info("Added new WebRTC client ID=%u from %s (transport=%p, slot=%d)", new_client_id, client_ip, transport, slot);
812 atomic_store(&client->shutting_down, false);
813 atomic_store(&client->last_rendered_grid_sources, 0); // Render thread updates this
814 atomic_store(&client->last_sent_grid_sources, 0); // Send thread updates this
815 log_debug("WebRTC client slot assigned: client_id=%u assigned to slot %d", atomic_load(&client->client_id), slot);
816 client->connected_at = time(NULL);
817
818 // Initialize crypto context for this client
819 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
820 client->crypto_initialized = true; // Already done via ACDS signaling
821
822 // Initialize pending packet storage (unused for WebRTC, but keep for consistency)
823 client->pending_packet_type = 0;
824 client->pending_packet_payload = NULL;
825 client->pending_packet_length = 0;
826
827 safe_snprintf(client->display_name, sizeof(client->display_name), "WebRTC%u", atomic_load(&client->client_id));
828
829 // Create individual video buffer for this client using modern double-buffering
830 client->incoming_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
831 if (!client->incoming_video_buffer) {
832 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
833 log_error("Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
834 goto error_cleanup_webrtc;
835 }
836
837 // Create individual audio buffer for this client
839 if (!client->incoming_audio_buffer) {
840 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
841 log_error("Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
842 goto error_cleanup_webrtc;
843 }
844
845 // Create packet queues for outgoing data
846 client->audio_queue = packet_queue_create_with_pools(500, 1000, false);
847 if (!client->audio_queue) {
848 LOG_ERRNO_IF_SET("Failed to create audio queue for WebRTC client");
849 goto error_cleanup_webrtc;
850 }
851
852 // Create outgoing video buffer for ASCII frames (double buffered, no dropping)
853 client->outgoing_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
854 if (!client->outgoing_video_buffer) {
855 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for WebRTC client");
856 goto error_cleanup_webrtc;
857 }
858
859 // Pre-allocate send buffer to avoid malloc/free in send thread (prevents deadlocks)
860 client->send_buffer_size = 2 * 1024 * 1024; // 2MB should handle largest frames
861 client->send_buffer = SAFE_MALLOC_ALIGNED(client->send_buffer_size, 64, void *);
862 if (!client->send_buffer) {
863 log_error("Failed to allocate send buffer for WebRTC client %u", atomic_load(&client->client_id));
864 goto error_cleanup_webrtc;
865 }
866
867 g_client_manager.client_count = existing_count + 1; // We just added a client
868 log_debug("Client count updated: now %d clients (added WebRTC client_id=%u to slot %d)",
869 g_client_manager.client_count, atomic_load(&client->client_id), slot);
870
871 // Add client to uthash table for O(1) lookup
872 uint32_t cid = atomic_load(&client->client_id);
873 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
874 log_debug("Added WebRTC client %u to uthash table", cid);
875
876 // Register this client's audio buffer with the mixer
877 if (g_audio_mixer && client->incoming_audio_buffer) {
878 if (mixer_add_source(g_audio_mixer, atomic_load(&client->client_id), client->incoming_audio_buffer) < 0) {
879 log_warn("Failed to add WebRTC client %u to audio mixer", atomic_load(&client->client_id));
880 } else {
881#ifdef DEBUG_AUDIO
882 log_debug("Added WebRTC client %u to audio mixer", atomic_load(&client->client_id));
883#endif
884 }
885 }
886
887 // Initialize mutexes BEFORE creating any threads to prevent race conditions
888 if (mutex_init(&client->client_state_mutex) != 0) {
889 log_error("Failed to initialize client state mutex for WebRTC client %u", atomic_load(&client->client_id));
890 goto error_cleanup_webrtc;
891 }
892
893 // Initialize send mutex to protect concurrent socket writes
894 if (mutex_init(&client->send_mutex) != 0) {
895 log_error("Failed to initialize send mutex for WebRTC client %u", atomic_load(&client->client_id));
896 goto error_cleanup_webrtc;
897 }
898
900
901 // For WebRTC clients, the capabilities packet will be received by the receive thread
902 // when it starts. Unlike TCP clients where we handle it synchronously in add_client(),
903 // WebRTC uses the transport abstraction which handles packet reception automatically.
904 log_debug("WebRTC client %u initialized - receive thread will process capabilities", atomic_load(&client->client_id));
905
906 // Start threads for this client
907 // Note: WebRTC clients don't use tcp_server thread pool (no socket)
908 // Instead, use generic thread creation
909 char thread_name[64];
910 uint32_t client_id_snapshot = atomic_load(&client->client_id);
911
912 // Create receive thread for WebRTC client
913 snprintf(thread_name, sizeof(thread_name), "webrtc_recv_%u", client_id_snapshot);
915 if (recv_result != ASCIICHAT_OK) {
916 log_error("Failed to create receive thread for WebRTC client %u: %s", client_id_snapshot,
917 asciichat_error_string(recv_result));
918 if (remove_client(server_ctx, client_id_snapshot) != 0) {
919 log_error("Failed to remove WebRTC client after receive thread creation failure");
920 }
921 return -1;
922 }
923 log_debug("Created receive thread for WebRTC client %u", client_id_snapshot);
924
925 // Create send thread for this client
926 snprintf(thread_name, sizeof(thread_name), "webrtc_send_%u", client_id_snapshot);
928 if (send_result != ASCIICHAT_OK) {
929 log_error("Failed to create send thread for WebRTC client %u: %s", client_id_snapshot,
930 asciichat_error_string(send_result));
931 if (remove_client(server_ctx, client_id_snapshot) != 0) {
932 log_error("Failed to remove WebRTC client after send thread creation failure");
933 }
934 return -1;
935 }
936 log_debug("Created send thread for WebRTC client %u", client_id_snapshot);
937
938 // Send initial server state to the new client
939 if (send_server_state_to_client(client) != 0) {
940 log_warn("Failed to send initial server state to WebRTC client %u", client_id_snapshot);
941 } else {
942#ifdef DEBUG_NETWORK
943 log_info("Sent initial server state to WebRTC client %u", client_id_snapshot);
944#endif
945 }
946
947 // Send initial server state via ACIP transport
949 uint32_t connected_count = g_client_manager.client_count;
951
953 state.connected_client_count = connected_count;
954 state.active_client_count = 0; // Will be updated by broadcast thread
955 memset(state.reserved, 0, sizeof(state.reserved));
956
957 // Convert to network byte order
958 server_state_packet_t net_state;
961 memset(net_state.reserved, 0, sizeof(net_state.reserved));
962
963 asciichat_error_t packet_send_result = acip_send_server_state(client->transport, &net_state);
964 if (packet_send_result != ASCIICHAT_OK) {
965 log_warn("Failed to send initial server state to WebRTC client %u: %s", client_id_snapshot,
966 asciichat_error_string(packet_send_result));
967 } else {
968 log_debug("Sent initial server state to WebRTC client %u: %u connected clients", client_id_snapshot,
970 }
971
972 // Create per-client rendering threads
973 log_debug("Creating render threads for WebRTC client %u", client_id_snapshot);
974 if (create_client_render_threads(server_ctx, client) != 0) {
975 log_error("Failed to create render threads for WebRTC client %u", client_id_snapshot);
976 if (remove_client(server_ctx, client_id_snapshot) != 0) {
977 log_error("Failed to remove WebRTC client after render thread creation failure");
978 }
979 return -1;
980 }
981 log_debug("Successfully created render threads for WebRTC client %u", client_id_snapshot);
982
983 // Broadcast server state to ALL clients AFTER the new client is fully set up
984 // This notifies all clients (including the new one) about the updated grid
986
987 return (int)client_id_snapshot;
988
989error_cleanup_webrtc:
990 // Clean up all partially allocated resources for WebRTC client
991 if (client->send_buffer) {
992 SAFE_FREE(client->send_buffer);
993 client->send_buffer = NULL;
994 }
995 if (client->outgoing_video_buffer) {
997 client->outgoing_video_buffer = NULL;
998 }
999 if (client->audio_queue) {
1001 client->audio_queue = NULL;
1002 }
1003 if (client->incoming_audio_buffer) {
1005 client->incoming_audio_buffer = NULL;
1006 }
1007 if (client->incoming_video_buffer) {
1009 client->incoming_video_buffer = NULL;
1010 }
1012 mutex_destroy(&client->send_mutex);
1014 return -1;
1015}
1016
1017// NOLINTNEXTLINE: uthash intentionally uses unsigned overflow for hash operations
1018__attribute__((no_sanitize("integer"))) int remove_client(server_context_t *server_ctx, uint32_t client_id) {
1019 if (!server_ctx) {
1020 SET_ERRNO(ERROR_INVALID_PARAM, "Cannot remove client %u: NULL server_ctx", client_id);
1021 return -1;
1022 }
1023
1024 // Phase 1: Mark client inactive and prepare for cleanup while holding write lock
1025 client_info_t *target_client = NULL;
1026 char display_name_copy[MAX_DISPLAY_NAME_LEN];
1027 socket_t client_socket = INVALID_SOCKET_VALUE; // Save socket for thread cleanup
1028
1029 log_debug("SOCKET_DEBUG: Attempting to remove client %d", client_id);
1031
1032 for (int i = 0; i < MAX_CLIENTS; i++) {
1034 uint32_t cid = atomic_load(&client->client_id);
1035 if (cid == client_id && cid != 0) {
1036 // Mark as shutting down and inactive immediately to stop new operations
1037 log_info("Removing client %d (socket=%d) - marking inactive and clearing video flags", client_id, client->socket);
1038 atomic_store(&client->shutting_down, true);
1039 atomic_store(&client->active, false);
1040 atomic_store(&client->is_sending_video, false);
1041 atomic_store(&client->is_sending_audio, false);
1042 target_client = client;
1043
1044 // Store display name before clearing
1045 SAFE_STRNCPY(display_name_copy, client->display_name, MAX_DISPLAY_NAME_LEN - 1);
1046
1047 // Save socket for tcp_server_stop_client_threads() before closing
1049 client_socket = client->socket; // Save socket for thread cleanup
1050 if (client->socket != INVALID_SOCKET_VALUE) {
1051 log_debug("SOCKET_DEBUG: Client %d shutting down socket %d", client->client_id, client->socket);
1052 // Shutdown both send and receive operations to unblock any pending I/O
1053 socket_shutdown(client->socket, 2); // 2 = SHUT_RDWR on POSIX, SD_BOTH on Windows
1054 // Don't close yet - tcp_server needs socket as lookup key
1055 }
1057
1058 // Shutdown packet queues to unblock send thread
1059 if (client->audio_queue) {
1061 }
1062 // Video now uses double buffer, no queue to shutdown
1063
1064 break;
1065 }
1066 }
1067
1068 // If client not found, unlock and return
1069 if (!target_client) {
1071 log_warn("Cannot remove client %u: not found", client_id);
1072 return -1;
1073 }
1074
1075 // CRITICAL: Release write lock before joining threads
1076 // This prevents deadlock with render threads that need read locks
1078
1079 // Phase 2: Stop all client threads
1080 // For TCP clients: use tcp_server thread pool management
1081 // For WebRTC clients: manually join threads (no socket-based thread pool)
1082 log_debug("Stopping all threads for client %u (socket %d)", client_id, client_socket);
1083
1084 if (client_socket != INVALID_SOCKET_VALUE) {
1085 // TCP client: use tcp_server thread pool
1086 // This joins threads in stop_id order: receive(1), render(2), send(3)
1087 asciichat_error_t stop_result = tcp_server_stop_client_threads(server_ctx->tcp_server, client_socket);
1088 if (stop_result != ASCIICHAT_OK) {
1089 log_warn("Failed to stop threads for TCP client %u: error %d", client_id, stop_result);
1090 // Continue with cleanup even if thread stopping failed
1091 }
1092 } else {
1093 // WebRTC client: manually join threads
1094 log_debug("Stopping WebRTC client %u threads (receive and send)", client_id);
1095 if (target_client) {
1096 // Join receive thread
1097 void *recv_result = NULL;
1098 asciichat_error_t recv_join_result = asciichat_thread_join(&target_client->receive_thread, &recv_result);
1099 if (recv_join_result != ASCIICHAT_OK) {
1100 log_warn("Failed to join receive thread for WebRTC client %u: error %d", client_id, recv_join_result);
1101 } else {
1102 log_debug("Joined receive thread for WebRTC client %u", client_id);
1103 }
1104 // Join send thread
1105 void *send_result = NULL;
1106 asciichat_error_t send_join_result = asciichat_thread_join(&target_client->send_thread, &send_result);
1107 if (send_join_result != ASCIICHAT_OK) {
1108 log_warn("Failed to join send thread for WebRTC client %u: error %d", client_id, send_join_result);
1109 } else {
1110 log_debug("Joined send thread for WebRTC client %u", client_id);
1111 }
1112 }
1113 // Note: Render threads still need to be stopped - they're created the same way for both TCP and WebRTC
1114 // For now, render threads are expected to exit when they check g_server_should_exit and client->active
1115 }
1116
1117 // Destroy ACIP transport before closing socket
1118 if (target_client && target_client->transport) {
1119 acip_transport_destroy(target_client->transport);
1120 target_client->transport = NULL;
1121 log_debug("Destroyed ACIP transport for client %u", client_id);
1122 }
1123
1124 // Now safe to close the socket (threads are stopped)
1125 if (client_socket != INVALID_SOCKET_VALUE) {
1126 log_debug("SOCKET_DEBUG: Closing socket %d for client %u after thread cleanup", client_socket, client_id);
1127 socket_close(client_socket);
1128 }
1129
1130 // Phase 3: Clean up resources with write lock
1132
1133 // Mark socket as closed in client structure
1134 if (target_client && target_client->socket != INVALID_SOCKET_VALUE) {
1135 mutex_lock(&target_client->client_state_mutex);
1136 target_client->socket = INVALID_SOCKET_VALUE;
1137 mutex_unlock(&target_client->client_state_mutex);
1138 log_debug("SOCKET_DEBUG: Client %u socket set to INVALID", client_id);
1139 }
1140
1141 // Use the dedicated cleanup function to ensure all resources are freed
1142 cleanup_client_all_buffers(target_client);
1143
1144 // Remove from audio mixer
1145 if (g_audio_mixer) {
1147#ifdef DEBUG_AUDIO
1148 log_debug("Removed client %u from audio mixer", client_id);
1149#endif
1150 }
1151
1152 // Remove from uthash table
1153 if (target_client) {
1154 HASH_DELETE(hh, g_client_manager.clients_by_id, target_client);
1155 log_debug("Removed client %u from uthash table", client_id);
1156 } else {
1157 log_warn("Failed to remove client %u from hash table (client not found)", client_id);
1158 }
1159
1160 // Cleanup crypto context for this client
1161 if (target_client->crypto_initialized) {
1163 target_client->crypto_initialized = false;
1164 log_debug("Crypto context cleaned up for client %u", client_id);
1165 }
1166
1167 // CRITICAL: Verify all threads have actually exited before resetting client_id
1168 // Threads that are still starting (at RtlUserThreadStart) haven't checked client_id yet
1169 // We must ensure threads are fully joined before zeroing the client struct
1170 // Use exponential backoff for thread termination verification
1171 int retry_count = 0;
1172 const int max_retries = 5;
1173 while (retry_count < max_retries && (asciichat_thread_is_initialized(&target_client->send_thread) ||
1177 // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms
1178 uint32_t delay_ms = 10 * (1 << retry_count);
1179 log_warn("Client %u: Some threads still appear initialized (attempt %d/%d), waiting %ums", client_id,
1180 retry_count + 1, max_retries, delay_ms);
1181 platform_sleep_usec(delay_ms * 1000);
1182 retry_count++;
1183 }
1184
1185 if (retry_count == max_retries) {
1186 log_error("Client %u: Threads did not terminate after %d retries, proceeding with cleanup anyway", client_id,
1187 max_retries);
1188 }
1189
1190 // Only reset client_id to 0 AFTER confirming threads are joined
1191 // This prevents threads that are starting from accessing a zeroed client struct
1192 // CRITICAL: Reset client_id to 0 BEFORE destroying mutexes to prevent race conditions
1193 // This ensures worker threads can detect shutdown and exit BEFORE the mutex is destroyed
1194 // If we destroy the mutex first, threads might try to access a destroyed mutex
1195 atomic_store(&target_client->client_id, 0);
1196
1197 // Wait for threads to observe the client_id reset
1198 // Use sufficient delay for memory visibility across all CPU cores
1199 platform_sleep_usec(5000); // 5ms delay for memory barrier propagation
1200
1201 // Destroy mutexes
1202 // IMPORTANT: Always destroy these even if threads didn't join properly
1203 // to prevent issues when the slot is reused
1204 mutex_destroy(&target_client->client_state_mutex);
1205 mutex_destroy(&target_client->send_mutex);
1206
1207 // Clear client structure
1208 // NOTE: After memset, the mutex handles are zeroed but the OS resources
1209 // have been released by the destroy calls above
1210 memset(target_client, 0, sizeof(client_info_t));
1211
1212 // Recalculate client count using atomic reads
1213 int remaining_count = 0;
1214 for (int j = 0; j < MAX_CLIENTS; j++) {
1215 if (atomic_load(&g_client_manager.clients[j].client_id) != 0) {
1216 remaining_count++;
1217 }
1218 }
1219 g_client_manager.client_count = remaining_count;
1220
1221 log_debug("Client removed: client_id=%u (%s) removed, remaining clients: %d", client_id, display_name_copy,
1222 remaining_count);
1223
1225
1226 // Broadcast updated state
1228
1229 return 0;
1230}
1231
1232/* ============================================================================
1233 * Client Thread Functions
1234 * ============================================================================
1235 */
1236
1237// Forward declaration for ACIP server callbacks (defined later in file)
1238static const acip_server_callbacks_t g_acip_server_callbacks;
1239
1240void *client_receive_thread(void *arg) {
1241 client_info_t *client = (client_info_t *)arg;
1242
1243 // CRITICAL: Validate client pointer immediately before any access
1244 // This prevents crashes if remove_client() has zeroed the client struct
1245 // while the thread was still starting at RtlUserThreadStart
1246 if (!client) {
1247 log_error("Invalid client info in receive thread (NULL pointer)");
1248 return NULL;
1249 }
1250
1251 if (atomic_load(&client->protocol_disconnect_requested)) {
1252 log_debug("Receive thread for client %u exiting before start (protocol disconnect requested)",
1253 atomic_load(&client->client_id));
1254 return NULL;
1255 }
1256
1257 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1258 // This must be checked BEFORE accessing any client fields
1259 if (atomic_load(&client->client_id) == 0) {
1260 log_debug("Receive thread: client_id is 0, client struct may have been zeroed, exiting");
1261 return NULL;
1262 }
1263
1264 // Additional validation: check socket is valid
1265 if (client->socket == INVALID_SOCKET_VALUE) {
1266 log_error("Invalid client socket in receive thread");
1267 return NULL;
1268 }
1269
1270 // Enable thread cancellation for clean shutdown
1271 // Thread cancellation not available in platform abstraction
1272 // Threads should exit when g_server_should_exit is set
1273
1274 log_debug("Started receive thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1275
1276 // DEBUG: Check loop entry conditions
1277 bool should_exit = atomic_load(&g_server_should_exit);
1278 bool is_active = atomic_load(&client->active);
1279 socket_t sock = client->socket;
1280 log_debug("RECV_THREAD_START: Client %u conditions: should_exit=%d, active=%d, socket=%d (INVALID=%d)",
1281 atomic_load(&client->client_id), should_exit, is_active, sock, INVALID_SOCKET_VALUE);
1282
1283 while (!atomic_load(&g_server_should_exit) && atomic_load(&client->active) &&
1284 client->socket != INVALID_SOCKET_VALUE) {
1285
1286 // Use unified secure packet reception with auto-decryption
1287 // CRITICAL: Check client_id is still valid before accessing transport
1288 // This prevents accessing freed memory if remove_client() has zeroed the client struct
1289 if (atomic_load(&client->client_id) == 0) {
1290 log_debug("Client client_id reset, exiting receive thread");
1291 break;
1292 }
1293
1294 // Receive and dispatch packet using ACIP transport API
1295 // This combines packet reception, decryption, parsing, handler dispatch, and cleanup
1296 asciichat_error_t acip_result =
1297 acip_server_receive_and_dispatch(client->transport, client, &g_acip_server_callbacks);
1298
1299 // Check if shutdown was requested during the network call
1300 if (atomic_load(&g_server_should_exit)) {
1301 break;
1302 }
1303
1304 // Handle receive errors
1305 if (acip_result != ASCIICHAT_OK) {
1306 // Check error type to determine if we should disconnect
1308 if (HAS_ERRNO(&err_ctx)) {
1309 if (err_ctx.code == ERROR_NETWORK) {
1310 // Network error or EOF - client disconnected
1311 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1312 break;
1313 } else if (err_ctx.code == ERROR_CRYPTO) {
1314 // Security violation
1315 log_error_client(client,
1316 "SECURITY VIOLATION: Unencrypted packet when encryption required - terminating connection");
1317 atomic_store(&g_server_should_exit, true);
1318 break;
1319 }
1320 }
1321
1322 // Other errors - log but don't disconnect immediately
1323 log_warn("ACIP receive/dispatch failed for client %u: %s", client->client_id,
1324 asciichat_error_string(acip_result));
1325 }
1326 }
1327
1328 // Mark client as inactive and stop all threads
1329 // CRITICAL: Must stop render threads when client disconnects
1330 // OPTIMIZED: Use atomic operations for thread control flags (lock-free)
1331 atomic_store(&client->active, false);
1332 atomic_store(&client->send_thread_running, false);
1333 atomic_store(&client->video_render_thread_running, false);
1334 atomic_store(&client->audio_render_thread_running, false);
1335
1336 // Don't call remove_client() from the receive thread itself - this causes a deadlock
1337 // because main thread may be trying to join this thread via remove_client()
1338 // The main cleanup code will handle client removal after threads exit
1339
1340 log_debug("Receive thread for client %u terminated, signaled all threads to stop", client->client_id);
1341
1342 // Clean up thread-local error context before exit
1344
1345 return NULL;
1346}
1347
1348// Thread function to handle sending data to a specific client
1349void *client_send_thread_func(void *arg) {
1350 client_info_t *client = (client_info_t *)arg;
1351
1352 // CRITICAL: Validate client pointer immediately before any access
1353 // This prevents crashes if remove_client() has zeroed the client struct
1354 // while the thread was still starting at RtlUserThreadStart
1355 if (!client) {
1356 log_error("Invalid client info in send thread (NULL pointer)");
1357 return NULL;
1358 }
1359
1360 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1361 // This must be checked BEFORE accessing any client fields
1362 if (atomic_load(&client->client_id) == 0) {
1363 log_debug("Send thread: client_id is 0, client struct may have been zeroed, exiting");
1364 return NULL;
1365 }
1366
1367 // Additional validation: check socket is valid
1368 if (client->socket == INVALID_SOCKET_VALUE) {
1369 log_error("Invalid client socket in send thread");
1370 return NULL;
1371 }
1372
1373 log_debug("Started send thread for client %u (%s)", client->client_id, client->display_name);
1374
1375 // Mark thread as running
1376 atomic_store(&client->send_thread_running, true);
1377
1378 // Track timing for video frame sends
1379 uint64_t last_video_send_time = 0;
1380 const uint64_t video_send_interval_us = 16666; // 60fps = ~16.67ms
1381
1382 // High-frequency audio loop - separate from video frame loop
1383 // to ensure audio packets are sent immediately, not rate-limited by video
1384#define MAX_AUDIO_BATCH 8
1385 int loop_iteration_count = 0;
1386 while (!atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down) && atomic_load(&client->active) &&
1387 atomic_load(&client->send_thread_running)) {
1388 loop_iteration_count++;
1389 log_debug_every(LOG_RATE_FAST, "Send thread loop iteration %d for client %u", loop_iteration_count,
1390 client->client_id);
1391 bool sent_something = false;
1392
1393 // PRIORITY: Drain all queued audio packets before video
1394 // Audio must not be rate-limited by video frame sending (16.67ms)
1395 queued_packet_t *audio_packets[MAX_AUDIO_BATCH];
1396 int audio_packet_count = 0;
1397
1398 if (client->audio_queue) {
1399 // Try to dequeue multiple audio packets
1400 for (int i = 0; i < MAX_AUDIO_BATCH; i++) {
1401 audio_packets[i] = packet_queue_try_dequeue(client->audio_queue);
1402 if (audio_packets[i]) {
1403 audio_packet_count++;
1404 } else {
1405 break; // No more packets available
1406 }
1407 }
1408 }
1409
1410 // Send batched audio if we have packets
1411 if (audio_packet_count > 0) {
1412 // Get crypto context for this client
1413 // Protect crypto field access with mutex
1414 const crypto_context_t *crypto_ctx = NULL;
1416 bool crypto_ready = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1418 if (crypto_ready) {
1420 }
1422
1424
1425 if (audio_packet_count == 1) {
1426 // Single packet - send directly for low latency using ACIP transport
1427 packet_type_t pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1428
1429 mutex_lock(&client->send_mutex);
1430 if (pkt_type == PACKET_TYPE_AUDIO_OPUS) {
1431 result = acip_send_audio_opus(client->transport, audio_packets[0]->data, audio_packets[0]->data_len);
1432 } else {
1433 // Raw float audio - use generic packet sender
1434 result = packet_send_via_transport(client->transport, pkt_type, audio_packets[0]->data,
1435 audio_packets[0]->data_len);
1436 }
1437 mutex_unlock(&client->send_mutex);
1438 } else {
1439 // Multiple packets - batch them together
1440 // Check if these are Opus-encoded packets or raw float audio
1441 packet_type_t first_pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1442
1443 if (first_pkt_type == PACKET_TYPE_AUDIO_OPUS) {
1444 // Opus packets - batch using proper Opus batching format
1445 // Calculate total Opus data size
1446 size_t total_opus_size = 0;
1447 for (int i = 0; i < audio_packet_count; i++) {
1448 total_opus_size += audio_packets[i]->data_len;
1449 }
1450
1451 // Allocate buffers for batched Opus data and frame sizes
1452 uint8_t *batched_opus = SAFE_MALLOC(total_opus_size, uint8_t *);
1453 uint16_t *frame_sizes = SAFE_MALLOC((size_t)audio_packet_count * sizeof(uint16_t), uint16_t *);
1454
1455 if (batched_opus && frame_sizes) {
1456 // Copy all Opus frames into batch buffer
1457 size_t offset = 0;
1458 for (int i = 0; i < audio_packet_count; i++) {
1459 frame_sizes[i] = (uint16_t)audio_packets[i]->data_len;
1460 memcpy(batched_opus + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1461 offset += audio_packets[i]->data_len;
1462 }
1463
1464 // Send batched Opus packet
1465 mutex_lock(&client->send_mutex);
1466 result =
1467 av_send_audio_opus_batch(client->socket, batched_opus, total_opus_size, frame_sizes, AUDIO_SAMPLE_RATE,
1468 20, audio_packet_count, (crypto_context_t *)crypto_ctx);
1469 mutex_unlock(&client->send_mutex);
1470
1471 log_debug_every(LOG_RATE_FAST, "Sent Opus batch: %d frames (%zu bytes) to client %u", audio_packet_count,
1472 total_opus_size, client->client_id);
1473 } else {
1474 log_error("Failed to allocate buffer for Opus batch");
1475 result = ERROR_MEMORY;
1476 }
1477
1478 if (batched_opus)
1479 SAFE_FREE(batched_opus);
1480 if (frame_sizes)
1481 SAFE_FREE(frame_sizes);
1482 } else {
1483 // Raw float audio - use existing batching logic
1484 size_t total_samples = 0;
1485 for (int i = 0; i < audio_packet_count; i++) {
1486 total_samples += audio_packets[i]->data_len / sizeof(float);
1487 }
1488
1489 // Allocate buffer for batched audio
1490 float *batched_audio = SAFE_MALLOC(total_samples * sizeof(float), float *);
1491 if (batched_audio) {
1492 // Copy all audio packets into batch buffer
1493 size_t offset = 0;
1494 for (int i = 0; i < audio_packet_count; i++) {
1495 size_t packet_samples = audio_packets[i]->data_len / sizeof(float);
1496 memcpy(batched_audio + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1497 offset += packet_samples;
1498 }
1499
1500 // Send batched audio packet
1501 mutex_lock(&client->send_mutex);
1502 result = send_audio_batch_packet(client->socket, batched_audio, (int)total_samples, audio_packet_count,
1503 (crypto_context_t *)crypto_ctx);
1504 mutex_unlock(&client->send_mutex);
1505
1506 SAFE_FREE(batched_audio);
1507
1508 log_debug_every(LOG_RATE_FAST, "Sent audio batch: %d packets (%zu samples) to client %u",
1509 audio_packet_count, total_samples, client->client_id);
1510 } else {
1511 log_error("Failed to allocate buffer for audio batch");
1512 result = ERROR_MEMORY;
1513 }
1514 }
1515 }
1516
1517 // Free all audio packets
1518 for (int i = 0; i < audio_packet_count; i++) {
1519 packet_queue_free_packet(audio_packets[i]);
1520 }
1521
1522 if (result != ASCIICHAT_OK) {
1523 if (!atomic_load(&g_server_should_exit)) {
1524 log_error("Failed to send audio to client %u: %s", client->client_id, asciichat_error_string(result));
1525 }
1526 break; // Socket error, exit thread
1527 }
1528
1529 sent_something = true;
1530
1531 // Small sleep to let more audio packets queue (helps batching efficiency)
1532 if (audio_packet_count > 0) {
1533 platform_sleep_usec(100); // 0.1ms - minimal delay
1534 }
1535 } else {
1536 // No audio packets - brief sleep to avoid busy-looping, then check for other tasks
1537 platform_sleep_usec(1000); // 1ms - enough for audio render thread to queue more packets
1538
1539 // Check if session rekeying should be triggered
1541 bool should_rekey = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1545
1546 if (should_rekey) {
1547 log_debug("Rekey threshold reached for client %u, initiating session rekey", client->client_id);
1549 // CRITICAL: Protect socket write with send_mutex to prevent concurrent writes
1550 mutex_lock(&client->send_mutex);
1552 mutex_unlock(&client->send_mutex);
1554
1555 if (result != ASCIICHAT_OK) {
1556 log_error("Failed to send REKEY_REQUEST to client %u: %d", client->client_id, result);
1557 } else {
1558 log_debug("Sent REKEY_REQUEST to client %u", client->client_id);
1559 // Notify client that session rekeying has been initiated (old keys still active)
1560 log_info_client(client, "Session rekey initiated - rotating encryption keys");
1561 }
1562 }
1563 }
1564
1565 // Always consume frames from the buffer to prevent accumulation
1566 // Rate-limit the actual sending, but always mark frames as consumed
1567 if (!client->outgoing_video_buffer) {
1568 // CRITICAL: Buffer has been destroyed (client is shutting down)
1569 // Exit cleanly instead of looping forever trying to access freed memory
1570 log_debug("Client %u send thread exiting: outgoing_video_buffer is NULL", client->client_id);
1571 break;
1572 }
1573
1574 // Get latest frame from double buffer (lock-free operation)
1575 // This marks the frame as consumed even if we don't send it yet
1577 log_debug("Send thread: video_frame_get_latest returned %p for client %u", (void *)frame, client->client_id);
1578
1579 // Check if get_latest failed (buffer might have been destroyed)
1580 if (!frame) {
1581 log_debug("Client %u send thread: video_frame_get_latest returned NULL, buffer may be destroyed",
1582 client->client_id);
1583 break; // Exit thread if buffer is invalid
1584 }
1585
1586 // Check if it's time to send a video frame (60fps rate limiting)
1587 // Only rate-limit the SEND operation, not frame consumption
1588 struct timespec now_ts, frame_start, frame_end, step1, step2, step3, step4, step5;
1589 (void)clock_gettime(CLOCK_MONOTONIC, &now_ts);
1590 uint64_t current_time = (uint64_t)now_ts.tv_sec * 1000000 + (uint64_t)now_ts.tv_nsec / 1000;
1591 uint64_t time_since_last_send = current_time - last_video_send_time;
1592 log_debug("Send thread timing check: time_since_last=%lu us, interval=%lu us, should_send=%d", time_since_last_send,
1593 video_send_interval_us, (time_since_last_send >= video_send_interval_us));
1594
1595 if (current_time - last_video_send_time >= video_send_interval_us) {
1596 (void)clock_gettime(CLOCK_MONOTONIC, &frame_start);
1597
1598 // GRID LAYOUT CHANGE: Check if render thread has buffered a frame with different source count
1599 // If so, send CLEAR_CONSOLE before sending the new frame
1600 int rendered_sources = atomic_load(&client->last_rendered_grid_sources);
1601 int sent_sources = atomic_load(&client->last_sent_grid_sources);
1602
1603 if (rendered_sources != sent_sources && rendered_sources > 0) {
1604 // Grid layout changed! Send CLEAR_CONSOLE before next frame using ACIP transport
1605 // CRITICAL: Protect socket write with send_mutex to prevent concurrent writes
1606 mutex_lock(&client->send_mutex);
1608 mutex_unlock(&client->send_mutex);
1609 log_debug_every(LOG_RATE_FAST, "Client %u: Sent CLEAR_CONSOLE (grid changed %d → %d sources)",
1610 client->client_id, sent_sources, rendered_sources);
1611 atomic_store(&client->last_sent_grid_sources, rendered_sources);
1612 sent_something = true;
1613 }
1614
1615 log_debug("Send thread: frame validation - frame=%p, frame->data=%p, frame->size=%zu", (void *)frame,
1616 (void *)frame->data, frame->size);
1617
1618 if (!frame->data) {
1619 SET_ERRNO(ERROR_INVALID_STATE, "Client %u has no valid frame data: frame=%p, data=%p", client->client_id, frame,
1620 frame->data);
1621 log_debug("Send thread: Skipping frame send due to NULL frame->data");
1622 continue;
1623 }
1624
1625 if (frame->data && frame->size == 0) {
1626 // NOTE: This means the we're not ready to send ascii to the client and
1627 // should wait a little bit.
1628 log_debug("Send thread: Skipping frame send due to frame->size == 0");
1629 log_warn_every(LOG_RATE_FAST, "Client %u has no valid frame size: size=%zu", client->client_id, frame->size);
1630 platform_sleep_usec(1000); // 1ms sleep
1631 continue;
1632 }
1633
1634 // Snapshot frame metadata (safe with double-buffer system)
1635 const char *frame_data = (const char *)frame->data; // Pointer snapshot - data is stable in front buffer
1636 uint32_t width = atomic_load(&client->width);
1637 uint32_t height = atomic_load(&client->height);
1638 (void)clock_gettime(CLOCK_MONOTONIC, &step1);
1639 (void)clock_gettime(CLOCK_MONOTONIC, &step2);
1640 (void)clock_gettime(CLOCK_MONOTONIC, &step3);
1641 (void)clock_gettime(CLOCK_MONOTONIC, &step4);
1642
1643 // CRITICAL: Protect socket write with send_mutex to prevent concurrent writes
1644 // ACIP transport handles header building, CRC32, encryption internally
1645 log_debug("Send thread: About to send frame to client %u (width=%u, height=%u, data=%p)", client->client_id,
1646 width, height, (void *)frame_data);
1647 mutex_lock(&client->send_mutex);
1648 asciichat_error_t send_result = acip_send_ascii_frame(client->transport, frame_data, width, height);
1649 mutex_unlock(&client->send_mutex);
1650 (void)clock_gettime(CLOCK_MONOTONIC, &step5);
1651
1652 if (send_result != ASCIICHAT_OK) {
1653 if (!atomic_load(&g_server_should_exit)) {
1654 SET_ERRNO(ERROR_NETWORK, "Failed to send video frame to client %u: %s", client->client_id,
1655 asciichat_error_string(send_result));
1656 }
1657 log_debug("Send thread: Frame send FAILED for client %u: result=%d", client->client_id, send_result);
1658 break;
1659 }
1660
1661 log_debug("Send thread: Frame sent SUCCESSFULLY to client %u", client->client_id);
1662 sent_something = true;
1663 last_video_send_time = current_time;
1664
1665 (void)clock_gettime(CLOCK_MONOTONIC, &frame_end);
1666 uint64_t frame_time_us = ((uint64_t)frame_end.tv_sec * 1000000 + (uint64_t)frame_end.tv_nsec / 1000) -
1667 ((uint64_t)frame_start.tv_sec * 1000000 + (uint64_t)frame_start.tv_nsec / 1000);
1668 if (frame_time_us > 15000) { // Log if sending a frame takes > 15ms (encryption adds ~5-6ms)
1669 uint64_t step1_us = ((uint64_t)step1.tv_sec * 1000000 + (uint64_t)step1.tv_nsec / 1000) -
1670 ((uint64_t)frame_start.tv_sec * 1000000 + (uint64_t)frame_start.tv_nsec / 1000);
1671 uint64_t step2_us = ((uint64_t)step2.tv_sec * 1000000 + (uint64_t)step2.tv_nsec / 1000) -
1672 ((uint64_t)step1.tv_sec * 1000000 + (uint64_t)step1.tv_nsec / 1000);
1673 uint64_t step3_us = ((uint64_t)step3.tv_sec * 1000000 + (uint64_t)step3.tv_nsec / 1000) -
1674 ((uint64_t)step2.tv_sec * 1000000 + (uint64_t)step2.tv_nsec / 1000);
1675 uint64_t step4_us = ((uint64_t)step4.tv_sec * 1000000 + (uint64_t)step4.tv_nsec / 1000) -
1676 ((uint64_t)step3.tv_sec * 1000000 + (uint64_t)step3.tv_nsec / 1000);
1677 uint64_t step5_us = ((uint64_t)step5.tv_sec * 1000000 + (uint64_t)step5.tv_nsec / 1000) -
1678 ((uint64_t)step4.tv_sec * 1000000 + (uint64_t)step4.tv_nsec / 1000);
1681 "SEND_THREAD: Frame send took %.2fms for client %u | Snapshot: %.2fms | Memcpy: %.2fms | CRC32: %.2fms | "
1682 "Header: %.2fms | send_packet_secure: %.2fms",
1683 frame_time_us / 1000.0, client->client_id, step1_us / 1000.0, step2_us / 1000.0, step3_us / 1000.0,
1684 step4_us / 1000.0, step5_us / 1000.0);
1685 }
1686 }
1687
1688 // If we didn't send anything, sleep briefly to prevent busy waiting
1689 if (!sent_something) {
1690 platform_sleep_usec(1000); // 1ms sleep
1691 }
1692 }
1693
1694 // Mark thread as stopped
1695 atomic_store(&client->send_thread_running, false);
1696 log_debug("Send thread for client %u terminated", client->client_id);
1697
1698 // Clean up thread-local error context before exit
1700
1701 return NULL;
1702}
1703
1704/* ============================================================================
1705 * Broadcast Functions
1706 * ============================================================================
1707 */
1708
1709// Broadcast server state to all connected clients
1711 // SNAPSHOT PATTERN: Collect client data while holding lock, then release before network I/O
1712 typedef struct {
1713 socket_t socket;
1714 uint32_t client_id;
1715 const crypto_context_t *crypto_ctx;
1716 } client_snapshot_t;
1717
1718 client_snapshot_t client_snapshots[MAX_CLIENTS];
1719 int snapshot_count = 0;
1720 int active_video_count = 0;
1721
1722 struct timespec lock_start, lock_end;
1723 (void)clock_gettime(CLOCK_MONOTONIC, &lock_start);
1725 (void)clock_gettime(CLOCK_MONOTONIC, &lock_end);
1726 uint64_t lock_time_us = ((uint64_t)lock_end.tv_sec * 1000000 + (uint64_t)lock_end.tv_nsec / 1000) -
1727 ((uint64_t)lock_start.tv_sec * 1000000 + (uint64_t)lock_start.tv_nsec / 1000);
1728 if (lock_time_us > 1000) { // Log if > 1ms
1729 double lock_time_ms = lock_time_us / 1000.0;
1730 char duration_str[32];
1731 format_duration_ms(lock_time_ms, duration_str, sizeof(duration_str));
1732 log_warn("broadcast_server_state: rwlock_rdlock took %s", duration_str);
1733 }
1734
1735 // Count active clients and snapshot client data while holding lock
1736 // CRITICAL: Use atomic_load for all atomic fields to prevent data races
1737 for (int i = 0; i < MAX_CLIENTS; i++) {
1738 bool is_active = atomic_load(&g_client_manager.clients[i].active);
1739 if (is_active && atomic_load(&g_client_manager.clients[i].is_sending_video)) {
1740 active_video_count++;
1741 }
1742 if (is_active && g_client_manager.clients[i].socket != INVALID_SOCKET_VALUE) {
1743 // Get crypto context and skip if not ready (handshake still in progress)
1744 const crypto_context_t *crypto_ctx =
1746 if (!crypto_ctx) {
1747 // Skip clients that haven't completed crypto handshake yet
1748 log_debug("Skipping server_state broadcast to client %u: crypto handshake not ready",
1749 atomic_load(&g_client_manager.clients[i].client_id));
1750 continue;
1751 }
1752
1753 client_snapshots[snapshot_count].socket = g_client_manager.clients[i].socket;
1754 client_snapshots[snapshot_count].client_id = atomic_load(&g_client_manager.clients[i].client_id);
1755 client_snapshots[snapshot_count].crypto_ctx = crypto_ctx;
1756 snapshot_count++;
1757 }
1758 }
1759
1760 // Prepare server state packet while still holding lock (fast operation)
1763 state.active_client_count = active_video_count;
1764 memset(state.reserved, 0, sizeof(state.reserved));
1765
1766 // Convert to network byte order
1767 server_state_packet_t net_state;
1770 memset(net_state.reserved, 0, sizeof(net_state.reserved));
1771
1772 // Release lock BEFORE sending (snapshot pattern)
1773 // Sending while holding lock blocks all client operations
1774 (void)clock_gettime(CLOCK_MONOTONIC, &lock_end);
1775 uint64_t lock_held_us = ((uint64_t)lock_end.tv_sec * 1000000 + (uint64_t)lock_end.tv_nsec / 1000) -
1776 ((uint64_t)lock_start.tv_sec * 1000000 + (uint64_t)lock_start.tv_nsec / 1000);
1778
1779 // Send to all clients AFTER releasing the lock
1780 // This prevents blocking other threads during network I/O
1781 for (int i = 0; i < snapshot_count; i++) {
1782 log_debug("BROADCAST_DEBUG: Sending SERVER_STATE to client %u (socket %d) with crypto_ctx=%p",
1783 client_snapshots[i].client_id, client_snapshots[i].socket, (void *)client_snapshots[i].crypto_ctx);
1784
1785 // CRITICAL: Protect socket write with per-client send_mutex
1786 client_info_t *target = find_client_by_id(client_snapshots[i].client_id);
1787 if (target) {
1788 // IMPORTANT: Verify client_id matches expected value - prevents use-after-free
1789 // if client was removed and replaced with another client in same slot
1790 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
1791 log_warn("Client %u ID mismatch during broadcast (found %u), skipping send", client_snapshots[i].client_id,
1792 atomic_load(&target->client_id));
1793 continue;
1794 }
1795
1796 mutex_lock(&target->send_mutex);
1797
1798 // Double-check client_id again after acquiring mutex (stronger protection)
1799 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
1800 mutex_unlock(&target->send_mutex);
1801 log_warn("Client %u was removed during broadcast send (now %u), skipping", client_snapshots[i].client_id,
1802 atomic_load(&target->client_id));
1803 continue;
1804 }
1805
1806 // Send via ACIP transport
1807 asciichat_error_t result = acip_send_server_state(target->transport, &net_state);
1808 mutex_unlock(&target->send_mutex);
1809
1810 if (result != ASCIICHAT_OK) {
1811 log_error("Failed to send server state to client %u: %s", client_snapshots[i].client_id,
1812 asciichat_error_string(result));
1813 } else {
1814 log_debug("Sent server state to client %u: %u connected, %u active", client_snapshots[i].client_id,
1816 }
1817 } else {
1818 log_warn("Client %u removed before broadcast send could complete", client_snapshots[i].client_id);
1819 }
1820 }
1821
1822 if (lock_held_us > 1000) { // Log if held > 1ms (should be very rare now with optimized send)
1823 log_warn("broadcast_server_state: rwlock held for %.2fms (includes network I/O)", lock_held_us / 1000.0);
1824 }
1825}
1826
1827/* ============================================================================
1828 * Helper Functions
1829 * ============================================================================
1830 */
1831
1833 if (!client) {
1834 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
1835 return;
1836 }
1837
1838 // Signal threads to stop
1839 atomic_store(&client->active, false);
1840 atomic_store(&client->send_thread_running, false);
1841
1842 // Wait for threads to finish
1844 asciichat_thread_join(&client->send_thread, NULL);
1845 }
1847 asciichat_thread_join(&client->receive_thread, NULL);
1848 }
1849}
1850
1852 if (!client) {
1853 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
1854 return;
1855 }
1856
1857 if (client->incoming_video_buffer) {
1859 client->incoming_video_buffer = NULL;
1860 }
1861
1862 // Clean up outgoing video buffer (for ASCII frames)
1863 if (client->outgoing_video_buffer) {
1865 client->outgoing_video_buffer = NULL;
1866 }
1867
1868 // Clean up pre-allocated send buffer
1869 if (client->send_buffer) {
1870 SAFE_FREE(client->send_buffer);
1871 client->send_buffer = NULL;
1872 client->send_buffer_size = 0;
1873 }
1874
1875 if (client->incoming_audio_buffer) {
1877 client->incoming_audio_buffer = NULL;
1878 }
1879
1880 // Clean up Opus decoder
1881 if (client->opus_decoder) {
1883 client->opus_decoder = NULL;
1884 }
1885}
1886
1888 if (!client)
1889 return;
1890
1891 if (client->audio_queue) {
1893 client->audio_queue = NULL;
1894 }
1895
1896 // Video now uses double buffer, cleaned up in cleanup_client_media_buffers
1897}
1898
1908static inline void cleanup_client_all_buffers(client_info_t *client) {
1911}
1912
1923int process_encrypted_packet(client_info_t *client, packet_type_t *type, void **data, size_t *len,
1924 uint32_t *sender_id) {
1925 if (!crypto_server_is_ready(client->client_id)) {
1926 log_error("Received encrypted packet but crypto not ready for client %u", client->client_id);
1927 buffer_pool_free(NULL, *data, *len);
1928 *data = NULL;
1929 return -1;
1930 }
1931
1932 // Store original allocation size before it gets modified
1933 size_t original_alloc_size = *len;
1934 void *decrypted_data = buffer_pool_alloc(NULL, original_alloc_size);
1935 size_t decrypted_len;
1936 int decrypt_result = crypto_server_decrypt_packet(client->client_id, (const uint8_t *)*data, *len,
1937 (uint8_t *)decrypted_data, original_alloc_size, &decrypted_len);
1938
1939 if (decrypt_result != 0) {
1940 SET_ERRNO(ERROR_CRYPTO, "Failed to process encrypted packet from client %u (result=%d)", client->client_id,
1941 decrypt_result);
1942 buffer_pool_free(NULL, *data, original_alloc_size);
1943 buffer_pool_free(NULL, decrypted_data, original_alloc_size);
1944 *data = NULL;
1945 return -1;
1946 }
1947
1948 // Replace encrypted data with decrypted data
1949 // Use original allocation size for freeing the encrypted buffer
1950 buffer_pool_free(NULL, *data, original_alloc_size);
1951
1952 *data = decrypted_data;
1953 *len = decrypted_len;
1954
1955 // Now process the decrypted packet by parsing its header
1956 if (*len < sizeof(packet_header_t)) {
1957 SET_ERRNO(ERROR_CRYPTO, "Decrypted packet too small for header from client %u", client->client_id);
1958 buffer_pool_free(NULL, *data, *len);
1959 *data = NULL;
1960 return -1;
1961 }
1962
1963 packet_header_t *header = (packet_header_t *)*data;
1964 *type = (packet_type_t)NET_TO_HOST_U16(header->type);
1965 *sender_id = NET_TO_HOST_U32(header->client_id);
1966
1967 // Adjust data pointer to skip header
1968 *data = (uint8_t *)*data + sizeof(packet_header_t);
1969 *len -= sizeof(packet_header_t);
1970
1971 return 0;
1972}
1973
1974/* ============================================================================
1975 * ACIP Server Callback Wrappers
1976 * ============================================================================ */
1977
1978// Forward declarations for ACIP server callbacks
1979static void acip_server_on_protocol_version(const protocol_version_packet_t *version, void *client_ctx, void *app_ctx);
1980static void acip_server_on_image_frame(const image_frame_packet_t *header, const void *pixel_data, size_t data_len,
1981 void *client_ctx, void *app_ctx);
1982static void acip_server_on_audio(const void *audio_data, size_t audio_len, void *client_ctx, void *app_ctx);
1983static void acip_server_on_audio_batch(const audio_batch_packet_t *header, const float *samples, size_t num_samples,
1984 void *client_ctx, void *app_ctx);
1985static void acip_server_on_audio_opus(const void *opus_data, size_t opus_len, void *client_ctx, void *app_ctx);
1986static void acip_server_on_audio_opus_batch(const void *batch_data, size_t batch_len, void *client_ctx, void *app_ctx);
1987static void acip_server_on_client_join(const void *join_data, size_t data_len, void *client_ctx, void *app_ctx);
1988static void acip_server_on_client_leave(void *client_ctx, void *app_ctx);
1989static void acip_server_on_stream_start(uint32_t stream_types, void *client_ctx, void *app_ctx);
1990static void acip_server_on_stream_stop(uint32_t stream_types, void *client_ctx, void *app_ctx);
1991static void acip_server_on_capabilities(const void *cap_data, size_t data_len, void *client_ctx, void *app_ctx);
1992static void acip_server_on_ping(void *client_ctx, void *app_ctx);
1993static void acip_server_on_pong(void *client_ctx, void *app_ctx);
1994static void acip_server_on_error(const error_packet_t *header, const char *message, void *client_ctx, void *app_ctx);
1995static void acip_server_on_remote_log(const remote_log_packet_t *header, const char *message, void *client_ctx,
1996 void *app_ctx);
1997static void acip_server_on_crypto_rekey_request(const void *payload, size_t payload_len, void *client_ctx,
1998 void *app_ctx);
1999static void acip_server_on_crypto_rekey_response(const void *payload, size_t payload_len, void *client_ctx,
2000 void *app_ctx);
2001static void acip_server_on_crypto_rekey_complete(const void *payload, size_t payload_len, void *client_ctx,
2002 void *app_ctx);
2003
2010static const acip_server_callbacks_t g_acip_server_callbacks = {
2011 .on_protocol_version = acip_server_on_protocol_version,
2012 .on_image_frame = acip_server_on_image_frame,
2013 .on_audio = acip_server_on_audio,
2014 .on_audio_batch = acip_server_on_audio_batch,
2015 .on_audio_opus = acip_server_on_audio_opus,
2016 .on_audio_opus_batch = acip_server_on_audio_opus_batch,
2017 .on_client_join = acip_server_on_client_join,
2018 .on_client_leave = acip_server_on_client_leave,
2019 .on_stream_start = acip_server_on_stream_start,
2020 .on_stream_stop = acip_server_on_stream_stop,
2021 .on_capabilities = acip_server_on_capabilities,
2022 .on_ping = acip_server_on_ping,
2023 .on_pong = acip_server_on_pong,
2024 .on_error = acip_server_on_error,
2025 .on_remote_log = acip_server_on_remote_log,
2026 .on_crypto_rekey_request = acip_server_on_crypto_rekey_request,
2027 .on_crypto_rekey_response = acip_server_on_crypto_rekey_response,
2028 .on_crypto_rekey_complete = acip_server_on_crypto_rekey_complete,
2029 .app_ctx = NULL // Not used - client context passed per-call
2030};
2031
2032// Callback implementations (delegate to existing handlers)
2033
2034static void acip_server_on_protocol_version(const protocol_version_packet_t *version, void *client_ctx, void *app_ctx) {
2035 // TODO: Use app_ctx for context-aware protocol handling or metrics collection in future versions
2036 (void)app_ctx;
2037 client_info_t *client = (client_info_t *)client_ctx;
2038 handle_protocol_version_packet(client, (void *)version, sizeof(*version));
2039}
2040
2041static void acip_server_on_image_frame(const image_frame_packet_t *header, const void *pixel_data, size_t data_len,
2042 void *client_ctx, void *app_ctx) {
2043 (void)app_ctx;
2044 client_info_t *client = (client_info_t *)client_ctx;
2045
2046 log_debug(
2047 "ACIP callback received IMAGE_FRAME: width=%u, height=%u, pixel_format=%u, compressed_size=%u, data_len=%zu",
2048 header->width, header->height, header->pixel_format, header->compressed_size, data_len);
2049
2050 // Validate frame dimensions to prevent DoS and buffer overflow attacks
2051 if (header->width == 0 || header->height == 0) {
2052 log_error("Invalid image dimensions: %ux%u (width and height must be > 0)", header->width, header->height);
2053 disconnect_client_for_bad_data(client, "IMAGE_FRAME invalid dimensions");
2054 return;
2055 }
2056
2057 const uint32_t MAX_WIDTH = 8192;
2058 const uint32_t MAX_HEIGHT = 8192;
2059 if (header->width > MAX_WIDTH || header->height > MAX_HEIGHT) {
2060 log_error("Image dimensions too large: %ux%u (max: %ux%u)", header->width, header->height, MAX_WIDTH, MAX_HEIGHT);
2061 disconnect_client_for_bad_data(client, "IMAGE_FRAME dimensions too large");
2062 return;
2063 }
2064
2065 // Auto-enable video stream if not already enabled
2066 bool was_sending_video = atomic_load(&client->is_sending_video);
2067 if (!was_sending_video) {
2068 if (atomic_compare_exchange_strong(&client->is_sending_video, &was_sending_video, true)) {
2069 log_info("Client %u auto-enabled video stream (received IMAGE_FRAME)", atomic_load(&client->client_id));
2070 log_info_client(client, "First video frame received - streaming active");
2071 }
2072 } else {
2073 // Log periodically
2075 client->frames_received_logged++;
2076 if (client->frames_received_logged % 25000 == 0) {
2077 char pretty[64];
2078 format_bytes_pretty(data_len, pretty, sizeof(pretty));
2079 log_debug("Client %u has sent %u IMAGE_FRAME packets (%s)", atomic_load(&client->client_id),
2080 client->frames_received_logged, pretty);
2081 }
2083 }
2084
2085 // Reconstruct packet in old format for legacy handler: [width:4][height:4][rgb_data]
2086 // Header from ACIP is in HOST byte order, must convert back to NETWORK byte order
2087 const size_t legacy_header_size = sizeof(uint32_t) * 2;
2088 size_t total_len = legacy_header_size + data_len;
2089
2090 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2091 if (!full_packet) {
2092 log_error("Failed to allocate buffer for IMAGE_FRAME reconstruction");
2093 return;
2094 }
2095
2096 uint32_t width_net = HOST_TO_NET_U32(header->width);
2097 uint32_t height_net = HOST_TO_NET_U32(header->height);
2098
2099 memcpy(full_packet, &width_net, sizeof(uint32_t));
2100 memcpy(full_packet + sizeof(uint32_t), &height_net, sizeof(uint32_t));
2101 if (data_len > 0) {
2102 memcpy(full_packet + legacy_header_size, pixel_data, data_len);
2103 }
2104
2105 log_debug("Reconstructed old-format packet (total_len=%zu), calling legacy handler", total_len);
2106 handle_image_frame_packet(client, full_packet, total_len);
2107 SAFE_FREE(full_packet);
2108}
2109
2110static void acip_server_on_audio(const void *audio_data, size_t audio_len, void *client_ctx, void *app_ctx) {
2111 (void)app_ctx;
2112 client_info_t *client = (client_info_t *)client_ctx;
2113 handle_audio_packet(client, (void *)audio_data, audio_len);
2114}
2115
2116static void acip_server_on_audio_batch(const audio_batch_packet_t *header, const float *samples, size_t num_samples,
2117 void *client_ctx, void *app_ctx) {
2118 (void)app_ctx;
2119 (void)header; // Header info not needed - ACIP already validated
2120 client_info_t *client = (client_info_t *)client_ctx;
2121
2122 // ACIP handler already dequantized samples - write directly to audio buffer
2123 // This is more efficient than calling the existing handler which would re-dequantize
2124 log_debug_every(LOG_RATE_DEFAULT, "Received audio batch from client %u (samples=%zu, is_sending_audio=%d)",
2125 atomic_load(&client->client_id), num_samples, atomic_load(&client->is_sending_audio));
2126
2127 if (!atomic_load(&client->is_sending_audio)) {
2128 log_debug("Ignoring audio batch - client %u not in audio streaming mode", client->client_id);
2129 return;
2130 }
2131
2132 if (client->incoming_audio_buffer) {
2133 asciichat_error_t write_result =
2134 audio_ring_buffer_write(client->incoming_audio_buffer, (float *)samples, num_samples);
2135 if (write_result != ASCIICHAT_OK) {
2136 log_error("Failed to write decoded audio batch to buffer: %s", asciichat_error_string(write_result));
2137 }
2138 }
2139}
2140
2141static void acip_server_on_audio_opus(const void *opus_data, size_t opus_len, void *client_ctx, void *app_ctx) {
2142 (void)app_ctx;
2143 client_info_t *client = (client_info_t *)client_ctx;
2144
2145 // Special handling: Convert single-frame Opus to batch format
2146 // This maintains compatibility with existing server-side Opus batch processing
2147
2148 if (opus_len < 16) {
2149 log_warn("AUDIO_OPUS packet too small: %zu bytes", opus_len);
2150 return;
2151 }
2152
2153 const uint8_t *payload = (const uint8_t *)opus_data;
2154 // Use unaligned read helpers - network data may not be aligned
2155 int sample_rate = (int)NET_TO_HOST_U32(read_u32_unaligned(payload));
2156 int frame_duration = (int)NET_TO_HOST_U32(read_u32_unaligned(payload + 4));
2157 // Reserved bytes at offset 8-15
2158 size_t actual_opus_size = opus_len - 16;
2159
2160 if (actual_opus_size > 0 && actual_opus_size <= 1024 && sample_rate == 48000 && frame_duration == 20) {
2161 // Create a synthetic Opus batch packet (frame_count=1)
2162 uint8_t batch_buffer[1024 + 20]; // Max Opus + header
2163 uint8_t *batch_ptr = batch_buffer;
2164
2165 // Write batch header (batch_buffer is stack-aligned, writes are safe)
2166 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)sample_rate));
2167 batch_ptr += 4;
2168 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)frame_duration));
2169 batch_ptr += 4;
2170 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32(1)); // frame_count = 1
2171 batch_ptr += 4;
2172 memset(batch_ptr, 0, 4); // reserved
2173 batch_ptr += 4;
2174
2175 // Write frame size
2176 write_u16_unaligned(batch_ptr, HOST_TO_NET_U16((uint16_t)actual_opus_size));
2177 batch_ptr += 2;
2178
2179 // Write Opus data
2180 memcpy(batch_ptr, payload + 16, actual_opus_size);
2181 batch_ptr += actual_opus_size;
2182
2183 // Process as batch packet
2184 size_t batch_size = (size_t)(batch_ptr - batch_buffer);
2185 handle_audio_opus_batch_packet(client, batch_buffer, batch_size);
2186 }
2187}
2188
2189static void acip_server_on_audio_opus_batch(const void *batch_data, size_t batch_len, void *client_ctx, void *app_ctx) {
2190 (void)app_ctx;
2191 client_info_t *client = (client_info_t *)client_ctx;
2192 handle_audio_opus_batch_packet(client, (void *)batch_data, batch_len);
2193}
2194
2195static void acip_server_on_client_join(const void *join_data, size_t data_len, void *client_ctx, void *app_ctx) {
2196 (void)app_ctx;
2197 client_info_t *client = (client_info_t *)client_ctx;
2198 handle_client_join_packet(client, (void *)join_data, data_len);
2199}
2200
2201static void acip_server_on_client_leave(void *client_ctx, void *app_ctx) {
2202 (void)app_ctx;
2203 client_info_t *client = (client_info_t *)client_ctx;
2204 handle_client_leave_packet(client, NULL, 0);
2205}
2206
2207static void acip_server_on_stream_start(uint32_t stream_types, void *client_ctx, void *app_ctx) {
2208 (void)app_ctx;
2209 client_info_t *client = (client_info_t *)client_ctx;
2210 // ACIP layer provides stream_types in host byte order, but handle_stream_start_packet()
2211 // expects network byte order (it does NET_TO_HOST_U32 internally)
2212 uint32_t stream_types_net = HOST_TO_NET_U32(stream_types);
2213 handle_stream_start_packet(client, &stream_types_net, sizeof(stream_types_net));
2214}
2215
2216static void acip_server_on_stream_stop(uint32_t stream_types, void *client_ctx, void *app_ctx) {
2217 (void)app_ctx;
2218 client_info_t *client = (client_info_t *)client_ctx;
2219 // ACIP layer provides stream_types in host byte order, but handle_stream_stop_packet()
2220 // expects network byte order (it does NET_TO_HOST_U32 internally)
2221 uint32_t stream_types_net = HOST_TO_NET_U32(stream_types);
2222 handle_stream_stop_packet(client, &stream_types_net, sizeof(stream_types_net));
2223}
2224
2225static void acip_server_on_capabilities(const void *cap_data, size_t data_len, void *client_ctx, void *app_ctx) {
2226 (void)app_ctx;
2227 client_info_t *client = (client_info_t *)client_ctx;
2228 handle_client_capabilities_packet(client, (void *)cap_data, data_len);
2229}
2230
2231static void acip_server_on_ping(void *client_ctx, void *app_ctx) {
2232 (void)app_ctx;
2233 client_info_t *client = (client_info_t *)client_ctx;
2234
2235 // Respond with PONG using ACIP transport
2236 // CRITICAL: Protect socket write with send_mutex to prevent concurrent writes
2237 mutex_lock(&client->send_mutex);
2238 asciichat_error_t pong_result = acip_send_pong(client->transport);
2239 mutex_unlock(&client->send_mutex);
2240
2241 if (pong_result != ASCIICHAT_OK) {
2242 SET_ERRNO(ERROR_NETWORK, "Failed to send PONG response to client %u: %s", client->client_id,
2243 asciichat_error_string(pong_result));
2244 }
2245}
2246
2247static void acip_server_on_pong(void *client_ctx, void *app_ctx) {
2248 (void)client_ctx;
2249 (void)app_ctx;
2250 // Client acknowledged our PING - no action needed
2251}
2252
2253static void acip_server_on_error(const error_packet_t *header, const char *message, void *client_ctx, void *app_ctx) {
2254 (void)app_ctx;
2255 client_info_t *client = (client_info_t *)client_ctx;
2256
2257 // Reconstruct full packet for existing handler
2258 size_t msg_len = strlen(message);
2259 size_t total_len = sizeof(*header) + msg_len;
2260 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2261 if (!full_packet) {
2262 log_error("Failed to allocate buffer for ERROR_MESSAGE reconstruction");
2263 return;
2264 }
2265
2266 memcpy(full_packet, header, sizeof(*header));
2267 memcpy(full_packet + sizeof(*header), message, msg_len);
2268
2269 handle_client_error_packet(client, full_packet, total_len);
2270 SAFE_FREE(full_packet);
2271}
2272
2273static void acip_server_on_remote_log(const remote_log_packet_t *header, const char *message, void *client_ctx,
2274 void *app_ctx) {
2275 (void)app_ctx;
2276 client_info_t *client = (client_info_t *)client_ctx;
2277
2278 // Reconstruct full packet for existing handler
2279 size_t msg_len = strlen(message);
2280 size_t total_len = sizeof(*header) + msg_len;
2281 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2282 if (!full_packet) {
2283 log_error("Failed to allocate buffer for REMOTE_LOG reconstruction");
2284 return;
2285 }
2286
2287 memcpy(full_packet, header, sizeof(*header));
2288 memcpy(full_packet + sizeof(*header), message, msg_len);
2289
2290 handle_remote_log_packet_from_client(client, full_packet, total_len);
2291 SAFE_FREE(full_packet);
2292}
2293
2294static void acip_server_on_crypto_rekey_request(const void *payload, size_t payload_len, void *client_ctx,
2295 void *app_ctx) {
2296 (void)app_ctx;
2297 client_info_t *client = (client_info_t *)client_ctx;
2298
2299 log_debug("Received REKEY_REQUEST from client %u", client->client_id);
2300
2301 // Process the client's rekey request
2303 asciichat_error_t crypto_result =
2304 crypto_handshake_process_rekey_request(&client->crypto_handshake_ctx, (void *)payload, payload_len);
2306
2307 if (crypto_result != ASCIICHAT_OK) {
2308 log_error("Failed to process REKEY_REQUEST from client %u: %d", client->client_id, crypto_result);
2309 return;
2310 }
2311
2312 // Send REKEY_RESPONSE
2314 // CRITICAL: Also protect socket write with send_mutex (follows lock ordering)
2315 mutex_lock(&client->send_mutex);
2316 crypto_result = crypto_handshake_rekey_response(&client->crypto_handshake_ctx, client->socket);
2317 mutex_unlock(&client->send_mutex);
2319
2320 if (crypto_result != ASCIICHAT_OK) {
2321 log_error("Failed to send REKEY_RESPONSE to client %u: %d", client->client_id, crypto_result);
2322 } else {
2323 log_debug("Sent REKEY_RESPONSE to client %u", client->client_id);
2324 }
2325}
2326
2327static void acip_server_on_crypto_rekey_response(const void *payload, size_t payload_len, void *client_ctx,
2328 void *app_ctx) {
2329 (void)app_ctx;
2330 client_info_t *client = (client_info_t *)client_ctx;
2331
2332 log_debug("Received REKEY_RESPONSE from client %u", client->client_id);
2333
2334 // Process the client's rekey response
2336 asciichat_error_t crypto_result =
2337 crypto_handshake_process_rekey_response(&client->crypto_handshake_ctx, (void *)payload, payload_len);
2339
2340 if (crypto_result != ASCIICHAT_OK) {
2341 log_error("Failed to process REKEY_RESPONSE from client %u: %d", client->client_id, crypto_result);
2342 return;
2343 }
2344
2345 // Send REKEY_COMPLETE to confirm and activate new key
2347 // CRITICAL: Also protect socket write with send_mutex (follows lock ordering)
2348 mutex_lock(&client->send_mutex);
2349 crypto_result = crypto_handshake_rekey_complete(&client->crypto_handshake_ctx, client->socket);
2350 mutex_unlock(&client->send_mutex);
2352
2353 if (crypto_result != ASCIICHAT_OK) {
2354 log_error("Failed to send REKEY_COMPLETE to client %u: %d", client->client_id, crypto_result);
2355 } else {
2356 log_debug("Sent REKEY_COMPLETE to client %u - session rekeying complete", client->client_id);
2357 }
2358}
2359
2360static void acip_server_on_crypto_rekey_complete(const void *payload, size_t payload_len, void *client_ctx,
2361 void *app_ctx) {
2362 (void)app_ctx;
2363 client_info_t *client = (client_info_t *)client_ctx;
2364
2365 log_debug("Received REKEY_COMPLETE from client %u", client->client_id);
2366
2367 // Process and commit to new key
2369 asciichat_error_t crypto_result =
2370 crypto_handshake_process_rekey_complete(&client->crypto_handshake_ctx, (void *)payload, payload_len);
2372
2373 if (crypto_result != ASCIICHAT_OK) {
2374 log_error("Failed to process REKEY_COMPLETE from client %u: %d", client->client_id, crypto_result);
2375 } else {
2376 log_debug("Session rekeying completed successfully with client %u", client->client_id);
2377 // Notify client that rekeying is complete (new keys now active on both sides)
2378 log_info_client(client, "Session rekey complete - new encryption keys active");
2379 }
2380}
2381
2390void process_decrypted_packet(client_info_t *client, packet_type_t type, void *data, size_t len) {
2391 // Rate limiting: Check and record packet-specific rate limits
2392 if (g_rate_limiter) {
2393 if (!check_and_record_packet_rate_limit(g_rate_limiter, client->client_ip, client->socket, type)) {
2394 // Rate limit exceeded - error response already sent by utility function
2395 return;
2396 }
2397 }
2398
2399 switch (type) {
2401 handle_protocol_version_packet(client, data, len);
2402 break;
2403
2405 handle_image_frame_packet(client, data, len);
2406 break;
2407
2408 case PACKET_TYPE_AUDIO:
2409 handle_audio_packet(client, data, len);
2410 break;
2411
2413 handle_audio_batch_packet(client, data, len);
2414 break;
2415
2417 // Single-frame Opus packet: 16-byte header (sample_rate + frame_duration + reserved) + Opus data
2418 // Extract metadata and forward to mixer
2419 if (len >= 16) {
2420 const uint8_t *payload = (const uint8_t *)data;
2421 // Use unaligned read helpers - network data may not be aligned
2422 int sample_rate = (int)NET_TO_HOST_U32(read_u32_unaligned(payload));
2423 int frame_duration = (int)NET_TO_HOST_U32(read_u32_unaligned(payload + 4));
2424 // Reserved bytes at offset 8-15
2425 size_t opus_size = len - 16;
2426
2427 if (opus_size > 0 && opus_size <= 1024 && sample_rate == 48000 && frame_duration == 20) {
2428 // Create a synthetic Opus batch packet (frame_count=1) and process it
2429 // This reuses the batch handler logic
2430 uint8_t batch_buffer[1024 + 20]; // Max Opus + header
2431 uint8_t *batch_ptr = batch_buffer;
2432
2433 // Write batch header (batch_buffer is stack-aligned, writes are safe)
2434 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)sample_rate));
2435 batch_ptr += 4;
2436 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)frame_duration));
2437 batch_ptr += 4;
2438 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32(1)); // frame_count = 1
2439 batch_ptr += 4;
2440 memset(batch_ptr, 0, 4); // reserved
2441 batch_ptr += 4;
2442
2443 // Write frame size
2444 write_u16_unaligned(batch_ptr, HOST_TO_NET_U16((uint16_t)opus_size));
2445 batch_ptr += 2;
2446
2447 // Write Opus data
2448 memcpy(batch_ptr, payload + 16, opus_size);
2449 batch_ptr += opus_size;
2450
2451 // Process as batch packet
2452 size_t batch_size = (size_t)(batch_ptr - batch_buffer);
2453 handle_audio_opus_batch_packet(client, batch_buffer, batch_size);
2454 }
2455 }
2456 break;
2457
2459 handle_audio_opus_batch_packet(client, data, len);
2460 break;
2461
2463 handle_client_join_packet(client, data, len);
2464 break;
2465
2467 handle_client_leave_packet(client, data, len);
2468 break;
2469
2471 handle_stream_start_packet(client, data, len);
2472 break;
2473
2475 handle_stream_stop_packet(client, data, len);
2476 break;
2477
2479 handle_client_capabilities_packet(client, data, len);
2480 break;
2481
2482 case PACKET_TYPE_PING: {
2483 // Respond with PONG using ACIP transport
2484 // CRITICAL: Protect socket write with send_mutex to prevent concurrent writes
2485 mutex_lock(&client->send_mutex);
2486 asciichat_error_t pong_result = acip_send_pong(client->transport);
2487 mutex_unlock(&client->send_mutex);
2488 if (pong_result != ASCIICHAT_OK) {
2489 SET_ERRNO(ERROR_NETWORK, "Failed to send PONG response to client %u: %s", client->client_id,
2490 asciichat_error_string(pong_result));
2491 }
2492 break;
2493 }
2494
2495 case PACKET_TYPE_PONG:
2496 // Client acknowledged our PING - no action needed
2497 break;
2498
2500 handle_remote_log_packet_from_client(client, data, len);
2501 break;
2502
2503 default:
2504 disconnect_client_for_bad_data(client, "Unknown packet type: %d (len=%zu)", type, len);
2505 break;
2506 }
2507}
🔌 Cross-platform abstraction layer umbrella header for ascii-chat
⚠️‼️ Error and/or exit() when things go bad.
#define LOG_ERRNO_IF_SET(message)
Check if any error occurred and log it if so.
🗃️ Lock-Free Unified Memory Buffer Pool with Lazy Allocation
bool should_exit()
Check if client should exit.
Hardware-Accelerated CRC32 Checksum Computation.
asciichat_error_t crypto_handshake_rekey_complete(crypto_handshake_context_t *ctx, socket_t socket)
Send REKEY_COMPLETE packet (initiator side)
const crypto_context_t * crypto_handshake_get_context(const crypto_handshake_context_t *ctx)
Get the crypto context for encryption/decryption.
asciichat_error_t crypto_handshake_process_rekey_request(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
Process received REKEY_REQUEST packet (responder side)
bool crypto_handshake_is_ready(const crypto_handshake_context_t *ctx)
Check if handshake is complete and encryption is ready.
bool crypto_handshake_should_rekey(const crypto_handshake_context_t *ctx)
Check if rekeying should be triggered for this handshake context.
asciichat_error_t crypto_handshake_rekey_response(crypto_handshake_context_t *ctx, socket_t socket)
Send REKEY_RESPONSE packet (responder side)
void crypto_handshake_cleanup(crypto_handshake_context_t *ctx)
Cleanup crypto handshake context with secure memory wiping.
asciichat_error_t crypto_handshake_process_rekey_response(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
Process received REKEY_RESPONSE packet (initiator side)
asciichat_error_t crypto_handshake_process_rekey_complete(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
Process received REKEY_COMPLETE packet (responder side)
asciichat_error_t crypto_handshake_rekey_request(crypto_handshake_context_t *ctx, socket_t socket)
Send REKEY_REQUEST packet (initiator side)
Common declarations and data structures for cryptographic handshake.
🔄 Network byte order conversion helpers
#define HOST_TO_NET_U16(val)
Definition endian.h:101
#define HOST_TO_NET_U32(val)
Definition endian.h:71
#define NET_TO_HOST_U16(val)
Definition endian.h:116
#define NET_TO_HOST_U32(val)
Definition endian.h:86
bool check_and_record_packet_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, socket_t client_socket, packet_type_t packet_type)
Map packet type to rate event type and check rate limit.
Definition errors.c:54
Network error handling utilities.
📊 String Formatting Utilities
asciichat_error_t audio_ring_buffer_write(audio_ring_buffer_t *rb, const float *data, int samples)
Write audio samples to ring buffer.
#define AUDIO_SAMPLE_RATE
Audio sample rate (48kHz professional quality, Opus-compatible)
void audio_ring_buffer_destroy(audio_ring_buffer_t *rb)
Destroy an audio ring buffer.
int mixer_add_source(mixer_t *mixer, uint32_t client_id, audio_ring_buffer_t *buffer)
Add an audio source to the mixer.
Definition mixer.c:363
audio_ring_buffer_t * audio_ring_buffer_create_for_capture(void)
Create a new audio ring buffer for capture (without jitter buffering)
void opus_codec_destroy(opus_codec_t *codec)
Destroy an Opus codec instance.
Definition opus_codec.c:215
void mixer_remove_source(mixer_t *mixer, uint32_t client_id)
Remove an audio source from the mixer.
Definition mixer.c:400
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Allocate a buffer from the pool (lock-free fast path)
unsigned short uint16_t
Definition common.h:57
#define SAFE_MALLOC_ALIGNED(size, alignment, cast)
Definition common.h:293
unsigned int uint32_t
Definition common.h:58
#define SAFE_STRNCPY(dst, src, size)
Definition common.h:358
#define SAFE_FREE(ptr)
Definition common.h:320
#define SAFE_MALLOC(size, cast)
Definition common.h:208
#define SAFE_STRERROR(errnum)
Definition common.h:385
#define read_u32_unaligned
Definition common.h:405
unsigned long long uint64_t
Definition common.h:59
unsigned char uint8_t
Definition common.h:56
#define write_u32_unaligned
Definition common.h:407
#define write_u16_unaligned
Definition common.h:406
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
#define HAS_ERRNO(var)
Check if an error occurred and get full context.
void asciichat_errno_cleanup(void)
Cleanup error system resources.
asciichat_error_t
Error and exit codes - unified status values (0-255)
Definition error_codes.h:46
@ ERROR_INVALID_STATE
@ ERROR_NETWORK
Definition error_codes.h:69
@ ERROR_RESOURCE_EXHAUSTED
@ ERROR_MEMORY
Definition error_codes.h:53
@ ASCIICHAT_OK
Definition error_codes.h:48
@ ERROR_CRYPTO
Definition error_codes.h:88
@ ERROR_INVALID_PARAM
@ ERROR_INTERNAL
Definition error_codes.h:84
#define MAX_CLIENTS
Maximum possible clients (static array size) - actual runtime limit set by –max-clients (1-32)
Definition limits.h:23
#define MAX_DISPLAY_NAME_LEN
Maximum display name length in characters.
Definition limits.h:20
#define LOG_RATE_FAST
Log rate limit: 1 second (1,000,000 microseconds)
Definition log_rates.h:26
#define LOG_RATE_DEFAULT
Log rate limit: 5 seconds (5,000,000 microseconds) - default for audio/video packets.
Definition log_rates.h:32
#define log_warn(...)
Log a WARN message.
#define log_info_client(client, fmt,...)
Server sends INFO log message to client.
#define log_error(...)
Log an ERROR message.
#define log_debug_every(interval_us, fmt,...)
Rate-limited DEBUG logging.
#define log_info(...)
Log an INFO message.
#define log_debug(...)
Log a DEBUG message.
#define log_error_client(client, fmt,...)
Server sends ERROR log message to client.
#define log_warn_every(interval_us, fmt,...)
Rate-limited WARN logging.
int format_duration_ms(double milliseconds, char *buffer, size_t buffer_size)
Format milliseconds as human-readable duration string.
Definition time.c:269
uint32_t pixel_format
Pixel format enum (0=RGB24, 1=RGBA32, 2=BGR24, etc.)
Definition packet.h:774
asciichat_error_t set_socket_timeout(socket_t sockfd, int timeout_seconds)
Set socket timeout.
Definition network.c:436
uint32_t reserved[6]
Reserved fields for future use (must be zero)
Definition packet.h:604
asciichat_error_t set_socket_keepalive(socket_t sockfd)
Set socket keepalive.
Definition network.c:461
uint32_t client_id
Client ID (0 = server, >0 = client identifier)
Definition packet.h:500
uint32_t active_client_count
Number of clients actively sending video/audio streams.
Definition packet.h:602
void * allocated_buffer
Buffer that needs to be freed by caller (may be NULL if not allocated)
Definition packet.h:990
uint32_t connected_client_count
Total number of currently connected clients.
Definition packet.h:600
asciichat_error_t send_audio_batch_packet(socket_t sockfd, const float *samples, int num_samples, int batch_count, crypto_context_t *crypto_ctx)
Send a batched audio packet with encryption support.
Definition packet.c:1072
asciichat_error_t packet_parse_error_message(const void *data, size_t len, asciichat_error_t *out_error_code, char *message_buffer, size_t message_buffer_size, size_t *out_message_length)
Parse an error packet payload into components.
Definition packet.c:854
asciichat_error_t av_send_audio_opus_batch(socket_t sockfd, const uint8_t *opus_data, size_t opus_size, const uint16_t *frame_sizes, int sample_rate, int frame_duration, int frame_count, crypto_context_t *crypto_ctx)
Send Opus-encoded audio batch packet with encryption support.
Definition packet.c:1136
size_t allocated_size
Size of allocated buffer in bytes.
Definition packet.h:992
size_t len
Length of payload data in bytes.
Definition packet.h:986
void * data
Packet payload data (decrypted and decompressed if applicable)
Definition packet.h:984
uint32_t compressed_size
Compressed data size (0 = not compressed, >0 = compressed)
Definition packet.h:776
packet_recv_result_t receive_packet_secure(socket_t sockfd, void *crypto_ctx, bool enforce_encryption, packet_envelope_t *envelope)
Receive a packet with decryption and decompression support.
Definition packet.c:568
const char * network_error_string()
Get human-readable error string for network errors.
Definition network.c:535
uint16_t type
Packet type (packet_type_t enumeration)
Definition packet.h:494
uint32_t height
Image height in pixels.
Definition packet.h:772
packet_type_t type
Packet type (from packet_types.h)
Definition packet.h:982
uint32_t width
Image width in pixels.
Definition packet.h:770
packet_recv_result_t
Packet reception result codes.
Definition packet.h:1003
@ PACKET_RECV_SUCCESS
Packet received successfully.
Definition packet.h:1005
#define GET_OPTION(field)
Safely get a specific option field (lock-free read)
Definition options.h:644
void packet_queue_shutdown(packet_queue_t *queue)
Signal queue shutdown (causes dequeue to return NULL)
packet_queue_t * packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool)
Create a packet queue with both node and buffer pools.
size_t data_len
Length of payload data in bytes.
void packet_queue_free_packet(queued_packet_t *packet)
Free a dequeued packet.
queued_packet_t * packet_queue_try_dequeue(packet_queue_t *queue)
Try to dequeue a packet without blocking.
void packet_queue_destroy(packet_queue_t *queue)
Destroy a packet queue and free all resources.
void * data
Packet payload data (can be NULL for header-only packets)
packet_type_t
Network protocol packet type enumeration.
Definition packet.h:281
#define MAX_ERROR_MESSAGE_LENGTH
Maximum error message length (512 bytes)
Definition packet.h:122
@ PACKET_TYPE_AUDIO_OPUS_BATCH
Batched Opus-encoded audio frames.
Definition packet.h:359
@ PACKET_TYPE_AUDIO_OPUS
Opus-encoded single audio frame.
Definition packet.h:357
@ PACKET_TYPE_CLIENT_LEAVE
Clean disconnect notification.
Definition packet.h:302
@ PACKET_TYPE_IMAGE_FRAME
Complete RGB image with dimensions.
Definition packet.h:288
@ PACKET_TYPE_PONG
Keepalive pong response.
Definition packet.h:297
@ PACKET_TYPE_STREAM_START
Client requests to start sending video/audio.
Definition packet.h:304
@ PACKET_TYPE_AUDIO
Single audio packet (legacy)
Definition packet.h:291
@ PACKET_TYPE_REMOTE_LOG
Bidirectional remote logging packet.
Definition packet.h:354
@ PACKET_TYPE_PROTOCOL_VERSION
Protocol version and capabilities negotiation.
Definition packet.h:283
@ PACKET_TYPE_CLIENT_JOIN
Client announces capability to send media.
Definition packet.h:300
@ PACKET_TYPE_CLIENT_CAPABILITIES
Client reports terminal capabilities.
Definition packet.h:293
@ PACKET_TYPE_PING
Keepalive ping packet.
Definition packet.h:295
@ PACKET_TYPE_AUDIO_BATCH
Batched audio packets for efficiency.
Definition packet.h:343
@ PACKET_TYPE_STREAM_STOP
Client stops sending media.
Definition packet.h:306
int socket_shutdown(socket_t sock, int how)
Shutdown socket I/O.
int mutex_init(mutex_t *mutex)
Initialize a mutex.
#define rwlock_wrunlock(lock)
Release a write lock (with debug tracking in debug builds)
Definition rwlock.h:249
int socket_setsockopt(socket_t sock, int level, int optname, const void *optval, socklen_t optlen)
Set socket option.
int socket_t
Socket handle type (POSIX: int)
Definition socket.h:50
int safe_snprintf(char *buffer, size_t buffer_size, const char *format,...)
Safe version of snprintf that ensures null termination.
#define mutex_lock(mutex)
Lock a mutex (with debug tracking in debug builds)
Definition mutex.h:140
#define INVALID_SOCKET_VALUE
Invalid socket value (POSIX: -1)
Definition socket.h:52
#define rwlock_rdlock(lock)
Acquire a read lock (with debug tracking in debug builds)
Definition rwlock.h:194
int socket_close(socket_t sock)
Close a socket.
ssize_t socket_send(socket_t sock, const void *buf, size_t len, int flags)
Send data on a socket.
bool asciichat_thread_is_initialized(asciichat_thread_t *thread)
Check if a thread handle has been initialized.
void platform_sleep_usec(unsigned int usec)
High-precision sleep function with microsecond precision.
pthread_rwlock_t rwlock_t
Read-write lock type (POSIX: pthread_rwlock_t)
Definition rwlock.h:40
#define rwlock_wrlock(lock)
Acquire a write lock (with debug tracking in debug builds)
Definition rwlock.h:213
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Wait for a thread to complete (blocking)
int errno
#define mutex_unlock(mutex)
Unlock a mutex (with debug tracking in debug builds)
Definition mutex.h:175
#define rwlock_rdunlock(lock)
Release a read lock (with debug tracking in debug builds)
Definition rwlock.h:231
int asciichat_thread_create(asciichat_thread_t *thread, void *(*func)(void *), void *arg)
Create a new thread.
int mutex_destroy(mutex_t *mutex)
Destroy a mutex.
void handle_audio_opus_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_OPUS_BATCH packet - efficient Opus-encoded audio batch from client.
void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity)
Format byte count into human-readable string.
Definition format.c:10
void disconnect_client_for_bad_data(client_info_t *client, const char *format,...)
void video_frame_buffer_destroy(video_frame_buffer_t *vfb)
Destroy frame buffer and free all resources.
Definition video_frame.c:83
video_frame_buffer_t * video_frame_buffer_create(uint32_t client_id)
Create a double-buffered video frame manager.
Definition video_frame.c:15
const video_frame_t * video_frame_get_latest(video_frame_buffer_t *vfb)
Reader API: Get latest frame if available.
ACIP protocol packet handlers (transport-agnostic)
🔊 Audio Capture and Playback Interface for ascii-chat
asciichat_error_t acip_send_server_state(acip_transport_t *transport, const server_state_packet_t *state)
Send server state update to client (server → client)
asciichat_error_t acip_server_receive_and_dispatch(acip_transport_t *transport, void *client_ctx, const acip_server_callbacks_t *callbacks)
Receive packet from client and dispatch to callbacks.
asciichat_error_t acip_send_ascii_frame(acip_transport_t *transport, const char *frame_data, uint32_t width, uint32_t height)
Send ASCII frame to client (server → client)
asciichat_error_t acip_send_clear_console(acip_transport_t *transport)
Send clear console command to client (server → client)
ACIP server-side protocol library.
asciichat_error_t tcp_server_spawn_thread(tcp_server_t *server, socket_t client_socket, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
Spawn a worker thread for a client.
asciichat_error_t tcp_server_stop_client_threads(tcp_server_t *server, socket_t client_socket)
Stop all threads for a client in stop_id order.
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
Add client to registry.
Multi-Source Audio Mixing and Processing System.
Network logging macros and remote log direction enumeration.
🌐 Core network I/O operations with timeout support
⚙️ Command-line options parsing and configuration management for ascii-chat
Opus audio codec wrapper for real-time encoding/decoding.
Packet protocol implementation with encryption and compression support.
📬 Thread-safe packet queue system for per-client send threads
Platform-independent safe string functions.
int create_client_render_threads(server_context_t *server_ctx, client_info_t *client)
Create and initialize per-client rendering threads.
Definition render.c:1121
Per-client rendering threads with rate limiting.
asciichat_error_t acip_send_audio_opus(acip_transport_t *transport, const void *opus_data, size_t opus_len)
Send Opus-encoded audio packet.
Definition send.c:122
asciichat_error_t packet_send_via_transport(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len)
Send packet via transport with proper header (exported for generic wrappers)
Definition send.c:40
asciichat_error_t acip_send_pong(acip_transport_t *transport)
Send pong packet.
Definition send.c:184
ACIP shared/bidirectional packet sending functions.
rate_limiter_t * g_rate_limiter
Global rate limiter for connection attempts and packet processing.
ascii-chat Server Mode Entry Point Header
void handle_client_join_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_JOIN packet - client announces identity and capabilities.
void handle_protocol_version_packet(client_info_t *client, const void *data, size_t len)
Process PROTOCOL_VERSION packet - validate protocol compatibility.
void handle_image_frame_packet(client_info_t *client, void *data, size_t len)
Process IMAGE_FRAME packet - store client's video data for rendering.
void handle_audio_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_BATCH packet - store efficiently batched audio samples.
void handle_audio_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO packet - store single audio sample batch (legacy format)
void handle_client_leave_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_LEAVE packet - handle clean client disconnect.
void handle_stream_stop_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_STOP packet - client requests to halt media transmission.
int send_server_state_to_client(client_info_t *client)
Send current server state to a specific client.
void handle_client_capabilities_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_CAPABILITIES packet - configure client-specific rendering.
void handle_stream_start_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_START packet - client requests to begin media transmission.
void handle_remote_log_packet_from_client(client_info_t *client, const void *data, size_t len)
Cross-platform socket interface for ascii-chat.
void * client_send_thread_func(void *arg)
Client packet send thread.
void cleanup_client_media_buffers(client_info_t *client)
void process_decrypted_packet(client_info_t *client, packet_type_t type, void *data, size_t len)
mixer_t * g_audio_mixer
Global audio mixer from main.c.
void cleanup_client_packet_queues(client_info_t *client)
rwlock_t g_client_manager_rwlock
Reader-writer lock protecting the global client manager.
void * client_receive_thread(void *arg)
void stop_client_threads(client_info_t *client)
client_info_t * find_client_by_socket(socket_t socket)
Find client by socket descriptor using linear search.
int process_encrypted_packet(client_info_t *client, packet_type_t *type, void **data, size_t *len, uint32_t *sender_id)
#define MAX_AUDIO_BATCH
atomic_bool g_server_should_exit
Global shutdown flag from main.c.
client_manager_t g_client_manager
Global client manager singleton - central coordination point.
void broadcast_server_state_to_all_clients(void)
Notify all clients of state changes.
Per-client state management and lifecycle orchestration.
client_info_t * find_client_by_id(uint32_t client_id)
int add_client(server_context_t *server_ctx, socket_t socket, const char *client_ip, int port)
int remove_client(server_context_t *server_ctx, uint32_t client_id)
int add_webrtc_client(server_context_t *server_ctx, acip_transport_t *transport, const char *client_ip)
int crypto_server_decrypt_packet(uint32_t client_id, const uint8_t *ciphertext, size_t ciphertext_len, uint8_t *plaintext, size_t plaintext_size, size_t *plaintext_len)
const crypto_context_t * crypto_server_get_context(uint32_t client_id)
int server_crypto_init(void)
bool crypto_server_is_ready(uint32_t client_id)
int server_crypto_handshake(client_info_t *client)
Server cryptographic operations and per-client handshake management.
Server packet processing and protocol implementation.
Multi-client video mixing and ASCII frame generation.
RGB pixel structure.
Definition video/image.h:80
Server-side packet handler callbacks.
Definition handlers.h:130
void(* on_protocol_version)(const protocol_version_packet_t *version, void *client_ctx, void *app_ctx)
Called when client sends protocol version.
Definition handlers.h:132
Transport instance structure.
Definition transport.h:169
Error context structure.
char * context_message
Optional custom message (dynamically allocated, owned by system)
asciichat_error_t code
Error code (asciichat_error_t enum value)
Audio batch packet structure (Packet Type 28)
Definition packet.h:796
Per-client state structure for server-side client management.
atomic_bool video_render_thread_running
atomic_uint client_id
acip_transport_t * transport
char display_name[MAX_DISPLAY_NAME_LEN]
atomic_int last_sent_grid_sources
asciichat_thread_t audio_render_thread
uint32_t frames_received_logged
video_frame_buffer_t * outgoing_video_buffer
asciichat_thread_t send_thread
crypto_handshake_context_t crypto_handshake_ctx
atomic_ushort height
video_frame_buffer_t * incoming_video_buffer
atomic_ushort width
audio_ring_buffer_t * incoming_audio_buffer
atomic_bool protocol_disconnect_requested
mutex_t client_state_mutex
atomic_bool is_sending_audio
atomic_bool audio_render_thread_running
atomic_int last_rendered_grid_sources
asciichat_thread_t receive_thread
packet_queue_t * audio_queue
char client_ip[INET_ADDRSTRLEN]
atomic_bool shutting_down
packet_type_t pending_packet_type
asciichat_thread_t video_render_thread
atomic_bool is_sending_video
void * pending_packet_payload
atomic_bool active
size_t pending_packet_length
atomic_bool send_thread_running
Global client manager structure for server-side client coordination.
_Atomic uint32_t next_client_id
Monotonic counter for unique client IDs (atomic for thread-safety)
client_info_t * clients_by_id
uthash head pointer for O(1) client_id -> client_info_t* lookups
client_info_t clients[MAX_CLIENTS]
Array of client_info_t structures (backing storage)
int client_count
Current number of active clients.
Cryptographic context structure.
Error packet structure carrying error code and textual description.
Definition packet.h:619
Image frame packet structure (Packet Type 3)
Definition packet.h:768
Main mixer structure for multi-source audio processing.
Definition mixer.h:325
Opus codec context for encoding or decoding.
Definition opus_codec.h:95
Packet envelope containing received packet data.
Definition packet.h:980
Network packet header structure.
Definition packet.h:490
Protocol version negotiation packet structure (Packet Type 1)
Definition packet.h:710
Single packet ready to send (header already in network byte order)
Remote log packet structure carrying log level and message text.
Definition packet.h:633
Server context - encapsulates all server state.
Definition server/main.h:81
Server state packet structure.
Definition packet.h:598
Video frame structure.
void * data
Frame data pointer (points to pre-allocated buffer)
⏱️ High-precision timing utilities using sokol_time.h and uthash
Transport abstraction layer for ACIP protocol.
acip_transport_t * acip_tcp_transport_create(socket_t sockfd, crypto_context_t *crypto_ctx)
Create TCP transport from existing socket.
void acip_transport_destroy(acip_transport_t *transport)
Destroy transport and free all resources.
#️⃣ Wrapper for uthash.h that ensures common.h is included first
🔤 String Manipulation and Shell Escaping Utilities
Common SIMD utilities and structures.