ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
src/server/client.c
Go to the documentation of this file.
1
103#include <stdatomic.h>
104#include <stdio.h>
105#include <string.h>
106#include <time.h>
107#include <errno.h>
108#ifndef _WIN32
109#include <netinet/tcp.h>
110#endif
111
112#include "client.h"
113#include "main.h"
114#include "protocol.h"
115#include "render.h"
116#include "stream.h"
117#include "crypto.h"
118#include <ascii-chat/crypto/handshake/common.h>
119#include <ascii-chat/crypto/handshake/server.h>
120#include <ascii-chat/crypto/crypto.h>
121#include <ascii-chat/common.h>
122#include <ascii-chat/util/endian.h>
123#include <ascii-chat/asciichat_errno.h>
124#include <ascii-chat/options/options.h>
125#include <ascii-chat/options/rcu.h> // For RCU-based options access
126#include <ascii-chat/buffer_pool.h>
127#include <ascii-chat/network/network.h>
128#include <ascii-chat/network/packet.h>
129#include <ascii-chat/network/packet_queue.h>
130#include <ascii-chat/network/errors.h>
131#include <ascii-chat/network/acip/handlers.h>
132#include <ascii-chat/network/acip/transport.h>
133#include <ascii-chat/network/acip/send.h>
134#include <ascii-chat/network/acip/server.h>
135#include <ascii-chat/audio/audio.h>
136#include <ascii-chat/audio/mixer.h>
137#include <ascii-chat/audio/opus_codec.h>
138#include <ascii-chat/video/video_frame.h>
139#include <ascii-chat/uthash/uthash.h>
140#include <ascii-chat/util/endian.h>
141#include <ascii-chat/util/format.h>
142#include <ascii-chat/util/time.h>
143#include <ascii-chat/platform/abstraction.h>
144#include <ascii-chat/platform/string.h>
145#include <ascii-chat/platform/socket.h>
146#include <ascii-chat/network/crc32.h>
147#include <ascii-chat/network/logging.h>
148
// Debug flags
//
// Each switch defaults to 1 (enabled) but may be pre-defined by the build
// (e.g. -DDEBUG_NETWORK=0) without triggering a macro-redefinition warning.
// NOTE(review): whether call sites test these with #if or #ifdef is not visible
// here; with #ifdef, defining a flag to 0 still counts as "defined" - confirm.
#ifndef DEBUG_NETWORK
#define DEBUG_NETWORK 1
#endif
#ifndef DEBUG_THREADS
#define DEBUG_THREADS 1
#endif
#ifndef DEBUG_MEMORY
#define DEBUG_MEMORY 1
#endif
153
154// =============================================================================
155// Packet Handler Dispatch (O(1) hash table lookup)
156// =============================================================================
157
158typedef void (*client_packet_handler_t)(client_info_t *client, const void *data, size_t len);
159
160#define CLIENT_DISPATCH_HASH_SIZE 32
161#define CLIENT_DISPATCH_HANDLER_COUNT 12
162
166typedef struct {
167 packet_type_t key;
168 uint8_t handler_idx;
170
171#define CLIENT_DISPATCH_HASH(type) ((type) % CLIENT_DISPATCH_HASH_SIZE)
172
173static inline int client_dispatch_hash_lookup(const client_dispatch_entry_t *table, packet_type_t type) {
174 uint32_t h = CLIENT_DISPATCH_HASH(type);
175 for (int i = 0; i < CLIENT_DISPATCH_HASH_SIZE; i++) {
176 uint32_t slot = (h + i) % CLIENT_DISPATCH_HASH_SIZE;
177 if (table[slot].key == 0)
178 return -1;
179 if (table[slot].key == type)
180 return table[slot].handler_idx;
181 }
182 return -1;
183}
184
185// Handler array (indexed by hash lookup result)
// Handler function table, indexed by the handler_idx values stored in
// g_client_dispatch_hash. The indices that table refers to are:
//   0 PROTOCOL_VERSION   1 IMAGE_FRAME    2 AUDIO_BATCH    3 AUDIO_OPUS_BATCH
//   4 CLIENT_JOIN        5 CLIENT_LEAVE   6 STREAM_START   7 STREAM_STOP
//   8 CLIENT_CAPABILITIES  9 PING        10 PONG          11 REMOTE_LOG
// NOTE(review): the initializer entries are not visible in this listing -
// confirm that the handler order matches the indices above.
186static const client_packet_handler_t g_client_dispatch_handlers[CLIENT_DISPATCH_HANDLER_COUNT] = {
199};
200
201// Hash table mapping packet type -> handler index
202// clang-format off
203static const client_dispatch_entry_t g_client_dispatch_hash[CLIENT_DISPATCH_HASH_SIZE] = {
204 [0] = {PACKET_TYPE_AUDIO_BATCH, 2}, // hash(4000)=0
205 [1] = {PACKET_TYPE_PROTOCOL_VERSION, 0}, // hash(1)=1
206 [2] = {PACKET_TYPE_AUDIO_OPUS_BATCH, 3}, // hash(4001)=1, probed->2
207 [8] = {PACKET_TYPE_CLIENT_CAPABILITIES, 8}, // hash(5000)=8
208 [9] = {PACKET_TYPE_PING, 9}, // hash(5001)=9
209 [10] = {PACKET_TYPE_PONG, 10}, // hash(5002)=10
210 [11] = {PACKET_TYPE_CLIENT_JOIN, 4}, // hash(5003)=11
211 [12] = {PACKET_TYPE_CLIENT_LEAVE, 5}, // hash(5004)=12
212 [13] = {PACKET_TYPE_STREAM_START, 6}, // hash(5005)=13
213 [14] = {PACKET_TYPE_STREAM_STOP, 7}, // hash(5006)=14
214 [20] = {PACKET_TYPE_REMOTE_LOG, 11}, // hash(2004)=20
215 [25] = {PACKET_TYPE_IMAGE_FRAME, 1}, // hash(3001)=25
216};
217// clang-format on
218
219// Forward declarations for static helper functions
220static inline void cleanup_client_all_buffers(client_info_t *client);
221
222static void handle_client_error_packet(client_info_t *client, const void *data, size_t len) {
223 asciichat_error_t reported_error = ASCIICHAT_OK;
224 char message[MAX_ERROR_MESSAGE_LENGTH + 1] = {0};
225
226 asciichat_error_t parse_result =
227 packet_parse_error_message(data, len, &reported_error, message, sizeof(message), NULL);
228 uint32_t client_id = client ? atomic_load(&client->client_id) : 0;
229
230 if (parse_result != ASCIICHAT_OK) {
231 log_warn("Failed to parse error packet from client %u: %s", client_id, asciichat_error_string(parse_result));
232 return;
233 }
234
235 log_error("Client %u reported error %d (%s): %s", client_id, reported_error, asciichat_error_string(reported_error),
236 message);
237}
238
256
271
272// Forward declarations for internal functions
273// client_receive_thread is implemented below
274void *client_send_thread_func(void *arg);
275void *client_dispatch_thread(void *arg);
277static int start_client_threads(server_context_t *server_ctx, client_info_t *client,
278 bool is_tcp);
279
280/* ============================================================================
281 * Client Lookup Functions
282 * ============================================================================
283 */
284
308client_info_t *find_client_by_id(uint32_t client_id) {
309 if (client_id == 0) {
310 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid client ID");
311 return NULL;
312 }
313
314 // Protect uthash lookup with read lock to prevent concurrent access issues
315 rwlock_rdlock(&g_client_manager_rwlock);
316
317 client_info_t *result = NULL;
318 uint32_t search_id = client_id; // uthash needs an lvalue for the key
319 HASH_FIND_INT(g_client_manager.clients_by_id, &search_id, result);
320
321 rwlock_rdunlock(&g_client_manager_rwlock);
322
323 if (!result) {
324 log_warn("Client not found for ID %u", client_id);
325 }
326
327 return result;
328}
329
353client_info_t *find_client_by_socket(socket_t socket) {
354 rwlock_rdlock(&g_client_manager_rwlock);
355
356 for (int i = 0; i < MAX_CLIENTS; i++) {
357 if (g_client_manager.clients[i].socket == socket && atomic_load(&g_client_manager.clients[i].active)) {
358 client_info_t *client = &g_client_manager.clients[i];
359 rwlock_rdunlock(&g_client_manager_rwlock);
360 return client;
361 }
362 }
363
364 rwlock_rdunlock(&g_client_manager_rwlock);
365 return NULL;
366}
367
368/* ============================================================================
369 * Client Management Functions
370 * ============================================================================
371 */
372
379static void configure_client_socket(socket_t socket, uint32_t client_id) {
380 // Enable TCP keepalive to detect dead connections
381 asciichat_error_t keepalive_result = set_socket_keepalive(socket);
382 if (keepalive_result != ASCIICHAT_OK) {
383 log_warn("Failed to set socket keepalive for client %u: %s", client_id, asciichat_error_string(keepalive_result));
384 }
385
386 // Set socket buffer sizes for large data transmission
387 const int SOCKET_SEND_BUFFER_SIZE = 1024 * 1024; // 1MB send buffer
388 const int SOCKET_RECV_BUFFER_SIZE = 1024 * 1024; // 1MB receive buffer
389
390 if (socket_setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &SOCKET_SEND_BUFFER_SIZE, sizeof(SOCKET_SEND_BUFFER_SIZE)) < 0) {
391 log_warn("Failed to set send buffer size for client %u: %s", client_id, network_error_string());
392 }
393
394 if (socket_setsockopt(socket, SOL_SOCKET, SO_RCVBUF, &SOCKET_RECV_BUFFER_SIZE, sizeof(SOCKET_RECV_BUFFER_SIZE)) < 0) {
395 log_warn("Failed to set receive buffer size for client %u: %s", client_id, network_error_string());
396 }
397
398 // Enable TCP_NODELAY to reduce latency for large packets (disables Nagle algorithm)
399 const int TCP_NODELAY_VALUE = 1;
400 if (socket_setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &TCP_NODELAY_VALUE, sizeof(TCP_NODELAY_VALUE)) < 0) {
401 log_warn("Failed to set TCP_NODELAY for client %u: %s", client_id, network_error_string());
402 }
403}
404
422static int start_client_threads(server_context_t *server_ctx, client_info_t *client, bool is_tcp) {
423 if (!server_ctx || !client) {
424 SET_ERRNO(ERROR_INVALID_PARAM, "server_ctx or client is NULL");
425 return -1;
426 }
427
428 uint32_t client_id = atomic_load(&client->client_id);
429 log_info("★ START_CLIENT_THREADS: client_id=%u is_tcp=%d (about to create %s threads)", client_id, is_tcp,
430 is_tcp ? "TCP" : "WebRTC/WebSocket");
431 char thread_name[64];
432 asciichat_error_t result;
433
434 // Step 1: Create receive thread
435 if (is_tcp) {
436 safe_snprintf(thread_name, sizeof(thread_name), "receive_%u", client_id);
437 result =
438 tcp_server_spawn_thread(server_ctx->tcp_server, client->socket, client_receive_thread, client, 1, thread_name);
439 } else {
440 safe_snprintf(thread_name, sizeof(thread_name), "webrtc_recv_%u", client_id);
441 log_debug("THREAD_CREATE: WebRTC client %u", client_id);
442 log_debug(" client=%p, func=%p, &receive_thread=%p", (void *)client, (void *)client_receive_thread,
443 (void *)&client->receive_thread);
444 log_debug(" Pre-create: receive_thread value=%p", (void *)(uintptr_t)client->receive_thread);
445 result = asciichat_thread_create(&client->receive_thread, client_receive_thread, client);
446 log_debug(" Post-create: result=%d, receive_thread value=%p", result, (void *)(uintptr_t)client->receive_thread);
447 }
448
449 if (result != ASCIICHAT_OK) {
450 log_error("Failed to create receive thread for %s client %u: %s", is_tcp ? "TCP" : "WebRTC", client_id,
451 asciichat_error_string(result));
452 remove_client(server_ctx, client_id);
453 return -1;
454 }
455 log_debug("Created receive thread for %s client %u", is_tcp ? "TCP" : "WebRTC", client_id);
456
457 // Step 1b: Create async dispatch thread (processes queued packets)
458 // This decouples receive from dispatch to prevent backpressure on the socket
459 if (!is_tcp) { // Only for WebRTC/WebSocket clients that need async dispatch
460 safe_snprintf(thread_name, sizeof(thread_name), "dispatch_%u", client_id);
461 atomic_store(&client->dispatch_thread_running, true);
462 result = asciichat_thread_create(&client->dispatch_thread, client_dispatch_thread, client);
463 if (result != ASCIICHAT_OK) {
464 log_error("Failed to create dispatch thread for client %u: %s", client_id, asciichat_error_string(result));
465 remove_client(server_ctx, client_id);
466 return -1;
467 }
468 log_debug("Created async dispatch thread for client %u", client_id);
469 }
470
471 // Step 2: Create render threads BEFORE send thread
472 // This ensures the render threads generate the first frame before the send thread tries to read it
473 log_debug("Creating render threads for client %u", client_id);
474 if (create_client_render_threads(server_ctx, client) != 0) {
475 log_error("Failed to create render threads for client %u", client_id);
476 remove_client(server_ctx, client_id);
477 return -1;
478 }
479 log_debug("Successfully created render threads for client %u", client_id);
480
481 // Step 3: Create send thread AFTER render threads are running
482 if (is_tcp) {
483 safe_snprintf(thread_name, sizeof(thread_name), "send_%u", client_id);
484 result = tcp_server_spawn_thread(server_ctx->tcp_server, client->socket, client_send_thread_func, client, 3,
485 thread_name);
486 } else {
487 safe_snprintf(thread_name, sizeof(thread_name), "webrtc_send_%u", client_id);
488 result = asciichat_thread_create(&client->send_thread, client_send_thread_func, client);
489 }
490
491 if (result != ASCIICHAT_OK) {
492 log_error("Failed to create send thread for %s client %u: %s", is_tcp ? "TCP" : "WebRTC", client_id,
493 asciichat_error_string(result));
494 remove_client(server_ctx, client_id);
495 return -1;
496 }
497 log_debug("Created send thread for %s client %u", is_tcp ? "TCP" : "WebRTC", client_id);
498
499 // Step 4: Send initial server state to the new client
500 if (send_server_state_to_client(client) != 0) {
501 log_warn("Failed to send initial server state to client %u", client_id);
502 }
503
504 // Get current client count for initial state packet
505 rwlock_rdlock(&g_client_manager_rwlock);
506 uint32_t connected_count = g_client_manager.client_count;
507 rwlock_rdunlock(&g_client_manager_rwlock);
508
509 server_state_packet_t state;
510 state.connected_client_count = connected_count;
511 state.active_client_count = 0; // Will be updated by broadcast thread
512 memset(state.reserved, 0, sizeof(state.reserved));
513
514 // Convert to network byte order
515 server_state_packet_t net_state;
516 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
517 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
518 memset(net_state.reserved, 0, sizeof(net_state.reserved));
519
520 // Send initial server state via ACIP transport
521 asciichat_error_t packet_result = acip_send_server_state(client->transport, &net_state);
522 if (packet_result != ASCIICHAT_OK) {
523 log_warn("Failed to send initial server state to client %u: %s", client_id, asciichat_error_string(packet_result));
524 } else {
525 log_debug("Sent initial server state to client %u: %u connected clients", client_id, state.connected_client_count);
526 }
527
528 // Step 5: Broadcast server state to ALL clients AFTER the new client is fully set up
530
531 return 0;
532}
533
534int add_client(server_context_t *server_ctx, socket_t socket, const char *client_ip, int port) {
535 // Find empty slot WITHOUT holding the global lock
536 // We'll re-verify under lock after allocations complete
537 int slot = -1;
538 int existing_count = 0;
539 for (int i = 0; i < MAX_CLIENTS; i++) {
540 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
541 slot = i; // Take first available slot
542 }
543 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
544 existing_count++;
545 }
546 }
547
548 // Quick pre-check before expensive allocations
549 if (existing_count >= GET_OPTION(max_clients) || slot == -1) {
550 const char *reject_msg = "SERVER_FULL: Maximum client limit reached\n";
551 ssize_t send_result = socket_send(socket, reject_msg, strlen(reject_msg), 0);
552 if (send_result < 0) {
553 log_warn("Failed to send rejection message to client: %s", SAFE_STRERROR(errno));
554 }
555 return -1;
556 }
557
558 // DO EXPENSIVE ALLOCATIONS OUTSIDE THE LOCK
559 // This prevents blocking frame processing during client initialization
560 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
561
562 video_frame_buffer_t *incoming_video_buffer = video_frame_buffer_create(new_client_id);
563 if (!incoming_video_buffer) {
564 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for client %u", new_client_id);
565 log_error("Failed to create video buffer for client %u", new_client_id);
566 return -1;
567 }
568
569 audio_ring_buffer_t *incoming_audio_buffer = audio_ring_buffer_create_for_capture();
570 if (!incoming_audio_buffer) {
571 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for client %u", new_client_id);
572 log_error("Failed to create audio buffer for client %u", new_client_id);
573 video_frame_buffer_destroy(incoming_video_buffer);
574 return -1;
575 }
576
577 packet_queue_t *audio_queue = packet_queue_create_with_pools(500, 1000, false);
578 if (!audio_queue) {
579 LOG_ERRNO_IF_SET("Failed to create audio queue for client");
580 audio_ring_buffer_destroy(incoming_audio_buffer);
581 video_frame_buffer_destroy(incoming_video_buffer);
582 return -1;
583 }
584
585 video_frame_buffer_t *outgoing_video_buffer = video_frame_buffer_create(new_client_id);
586 if (!outgoing_video_buffer) {
587 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for client");
588 packet_queue_destroy(audio_queue);
589 audio_ring_buffer_destroy(incoming_audio_buffer);
590 video_frame_buffer_destroy(incoming_video_buffer);
591 return -1;
592 }
593
594 void *send_buffer = SAFE_MALLOC_ALIGNED(MAX_FRAME_BUFFER_SIZE, 64, void *);
595 if (!send_buffer) {
596 log_error("Failed to allocate send buffer for client %u", new_client_id);
597 video_frame_buffer_destroy(outgoing_video_buffer);
598 packet_queue_destroy(audio_queue);
599 audio_ring_buffer_destroy(incoming_audio_buffer);
600 video_frame_buffer_destroy(incoming_video_buffer);
601 return -1;
602 }
603
604 // NOW acquire the lock only for the critical section: slot assignment + registration
605 rwlock_wrlock(&g_client_manager_rwlock);
606
607 // Re-check slot availability under lock (another thread might have taken it)
608 if (atomic_load(&g_client_manager.clients[slot].client_id) != 0) {
609 rwlock_wrunlock(&g_client_manager_rwlock);
610 SAFE_FREE(send_buffer);
611 video_frame_buffer_destroy(outgoing_video_buffer);
612 packet_queue_destroy(audio_queue);
613 audio_ring_buffer_destroy(incoming_audio_buffer);
614 video_frame_buffer_destroy(incoming_video_buffer);
615
616 const char *reject_msg = "SERVER_FULL: Slot reassigned, try again\n";
617 socket_send(socket, reject_msg, strlen(reject_msg), 0);
618 return -1;
619 }
620
621 // Now we have exclusive access to the slot - do the actual registration
622 client_info_t *client = &g_client_manager.clients[slot];
623 memset(client, 0, sizeof(client_info_t));
624
625 client->socket = socket;
626 client->is_tcp_client = true;
627 atomic_store(&client->client_id, new_client_id);
628 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
629 client->port = port;
630 atomic_store(&client->active, true);
631 client->server_ctx = server_ctx; // Store server context for cleanup
632 atomic_store(&client->shutting_down, false);
633 atomic_store(&client->last_rendered_grid_sources, 0);
634 atomic_store(&client->last_sent_grid_sources, 0);
635 client->connected_at = time(NULL);
636
637 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
638 client->crypto_initialized = false;
639
640 client->pending_packet_type = 0;
641 client->pending_packet_payload = NULL;
642 client->pending_packet_length = 0;
643
644 // Assign pre-allocated buffers
645 client->incoming_video_buffer = incoming_video_buffer;
646 client->incoming_audio_buffer = incoming_audio_buffer;
647 client->audio_queue = audio_queue;
648 client->outgoing_video_buffer = outgoing_video_buffer;
649 client->send_buffer = send_buffer;
650 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE;
651
652 safe_snprintf(client->display_name, sizeof(client->display_name), "Client%u", new_client_id);
653 log_info("Added new client ID=%u from %s:%d (socket=%d, slot=%d)", new_client_id, client_ip, port, socket, slot);
654 log_debug("Client slot assigned: client_id=%u assigned to slot %d, socket=%d", new_client_id, slot, socket);
655
656 // Register socket with tcp_server
657 asciichat_error_t reg_result = tcp_server_add_client(server_ctx->tcp_server, socket, client);
658 if (reg_result != ASCIICHAT_OK) {
659 SET_ERRNO(ERROR_INTERNAL, "Failed to register client socket with tcp_server");
660 log_error("Failed to register client %u socket with tcp_server", new_client_id);
661 // Don't unlock here - error_cleanup will do it
662 goto error_cleanup;
663 }
664
666 log_debug("Client count updated: now %d clients (added client_id=%u to slot %d)", g_client_manager.client_count,
667 new_client_id, slot);
668
669 uint32_t cid = atomic_load(&client->client_id);
670 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
671 log_debug("Added client %u to uthash table", cid);
672
673 rwlock_wrunlock(&g_client_manager_rwlock);
674
675 // Configure socket OUTSIDE lock
676 configure_client_socket(socket, new_client_id);
677
678 // Initialize mutexes OUTSIDE lock
679 if (mutex_init(&client->client_state_mutex) != 0) {
680 log_error("Failed to initialize client state mutex for client %u", new_client_id);
681 remove_client(server_ctx, new_client_id);
682 return -1;
683 }
684
685 if (mutex_init(&client->send_mutex) != 0) {
686 log_error("Failed to initialize send mutex for client %u", new_client_id);
687 remove_client(server_ctx, new_client_id);
688 return -1;
689 }
690
691 // Register with audio mixer OUTSIDE lock
692 if (g_audio_mixer && client->incoming_audio_buffer) {
693 if (mixer_add_source(g_audio_mixer, new_client_id, client->incoming_audio_buffer) < 0) {
694 log_warn("Failed to add client %u to audio mixer", new_client_id);
695 } else {
696#ifdef DEBUG_AUDIO
697 log_debug("Added client %u to audio mixer", new_client_id);
698#endif
699 }
700 }
701
702 // Perform crypto handshake before starting threads.
703 // This ensures the handshake uses the socket directly without interference from receive thread.
704 if (server_crypto_init() == 0) {
705 // Set timeout for crypto handshake to prevent indefinite blocking
706 // This prevents clients from connecting but never completing the handshake
707 const uint64_t HANDSHAKE_TIMEOUT_NS = 30ULL * NS_PER_SEC_INT;
708 asciichat_error_t timeout_result = set_socket_timeout(socket, HANDSHAKE_TIMEOUT_NS);
709 if (timeout_result != ASCIICHAT_OK) {
710 log_warn("Failed to set handshake timeout for client %u: %s", atomic_load(&client->client_id),
711 asciichat_error_string(timeout_result));
712 // Continue anyway - timeout is a safety feature, not critical
713 }
714
715 int crypto_result = server_crypto_handshake(client);
716 if (crypto_result != 0) {
717 log_error("Crypto handshake failed for client %u: %s", atomic_load(&client->client_id), network_error_string());
718 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
719 log_error("Failed to remove client after crypto handshake failure");
720 }
721 return -1;
722 }
723
724 // Clear socket timeout after handshake completes successfully
725 // This allows normal operation without timeouts on data transfer
726 asciichat_error_t clear_timeout_result = set_socket_timeout(socket, 0);
727 if (clear_timeout_result != ASCIICHAT_OK) {
728 log_warn("Failed to clear handshake timeout for client %u: %s", atomic_load(&client->client_id),
729 asciichat_error_string(clear_timeout_result));
730 // Continue anyway - we can still communicate even with timeout set
731 }
732
733 log_debug("Crypto handshake completed successfully for client %u", atomic_load(&client->client_id));
734
735 // Create ACIP transport for protocol-agnostic packet sending
736 // The transport wraps the socket with encryption context from the handshake
737 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
738 client->transport = acip_tcp_transport_create(socket, (crypto_context_t *)crypto_ctx);
739 if (!client->transport) {
740 log_error("Failed to create ACIP transport for client %u", atomic_load(&client->client_id));
741 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
742 log_error("Failed to remove client after transport creation failure");
743 }
744 return -1;
745 }
746 log_debug("Created ACIP transport for client %u with crypto context", atomic_load(&client->client_id));
747
748 // After handshake completes, the client immediately sends PACKET_TYPE_CLIENT_CAPABILITIES
749 // We must read and process this packet BEFORE starting the receive thread to avoid a race condition
750 // where the packet arrives but no thread is listening for it.
751 //
752 // SPECIAL CASE: If client used --no-encrypt, we already received this packet during the handshake
753 // attempt and stored it in pending_packet_*. Use that instead of receiving a new one.
754 packet_envelope_t envelope;
755 bool used_pending_packet = false;
756
757 if (client->pending_packet_payload) {
758 // Client used --no-encrypt mode - use the packet we already received
759 log_info("Client %u using --no-encrypt mode - processing pending packet type %u", atomic_load(&client->client_id),
760 client->pending_packet_type);
761 envelope.type = client->pending_packet_type;
762 envelope.data = client->pending_packet_payload;
763 envelope.len = client->pending_packet_length;
764 envelope.allocated_buffer = client->pending_packet_payload; // Will be freed below
765 envelope.allocated_size = client->pending_packet_length;
766 used_pending_packet = true;
767
768 // Clear pending packet fields
769 client->pending_packet_type = 0;
770 client->pending_packet_payload = NULL;
771 client->pending_packet_length = 0;
772 } else {
773 // Normal encrypted mode - receive capabilities packet
774 log_debug("Waiting for initial capabilities packet from client %u", atomic_load(&client->client_id));
775
776 // Protect crypto context access with client state mutex
777 mutex_lock(&client->client_state_mutex);
778 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
779
780 // Use per-client crypto state to determine enforcement
781 // At this point, handshake is complete, so crypto_initialized=true and handshake is ready
782 bool enforce_encryption = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
783 crypto_handshake_is_ready(&client->crypto_handshake_ctx);
784
785 packet_recv_result_t result = receive_packet_secure(socket, (void *)crypto_ctx, enforce_encryption, &envelope);
786 mutex_unlock(&client->client_state_mutex);
787
788 if (result != PACKET_RECV_SUCCESS) {
789 log_error("Failed to receive initial capabilities packet from client %u: result=%d",
790 atomic_load(&client->client_id), result);
791 if (envelope.allocated_buffer) {
792 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
793 }
794 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
795 log_error("Failed to remove client after crypto handshake failure");
796 }
797 return -1;
798 }
799 }
800
801 if (envelope.type != PACKET_TYPE_CLIENT_CAPABILITIES) {
802 log_error("Expected PACKET_TYPE_CLIENT_CAPABILITIES but got packet type %d from client %u", envelope.type,
803 atomic_load(&client->client_id));
804 if (envelope.allocated_buffer) {
805 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
806 }
807 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
808 log_error("Failed to remove client after crypto handshake failure");
809 }
810 return -1;
811 }
812
813 // Process the capabilities packet directly
814 log_debug("Processing initial capabilities packet from client %u (from %s)", atomic_load(&client->client_id),
815 used_pending_packet ? "pending packet" : "network");
816 handle_client_capabilities_packet(client, envelope.data, envelope.len);
817
818 // Free the packet data
819 if (envelope.allocated_buffer) {
820 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
821 }
822 log_debug("Successfully received and processed initial capabilities for client %u",
823 atomic_load(&client->client_id));
824 }
825
826 // Start all client threads in the correct order (unified path for TCP and WebRTC)
827 // This creates: receive thread -> render threads -> send thread
828 // The render threads MUST be created before send thread to avoid the race condition
829 // where send thread reads empty frames before render thread generates the first real frame
830 uint32_t client_id_snapshot = atomic_load(&client->client_id);
831 if (start_client_threads(server_ctx, client, true) != 0) {
832 log_error("Failed to start threads for TCP client %u", client_id_snapshot);
833 // Client is already in hash table - use remove_client for proper cleanup
834 remove_client(server_ctx, client_id_snapshot);
835 return -1;
836 }
837 log_debug("Successfully created render threads for client %u", client_id_snapshot);
838
839 // Register client with session_host (for discovery mode support)
840 if (server_ctx->session_host) {
841 uint32_t session_client_id = session_host_add_client(server_ctx->session_host, socket, client_ip, port);
842 if (session_client_id == 0) {
843 log_warn("Failed to register client %u with session_host", client_id_snapshot);
844 } else {
845 log_debug("Client %u registered with session_host as %u", client_id_snapshot, session_client_id);
846 }
847 }
848
849 // Broadcast server state to ALL clients AFTER the new client is fully set up
850 // This notifies all clients (including the new one) about the updated grid
852
853 return (int)client_id_snapshot;
854
855error_cleanup:
856 // Clean up all partially allocated resources
857 // NOTE: This label is reached when allocation or initialization fails BEFORE
858 // the client is added to the hash table. Don't call remove_client() here.
859 cleanup_client_all_buffers(client);
860 rwlock_wrunlock(&g_client_manager_rwlock);
861 return -1;
862}
863
887int add_webrtc_client(server_context_t *server_ctx, acip_transport_t *transport, const char *client_ip,
888 bool start_threads) {
889 if (!server_ctx || !transport || !client_ip) {
890 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters to add_webrtc_client");
891 return -1;
892 }
893
894 rwlock_wrlock(&g_client_manager_rwlock);
895
896 // Find empty slot - this is the authoritative check
897 int slot = -1;
898 int existing_count = 0;
899 for (int i = 0; i < MAX_CLIENTS; i++) {
900 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
901 slot = i; // Take first available slot
902 }
903 // Count only active clients
904 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
905 existing_count++;
906 }
907 }
908
909 // Check if we've hit the configured max-clients limit (not the array size)
910 if (existing_count >= GET_OPTION(max_clients)) {
911 rwlock_wrunlock(&g_client_manager_rwlock);
912 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "Maximum client limit reached (%d/%d active clients)", existing_count,
913 GET_OPTION(max_clients));
914 log_error("Maximum client limit reached (%d/%d active clients)", existing_count, GET_OPTION(max_clients));
915 return -1;
916 }
917
918 if (slot == -1) {
919 rwlock_wrunlock(&g_client_manager_rwlock);
920 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "No available client slots (all %d array slots are in use)", MAX_CLIENTS);
921 log_error("No available client slots (all %d array slots are in use)", MAX_CLIENTS);
922 return -1;
923 }
924
925 // Update client_count to match actual count before adding new client
926 g_client_manager.client_count = existing_count;
927
928 // Initialize client
929 client_info_t *client = &g_client_manager.clients[slot];
930
931 // Free any existing buffers from previous client in this slot
932 if (client->incoming_video_buffer) {
933 video_frame_buffer_destroy(client->incoming_video_buffer);
934 client->incoming_video_buffer = NULL;
935 }
936 if (client->outgoing_video_buffer) {
937 video_frame_buffer_destroy(client->outgoing_video_buffer);
938 client->outgoing_video_buffer = NULL;
939 }
940 if (client->incoming_audio_buffer) {
941 audio_ring_buffer_destroy(client->incoming_audio_buffer);
942 client->incoming_audio_buffer = NULL;
943 }
944 if (client->send_buffer) {
945 SAFE_FREE(client->send_buffer);
946 client->send_buffer = NULL;
947 }
948
949 memset(client, 0, sizeof(client_info_t));
950
951 // Set up WebRTC-specific fields
952 client->socket = INVALID_SOCKET_VALUE; // WebRTC has no traditional socket
953 client->is_tcp_client = false; // WebRTC client - threads managed directly
954 client->transport = transport; // Use provided transport
955 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
956 atomic_store(&client->client_id, new_client_id);
957 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
958 client->port = 0; // WebRTC doesn't use port numbers
959 atomic_store(&client->active, true);
960 client->server_ctx = server_ctx; // Store server context for receive thread cleanup
961 log_info("Added new WebRTC client ID=%u from %s (transport=%p, slot=%d)", new_client_id, client_ip, transport, slot);
962 atomic_store(&client->shutting_down, false);
963 atomic_store(&client->last_rendered_grid_sources, 0); // Render thread updates this
964 atomic_store(&client->last_sent_grid_sources, 0); // Send thread updates this
965 log_debug("WebRTC client slot assigned: client_id=%u assigned to slot %d", atomic_load(&client->client_id), slot);
966 client->connected_at = time(NULL);
967
968 // Initialize crypto context for this client
969 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
970 log_debug("[ADD_WEBRTC_CLIENT] Setting crypto_initialized=false (will be set to true after handshake completes)");
971 // Do NOT set crypto_initialized = true here! The handshake must complete first.
972 // For WebSocket clients: websocket_client_handler will perform the handshake
973 // For WebRTC clients: ACDS signaling may have already done crypto (future enhancement)
974 client->crypto_initialized = false;
975
976 // Initialize pending packet storage (unused for WebRTC, but keep for consistency)
977 client->pending_packet_type = 0;
978 client->pending_packet_payload = NULL;
979 client->pending_packet_length = 0;
980
981 safe_snprintf(client->display_name, sizeof(client->display_name), "WebRTC%u", atomic_load(&client->client_id));
982
983 // Create individual video buffer for this client using modern double-buffering
984 client->incoming_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
985 if (!client->incoming_video_buffer) {
986 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
987 log_error("Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
988 goto error_cleanup_webrtc;
989 }
990
991 // Create individual audio buffer for this client
992 client->incoming_audio_buffer = audio_ring_buffer_create_for_capture();
993 if (!client->incoming_audio_buffer) {
994 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
995 log_error("Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
996 goto error_cleanup_webrtc;
997 }
998
999 // Create packet queues for outgoing data
1000 client->audio_queue = packet_queue_create_with_pools(500, 1000, false);
1001 if (!client->audio_queue) {
1002 LOG_ERRNO_IF_SET("Failed to create audio queue for WebRTC client");
1003 goto error_cleanup_webrtc;
1004 }
1005
1006 // Create packet queue for async dispatch: received packets waiting to be processed
1007 // Capacity of 100 packets allows buffering of ~10-50 frames (depending on fragmentation)
1008 client->received_packet_queue = packet_queue_create_with_pools(100, 500, false);
1009 if (!client->received_packet_queue) {
1010 LOG_ERRNO_IF_SET("Failed to create received packet queue for client");
1011 goto error_cleanup_webrtc;
1012 }
1013
1014 // Create outgoing video buffer for ASCII frames (double buffered, no dropping)
1015 client->outgoing_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
1016 if (!client->outgoing_video_buffer) {
1017 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for WebRTC client");
1018 goto error_cleanup_webrtc;
1019 }
1020
1021 // Pre-allocate send buffer to avoid malloc/free in send thread (prevents deadlocks)
1022 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE; // 2MB should handle largest frames
1023 client->send_buffer = SAFE_MALLOC_ALIGNED(client->send_buffer_size, 64, void *);
1024 if (!client->send_buffer) {
1025 log_error("Failed to allocate send buffer for WebRTC client %u", atomic_load(&client->client_id));
1026 goto error_cleanup_webrtc;
1027 }
1028
1029 g_client_manager.client_count = existing_count + 1; // We just added a client
1030 log_debug("Client count updated: now %d clients (added WebRTC client_id=%u to slot %d)",
1031 g_client_manager.client_count, atomic_load(&client->client_id), slot);
1032
1033 // Add client to uthash table for O(1) lookup
1034 uint32_t cid = atomic_load(&client->client_id);
1035 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
1036 log_debug("Added WebRTC client %u to uthash table", cid);
1037
1038 // Register this client's audio buffer with the mixer
1039 if (g_audio_mixer && client->incoming_audio_buffer) {
1040 if (mixer_add_source(g_audio_mixer, atomic_load(&client->client_id), client->incoming_audio_buffer) < 0) {
1041 log_warn("Failed to add WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1042 } else {
1043#ifdef DEBUG_AUDIO
1044 log_debug("Added WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1045#endif
1046 }
1047 }
1048
1049 // Initialize mutexes BEFORE creating any threads to prevent race conditions
1050 if (mutex_init(&client->client_state_mutex) != 0) {
1051 log_error("Failed to initialize client state mutex for WebRTC client %u", atomic_load(&client->client_id));
1052 // Client is already in hash table - use remove_client for proper cleanup
1053 rwlock_wrunlock(&g_client_manager_rwlock);
1054 remove_client(server_ctx, atomic_load(&client->client_id));
1055 return -1;
1056 }
1057
1058 // Initialize send mutex to protect concurrent socket writes
1059 if (mutex_init(&client->send_mutex) != 0) {
1060 log_error("Failed to initialize send mutex for WebRTC client %u", atomic_load(&client->client_id));
1061 // Client is already in hash table - use remove_client for proper cleanup
1062 rwlock_wrunlock(&g_client_manager_rwlock);
1063 remove_client(server_ctx, atomic_load(&client->client_id));
1064 return -1;
1065 }
1066
1067 rwlock_wrunlock(&g_client_manager_rwlock);
1068
1069 // For WebRTC clients, the capabilities packet will be received by the receive thread
1070 // when it starts. Unlike TCP clients where we handle it synchronously in add_client(),
1071 // WebRTC uses the transport abstraction which handles packet reception automatically.
1072 log_debug("WebRTC client %u initialized - receive thread will process capabilities", atomic_load(&client->client_id));
1073
1074 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1075
1076 // Conditionally start threads based on caller preference
1077 // WebSocket handler passes start_threads=false to defer thread startup until after crypto init
1078 // This ensures receive thread doesn't try to process packets before crypto context is ready
1079 if (start_threads) {
1080 log_debug("[ADD_WEBRTC_CLIENT] Starting client threads (receive, render) for client %u...", client_id_snapshot);
1081 if (start_client_threads(server_ctx, client, false) != 0) {
1082 log_error("Failed to start threads for WebRTC client %u", client_id_snapshot);
1083 return -1;
1084 }
1085 log_debug("Created receive thread for WebRTC client %u", client_id_snapshot);
1086 log_debug("[ADD_WEBRTC_CLIENT] Receive thread started - thread will now be processing packets", client_id_snapshot);
1087 } else {
1088 log_debug("[ADD_WEBRTC_CLIENT] Deferring thread startup for client %u (caller will start after crypto init)",
1089 client_id_snapshot);
1090 }
1091
1092 // Send initial server state to the new client
1093 if (send_server_state_to_client(client) != 0) {
1094 log_warn("Failed to send initial server state to WebRTC client %u", client_id_snapshot);
1095 } else {
1096#ifdef DEBUG_NETWORK
1097 log_info("Sent initial server state to WebRTC client %u", client_id_snapshot);
1098#endif
1099 }
1100
1101 // Send initial server state via ACIP transport
1102 rwlock_rdlock(&g_client_manager_rwlock);
1103 uint32_t connected_count = g_client_manager.client_count;
1104 rwlock_rdunlock(&g_client_manager_rwlock);
1105
1106 server_state_packet_t state;
1107 state.connected_client_count = connected_count;
1108 state.active_client_count = 0; // Will be updated by broadcast thread
1109 memset(state.reserved, 0, sizeof(state.reserved));
1110
1111 // Convert to network byte order
1112 server_state_packet_t net_state;
1113 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
1114 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
1115 memset(net_state.reserved, 0, sizeof(net_state.reserved));
1116
1117 asciichat_error_t packet_send_result = acip_send_server_state(client->transport, &net_state);
1118 if (packet_send_result != ASCIICHAT_OK) {
1119 log_warn("Failed to send initial server state to WebRTC client %u: %s", client_id_snapshot,
1120 asciichat_error_string(packet_send_result));
1121 } else {
1122 log_debug("Sent initial server state to WebRTC client %u: %u connected clients", client_id_snapshot,
1123 state.connected_client_count);
1124 }
1125
1126 // Register client with session_host (for discovery mode support)
1127 // WebRTC clients use INVALID_SOCKET_VALUE since they don't have a TCP socket
1128 if (server_ctx->session_host) {
1129 uint32_t session_client_id = session_host_add_client(server_ctx->session_host, INVALID_SOCKET_VALUE, client_ip, 0);
1130 if (session_client_id == 0) {
1131 log_warn("Failed to register WebRTC client %u with session_host", client_id_snapshot);
1132 } else {
1133 log_debug("WebRTC client %u registered with session_host as %u", client_id_snapshot, session_client_id);
1134 }
1135 }
1136
1137 // Broadcast server state to ALL clients AFTER the new client is fully set up
1138 // This notifies all clients (including the new one) about the updated grid
1140
1141 return (int)client_id_snapshot;
1142
1143error_cleanup_webrtc:
1144 // Clean up all partially allocated resources for WebRTC client
1145 // NOTE: This label is reached when allocation or initialization fails BEFORE
1146 // the client is added to the hash table. Don't call remove_client() here.
1147 cleanup_client_all_buffers(client);
1148 rwlock_wrunlock(&g_client_manager_rwlock);
1149 return -1;
1150}
1151
1152int remove_client(server_context_t *server_ctx, uint32_t client_id) {
1153 if (!server_ctx) {
1154 SET_ERRNO(ERROR_INVALID_PARAM, "Cannot remove client %u: NULL server_ctx", client_id);
1155 return -1;
1156 }
1157
1158 // Phase 1: Mark client inactive and prepare for cleanup while holding write lock
1159 client_info_t *target_client = NULL;
1160 char display_name_copy[MAX_DISPLAY_NAME_LEN];
1161 socket_t client_socket = INVALID_SOCKET_VALUE; // Save socket for thread cleanup
1162
1163 log_debug("SOCKET_DEBUG: Attempting to remove client %d", client_id);
1164 rwlock_wrlock(&g_client_manager_rwlock);
1165
1166 for (int i = 0; i < MAX_CLIENTS; i++) {
1167 client_info_t *client = &g_client_manager.clients[i];
1168 uint32_t cid = atomic_load(&client->client_id);
1169 if (cid == client_id && cid != 0) {
1170 // Check if already being removed by another thread
1171 // This prevents double-free and use-after-free crashes during concurrent cleanup
1172 if (atomic_load(&client->shutting_down)) {
1173 rwlock_wrunlock(&g_client_manager_rwlock);
1174 log_debug("Client %u already being removed by another thread, skipping", client_id);
1175 return 0; // Return success - removal is in progress
1176 }
1177 // Mark as shutting down and inactive immediately to stop new operations
1178 log_debug("Setting active=false in remove_client (client_id=%d, socket=%d)", client_id, client->socket);
1179 log_info("Removing client %d (socket=%d) - marking inactive and clearing video flags", client_id, client->socket);
1180 atomic_store(&client->shutting_down, true);
1181 atomic_store(&client->active, false);
1182 atomic_store(&client->is_sending_video, false);
1183 atomic_store(&client->is_sending_audio, false);
1184 target_client = client;
1185
1186 // Store display name before clearing
1187 SAFE_STRNCPY(display_name_copy, client->display_name, MAX_DISPLAY_NAME_LEN - 1);
1188
1189 // Save socket for tcp_server_stop_client_threads() before closing
1190 mutex_lock(&client->client_state_mutex);
1191 client_socket = client->socket; // Save socket for thread cleanup
1192 if (client->socket != INVALID_SOCKET_VALUE) {
1193 log_debug("SOCKET_DEBUG: Client %d shutting down socket %d", client->client_id, client->socket);
1194 // Shutdown both send and receive operations to unblock any pending I/O
1195 socket_shutdown(client->socket, 2); // 2 = SHUT_RDWR on POSIX, SD_BOTH on Windows
1196 // Don't close yet - tcp_server needs socket as lookup key
1197 }
1198 mutex_unlock(&client->client_state_mutex);
1199
1200 // Shutdown packet queues to unblock send thread
1201 if (client->audio_queue) {
1202 packet_queue_stop(client->audio_queue);
1203 }
1204 // Video now uses double buffer, no queue to shutdown
1205
1206 break;
1207 }
1208 }
1209
1210 // If client not found, unlock and return
1211 if (!target_client) {
1212 rwlock_wrunlock(&g_client_manager_rwlock);
1213 log_warn("Cannot remove client %u: not found", client_id);
1214 return -1;
1215 }
1216
1217 // Unregister client from session_host (for discovery mode support)
1218 // NOTE: Client may not be registered if crypto handshake failed before session_host registration
1219 if (server_ctx->session_host) {
1220 asciichat_error_t session_result = session_host_remove_client(server_ctx->session_host, client_id);
1221 if (session_result != ASCIICHAT_OK) {
1222 // ERROR_NOT_FOUND (91) is expected if client failed crypto before being registered with session_host
1223 if (session_result == ERROR_NOT_FOUND) {
1224 log_debug("Client %u not found in session_host (likely failed crypto before registration)", client_id);
1225 } else {
1226 log_warn("Failed to unregister client %u from session_host: %s", client_id,
1227 asciichat_error_string(session_result));
1228 }
1229 } else {
1230 log_debug("Client %u unregistered from session_host", client_id);
1231 }
1232 }
1233
1234 // Release write lock before joining threads.
1235 // This prevents deadlock with render threads that need read locks.
1236 rwlock_wrunlock(&g_client_manager_rwlock);
1237
1238 // Phase 2: Stop all client threads
1239 // For TCP clients: use tcp_server thread pool management
1240 // For WebRTC clients: manually join threads (no socket-based thread pool)
1241 // Use is_tcp_client flag, not socket value - socket may already be INVALID_SOCKET_VALUE
1242 // even for TCP clients if it was closed earlier during cleanup.
1243 log_debug("Stopping all threads for client %u (socket %d, is_tcp=%d)", client_id, client_socket,
1244 target_client ? target_client->is_tcp_client : -1);
1245
1246 if (target_client && target_client->is_tcp_client) {
1247 // TCP client: use tcp_server thread pool
1248 // This joins threads in stop_id order: receive(1), render(2), send(3)
1249 // Use saved client_socket for lookup (tcp_server needs original socket as key)
1250 if (client_socket != INVALID_SOCKET_VALUE) {
1251 asciichat_error_t stop_result = tcp_server_stop_client_threads(server_ctx->tcp_server, client_socket);
1252 if (stop_result != ASCIICHAT_OK) {
1253 log_warn("Failed to stop threads for TCP client %u: error %d", client_id, stop_result);
1254 // Continue with cleanup even if thread stopping failed
1255 }
1256 } else {
1257 log_debug("TCP client %u socket already closed, threads should have already exited", client_id);
1258 }
1259 } else if (target_client) {
1260 // WebRTC client: manually join threads
1261 log_debug("Stopping WebRTC client %u threads (receive and send)", client_id);
1262
1263 // Join receive thread (but skip if called from the receive thread itself to avoid deadlock)
1264 thread_id_t current_thread_id = asciichat_thread_self();
1265 if (asciichat_thread_equal(current_thread_id, target_client->receive_thread_id)) {
1266 log_debug("remove_client() called from receive thread for client %u, skipping self-join", client_id);
1267 } else {
1268 void *recv_result = NULL;
1269 asciichat_error_t recv_join_result = asciichat_thread_join(&target_client->receive_thread, &recv_result);
1270 if (recv_join_result != ASCIICHAT_OK) {
1271 log_warn("Failed to join receive thread for WebRTC client %u: error %d", client_id, recv_join_result);
1272 } else {
1273 log_debug("Joined receive thread for WebRTC client %u", client_id);
1274 }
1275 }
1276
1277 // Join send thread
1278 void *send_result = NULL;
1279 asciichat_error_t send_join_result = asciichat_thread_join(&target_client->send_thread, &send_result);
1280 if (send_join_result != ASCIICHAT_OK) {
1281 log_warn("Failed to join send thread for WebRTC client %u: error %d", client_id, send_join_result);
1282 } else {
1283 log_debug("Joined send thread for WebRTC client %u", client_id);
1284 }
1285 // Note: Render threads still need to be stopped - they're created the same way for both TCP and WebRTC
1286 // For now, render threads are expected to exit when they check g_server_should_exit and client->active
1287 }
1288
1289 // Destroy ACIP transport before closing socket
1290 // For WebSocket clients: LWS_CALLBACK_CLOSED already closed and destroyed the transport
1291 // Trying to destroy it again causes heap-use-after-free since LWS callbacks might still fire
1292 // For TCP clients: transport is ours to clean up
1293 if (target_client && target_client->transport && target_client->is_tcp_client) {
1294 acip_transport_destroy(target_client->transport);
1295 target_client->transport = NULL;
1296 log_debug("Destroyed ACIP transport for TCP client %u", client_id);
1297 } else if (target_client && target_client->transport && !target_client->is_tcp_client) {
1298 // WebSocket client - just NULL it out, LWS_CALLBACK_CLOSED already destroyed it
1299 target_client->transport = NULL;
1300 log_debug("Skipped transport destruction for WebSocket client %u (LWS already destroyed)", client_id);
1301 }
1302
1303 // Now safe to close the socket (threads are stopped)
1304 if (client_socket != INVALID_SOCKET_VALUE) {
1305 log_debug("SOCKET_DEBUG: Closing socket %d for client %u after thread cleanup", client_socket, client_id);
1306 socket_close(client_socket);
1307 }
1308
1309 // Phase 3: Clean up resources with write lock
1310 rwlock_wrlock(&g_client_manager_rwlock);
1311
1312 // Re-validate target_client pointer after reacquiring lock.
1313 // Another thread might have invalidated the pointer while we had the lock released.
1314 if (target_client) {
1315 // Verify client_id still matches and client is still in shutting_down state
1316 uint32_t current_id = atomic_load(&target_client->client_id);
1317 bool still_shutting_down = atomic_load(&target_client->shutting_down);
1318 if (current_id != client_id || !still_shutting_down) {
1319 log_warn("Client %u pointer invalidated during thread cleanup (id=%u, shutting_down=%d)", client_id, current_id,
1320 still_shutting_down);
1321 rwlock_wrunlock(&g_client_manager_rwlock);
1322 return 0; // Another thread completed the cleanup
1323 }
1324 }
1325
1326 // Mark socket as closed in client structure
1327 if (target_client && target_client->socket != INVALID_SOCKET_VALUE) {
1328 mutex_lock(&target_client->client_state_mutex);
1329 target_client->socket = INVALID_SOCKET_VALUE;
1330 mutex_unlock(&target_client->client_state_mutex);
1331 log_debug("SOCKET_DEBUG: Client %u socket set to INVALID", client_id);
1332 }
1333
1334 // Use the dedicated cleanup function to ensure all resources are freed
1335 cleanup_client_all_buffers(target_client);
1336
1337 // Remove from audio mixer
1338 if (g_audio_mixer) {
1340#ifdef DEBUG_AUDIO
1341 log_debug("Removed client %u from audio mixer", client_id);
1342#endif
1343 }
1344
1345 // Remove from uthash table
1346 // Verify client is actually in the hash table before deleting.
1347 // Another thread might have already removed it.
1348 if (target_client) {
1349 client_info_t *hash_entry = NULL;
1350 HASH_FIND(hh, g_client_manager.clients_by_id, &client_id, sizeof(client_id), hash_entry);
1351 if (hash_entry == target_client) {
1352 HASH_DELETE(hh, g_client_manager.clients_by_id, target_client);
1353 log_debug("Removed client %u from uthash table", client_id);
1354 } else {
1355 log_warn("Client %u already removed from hash table by another thread (found=%p, expected=%p)", client_id,
1356 (void *)hash_entry, (void *)target_client);
1357 }
1358 } else {
1359 log_warn("Failed to remove client %u from hash table (client not found)", client_id);
1360 }
1361
1362 // Cleanup crypto context for this client
1363 if (target_client->crypto_initialized) {
1364 crypto_handshake_destroy(&target_client->crypto_handshake_ctx);
1365 target_client->crypto_initialized = false;
1366 log_debug("Crypto context cleaned up for client %u", client_id);
1367 }
1368
1369 // Verify all threads have actually exited before resetting client_id.
1370 // Threads that are still starting (at RtlUserThreadStart) haven't checked client_id yet.
1371 // We must ensure threads are fully joined before zeroing the client struct.
1372 // Use exponential backoff for thread termination verification
1373 int retry_count = 0;
1374 const int max_retries = 5;
1375 while (retry_count < max_retries && (asciichat_thread_is_initialized(&target_client->send_thread) ||
1376 asciichat_thread_is_initialized(&target_client->receive_thread) ||
1377 asciichat_thread_is_initialized(&target_client->video_render_thread) ||
1378 asciichat_thread_is_initialized(&target_client->audio_render_thread))) {
1379 // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms
1380 uint32_t delay_ms = 10 * (1 << retry_count);
1381 log_warn("Client %u: Some threads still appear initialized (attempt %d/%d), waiting %ums", client_id,
1382 retry_count + 1, max_retries, delay_ms);
1383 platform_sleep_us(delay_ms * 1000);
1384 retry_count++;
1385 }
1386
1387 if (retry_count == max_retries) {
1388 log_error("Client %u: Threads did not terminate after %d retries, proceeding with cleanup anyway", client_id,
1389 max_retries);
1390 }
1391
1392 // Only reset client_id to 0 AFTER confirming threads are joined
1393 // This prevents threads that are starting from accessing a zeroed client struct
1394 // Reset client_id to 0 before destroying mutexes to prevent race conditions.
1395 // This ensures worker threads can detect shutdown and exit before the mutex is destroyed.
1396 // If we destroy the mutex first, threads might try to access a destroyed mutex.
1397 atomic_store(&target_client->client_id, 0);
1398
1399 // Wait for threads to observe the client_id reset
1400 // Use sufficient delay for memory visibility across all CPU cores
1401 platform_sleep_us(5 * US_PER_MS_INT); // 5ms delay for memory barrier propagation
1402
1403 // Destroy mutexes
1404 // IMPORTANT: Always destroy these even if threads didn't join properly
1405 // to prevent issues when the slot is reused
1406 mutex_destroy(&target_client->client_state_mutex);
1407 mutex_destroy(&target_client->send_mutex);
1408
1409 // Clear client structure
1410 // NOTE: After memset, the mutex handles are zeroed but the OS resources
1411 // have been released by the destroy calls above
1412 memset(target_client, 0, sizeof(client_info_t));
1413
1414 // Recalculate client count using atomic reads
1415 int remaining_count = 0;
1416 for (int j = 0; j < MAX_CLIENTS; j++) {
1417 if (atomic_load(&g_client_manager.clients[j].client_id) != 0) {
1418 remaining_count++;
1419 }
1420 }
1421 g_client_manager.client_count = remaining_count;
1422
1423 log_debug("Client removed: client_id=%u (%s) removed, remaining clients: %d", client_id, display_name_copy,
1424 remaining_count);
1425
1426 rwlock_wrunlock(&g_client_manager_rwlock);
1427
1428 // Broadcast updated state
1430
1431 return 0;
1432}
1433
1434/* ============================================================================
1435 * Client Thread Functions
1436 * ============================================================================
1437 */
1438
1439// Forward declaration for ACIP server callbacks (defined later in file)
1440static const acip_server_callbacks_t g_acip_server_callbacks;
1441
1449void *client_dispatch_thread(void *arg) {
1450 client_info_t *client = (client_info_t *)arg;
1451
1452 if (!client) {
1453 log_error("Invalid client info in dispatch thread (NULL pointer)");
1454 return NULL;
1455 }
1456
1457 uint32_t client_id = atomic_load(&client->client_id);
1458 log_info("DISPATCH_THREAD: Started for client %u", client_id);
1459
1460 uint64_t dispatch_loop_count = 0;
1461 uint64_t last_dequeue_attempt = time_get_ns();
1462
1463 while (!atomic_load(&g_server_should_exit) && atomic_load(&client->dispatch_thread_running)) {
1464 dispatch_loop_count++;
1465 // Try to dequeue next packet (non-blocking)
1466 // Use try_dequeue to avoid blocking - allows checking exit flag frequently
1467 uint64_t dequeue_start = time_get_ns();
1468 queued_packet_t *queued_pkt = packet_queue_try_dequeue(client->received_packet_queue);
1469 uint64_t dequeue_end = time_get_ns();
1470
1471 if (!queued_pkt) {
1472 // Queue was empty, sleep briefly to avoid busy-waiting
1473 log_dev_every(5 * US_PER_MS_INT, "🔄 DISPATCH_LOOP[%llu]: Queue empty after %.1fμs, sleeping 10ms",
1474 (unsigned long long)dispatch_loop_count, (dequeue_end - dequeue_start) / 1000.0);
1475 usleep(10 * US_PER_MS_INT); // 10ms sleep
1476 last_dequeue_attempt = dequeue_end;
1477 continue;
1478 }
1479
1480 // Frame received! Log it immediately
1481 log_info("🚀 DISPATCH_LOOP[%llu]: 📦 DEQUEUED %zu byte packet (dequeue took %.1fμs) for client %u",
1482 (unsigned long long)dispatch_loop_count, queued_pkt->data_len,
1483 (dequeue_end - dequeue_start) / 1000.0, client_id);
1484
1485 // Process the dequeued packet
1486 // The queued packet contains the complete ACIP packet (header + payload) from websocket_recv()
1487 const packet_header_t *header = (const packet_header_t *)queued_pkt->data;
1488 size_t total_len = queued_pkt->data_len;
1489 uint8_t *payload = (uint8_t *)header + sizeof(packet_header_t);
1490 size_t payload_len = 0;
1491
1492 log_info("DISPATCH_THREAD: Processing %zu byte packet for client %u", total_len, client_id);
1493
1494 if (total_len >= sizeof(packet_header_t)) {
1495 packet_type_t packet_type = (packet_type_t)NET_TO_HOST_U16(header->type);
1496 payload_len = NET_TO_HOST_U32(header->length);
1497
1498 log_info("🎯 DISPATCH_THREAD: Packet type=%d, payload_len=%u (will dispatch now)", packet_type, payload_len);
1499
1500 // Handle PACKET_TYPE_ENCRYPTED from WebSocket clients that encrypt at application layer
1501 // This mirrors the decryption logic in acip_server_receive_and_dispatch()
1502 if (packet_type == PACKET_TYPE_ENCRYPTED && client->transport && client->transport->crypto_ctx) {
1503 log_info("DISPATCH_THREAD: Decrypting PACKET_TYPE_ENCRYPTED for client %u", client_id);
1504
1505 uint8_t *ciphertext = payload;
1506 size_t ciphertext_len = payload_len;
1507
1508 // Decrypt to get inner plaintext packet (header + payload)
1509 size_t plaintext_size = ciphertext_len + 1024;
1510 uint8_t *plaintext = SAFE_MALLOC(plaintext_size, uint8_t *);
1511 if (!plaintext) {
1512 log_error("DISPATCH_THREAD: Failed to allocate plaintext buffer for decryption");
1513 packet_queue_free_packet(queued_pkt);
1514 continue;
1515 }
1516
1517 size_t plaintext_len;
1518 crypto_result_t crypto_result = crypto_decrypt(client->transport->crypto_ctx, ciphertext, ciphertext_len,
1519 plaintext, plaintext_size, &plaintext_len);
1520
1521 if (crypto_result != CRYPTO_OK) {
1522 log_error("DISPATCH_THREAD: Failed to decrypt packet: %s", crypto_result_to_string(crypto_result));
1523 SAFE_FREE(plaintext);
1524 packet_queue_free_packet(queued_pkt);
1525 continue;
1526 }
1527
1528 if (plaintext_len < sizeof(packet_header_t)) {
1529 log_error("DISPATCH_THREAD: Decrypted packet too small: %zu < %zu", plaintext_len, sizeof(packet_header_t));
1530 SAFE_FREE(plaintext);
1531 packet_queue_free_packet(queued_pkt);
1532 continue;
1533 }
1534
1535 // Parse the inner (decrypted) header
1536 const packet_header_t *inner_header = (const packet_header_t *)plaintext;
1537 packet_type = (packet_type_t)NET_TO_HOST_U16(inner_header->type);
1538 payload_len = NET_TO_HOST_U32(inner_header->length);
1539 payload = plaintext + sizeof(packet_header_t);
1540
1541 log_info("DISPATCH_THREAD: Decrypted inner packet type=%d, payload_len=%u", packet_type, payload_len);
1542
1543 // Dispatch the decrypted packet
1544 if (client->transport) {
1545 asciichat_error_t dispatch_result = acip_handle_server_packet(client->transport, packet_type, payload,
1546 payload_len, client, &g_acip_server_callbacks);
1547
1548 if (dispatch_result != ASCIICHAT_OK) {
1549 log_error("DISPATCH_THREAD: Handler failed for decrypted packet type=%d: %s", packet_type,
1550 asciichat_error_string(dispatch_result));
1551 }
1552 } else {
1553 log_error("DISPATCH_THREAD: Cannot dispatch decrypted packet - transport is NULL for client %u", client_id);
1554 }
1555
1556 // Free the decrypted buffer
1557 SAFE_FREE(plaintext);
1558 } else {
1559 // Not encrypted or no crypto context - dispatch as-is
1560 if (client->transport) {
1561 asciichat_error_t dispatch_result = acip_handle_server_packet(client->transport, packet_type, payload,
1562 payload_len, client, &g_acip_server_callbacks);
1563
1564 if (dispatch_result != ASCIICHAT_OK) {
1565 log_error("DISPATCH_THREAD: Handler failed for packet type=%d: %s", packet_type,
1566 asciichat_error_string(dispatch_result));
1567 }
1568 } else {
1569 log_error("DISPATCH_THREAD: Cannot dispatch packet - transport is NULL for client %u", client_id);
1570 }
1571 }
1572 }
1573
1574 // Free the queued packet
1575 packet_queue_free_packet(queued_pkt);
1576 }
1577
1578 log_info("DISPATCH_THREAD: Exiting for client %u", client_id);
1579 return NULL;
1580}
1581
1582void *client_receive_thread(void *arg) {
1583 // Log thread startup
1584 log_debug("RECV_THREAD: Thread function entered, arg=%p", arg);
1585
1586 client_info_t *client = (client_info_t *)arg;
1587
1588 // Validate client pointer immediately before any access.
1589 // This prevents crashes if remove_client() has zeroed the client struct
1590 // while the thread was still starting at RtlUserThreadStart.
1591 if (!client) {
1592 log_error("Invalid client info in receive thread (NULL pointer)");
1593 return NULL;
1594 }
1595
1596 // Save this thread's ID so remove_client() can detect self-joins
1597 client->receive_thread_id = asciichat_thread_self();
1598
1599 log_debug("RECV_THREAD_DEBUG: Thread started, client=%p, client_id=%u, is_tcp=%d", (void *)client,
1600 atomic_load(&client->client_id), client->is_tcp_client);
1601
1602 if (atomic_load(&client->protocol_disconnect_requested)) {
1603 log_debug("Receive thread for client %u exiting before start (protocol disconnect requested)",
1604 atomic_load(&client->client_id));
1605 return NULL;
1606 }
1607
1608 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1609 // This must be checked BEFORE accessing any client fields
1610 if (atomic_load(&client->client_id) == 0) {
1611 log_debug("Receive thread: client_id is 0, client struct may have been zeroed, exiting");
1612 return NULL;
1613 }
1614
1615 // Additional validation: check socket is valid
1616 // For TCP clients, validate socket. WebRTC clients use DataChannel (no socket)
1617 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1618 log_error("Invalid client socket in receive thread");
1619 return NULL;
1620 }
1621
1622 // Enable thread cancellation for clean shutdown
1623 // Thread cancellation not available in platform abstraction
1624 // Threads should exit when g_server_should_exit is set
1625
1626 log_debug("Started receive thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1627
1628 // Main receive loop - processes packets from transport
1629 // For TCP clients: receives from socket
1630 // For WebRTC clients: receives from transport ringbuffer (via ACDS signaling)
1631
1632 log_info("RECV_THREAD_LOOP_START: client_id=%u, is_tcp=%d, transport=%p, active=%d", atomic_load(&client->client_id),
1633 client->is_tcp_client, (void *)client->transport, atomic_load(&client->active));
1634
1635 while (!atomic_load(&g_server_should_exit) && atomic_load(&client->active)) {
1636 // For TCP clients, check socket validity
1637 // For WebRTC clients, continue even if no socket (transport handles everything)
1638 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1639 log_debug("TCP client %u has invalid socket, exiting receive thread", atomic_load(&client->client_id));
1640 break;
1641 }
1642
1643 // Check client_id is still valid before accessing transport.
1644 // This prevents accessing freed memory if remove_client() has zeroed the client struct.
1645 if (atomic_load(&client->client_id) == 0) {
1646 log_debug("Client client_id reset, exiting receive thread");
1647 break;
1648 }
1649
1650 // Receive packet (without dispatching) - decouple from dispatch for async processing
1651 // For WebRTC clients with async dispatch, we queue packets instead of processing immediately
1652 // This prevents backpressure on the network socket
1653
1654 if (client->is_tcp_client) {
1655 // TCP clients: use original synchronous dispatch
1656 asciichat_error_t acip_result =
1657 acip_server_receive_and_dispatch(client->transport, client, &g_acip_server_callbacks);
1658
1659 // Check if shutdown was requested during the network call
1660 if (atomic_load(&g_server_should_exit)) {
1661 log_debug("RECV_EXIT: Server shutdown requested, breaking loop");
1662 break;
1663 }
1664
1665 // Handle receive errors
1666 if (acip_result != ASCIICHAT_OK) {
1667 asciichat_error_context_t err_ctx;
1668 if (HAS_ERRNO(&err_ctx)) {
1669 log_error("🔴 ACIP error for client %u: code=%u msg=%s", client->client_id, err_ctx.code,
1670 err_ctx.context_message);
1671 if (err_ctx.code == ERROR_NETWORK) {
1672 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1673 break;
1674 } else if (err_ctx.code == ERROR_CRYPTO) {
1675 log_error_client(
1676 client, "SECURITY VIOLATION: Unencrypted packet when encryption required - terminating connection");
1677 atomic_store(&g_server_should_exit, true);
1678 break;
1679 }
1680 }
1681 log_warn("ACIP error for TCP client %u: %s (disconnecting)", client->client_id,
1682 asciichat_error_string(acip_result));
1683 break;
1684 }
1685 } else {
1686 // WebRTC/WebSocket clients: async dispatch - receive packet and queue for async processing
1687 void *packet_data = NULL;
1688 void *allocated_buffer = NULL;
1689 size_t packet_len = 0;
1690
1691 asciichat_error_t recv_result =
1692 client->transport->methods->recv(client->transport, &packet_data, &packet_len, &allocated_buffer);
1693
1694 if (recv_result != ASCIICHAT_OK) {
1695 asciichat_error_context_t err_ctx;
1696 if (HAS_ERRNO(&err_ctx)) {
1697 // Check for reassembly timeout (fragments arriving slowly)
1698 // This is NOT a connection failure - safe to retry
1699 if ((err_ctx.code == ERROR_NETWORK) && err_ctx.context_message &&
1700 strstr(err_ctx.context_message, "reassembly timeout")) {
1701 // Fragments are arriving slowly - this is normal, retry without disconnecting
1702 log_dev_every(100000, "Client %u: fragment reassembly timeout, retrying in 10ms", client->client_id);
1703 platform_sleep_ms(10); // Sleep 10ms to allow fragments to arrive
1704 continue; // Retry without disconnecting
1705 }
1706
1707 if (err_ctx.code == ERROR_NETWORK) {
1708 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1709 break;
1710 }
1711 }
1712 log_warn("Receive failed for WebRTC client %u: %s (disconnecting)", client->client_id,
1713 asciichat_error_string(recv_result));
1714 break;
1715 }
1716
1717 // Queue the received packet for async dispatch
1718 // This prevents the receive thread from blocking on dispatch
1719 log_info("RECV_THREAD: Queuing %zu byte packet for async dispatch (client %u)", packet_len, client->client_id);
1720
1721 // Extract packet type from the header to preserve it when queueing
1722 packet_type_t pkt_type = PACKET_TYPE_PROTOCOL_VERSION;
1723 if (packet_len >= sizeof(packet_header_t)) {
1724 const packet_header_t *pkt_header = (const packet_header_t *)allocated_buffer;
1725 pkt_type = (packet_type_t)NET_TO_HOST_U16(pkt_header->type);
1726 }
1727
1728 // Build a complete packet to queue (header + payload)
1729 // The entire buffer (allocated_buffer) contains the full packet
1730 // copy_data=false because we want to transfer ownership to the queue
1731 int enqueue_result = packet_queue_enqueue(client->received_packet_queue, pkt_type, allocated_buffer, packet_len,
1732 atomic_load(&client->client_id), false);
1733
1734 if (enqueue_result < 0) {
1735 log_warn("Failed to queue received packet for client %u (queue full?) - packet dropped", client->client_id);
1736 if (allocated_buffer) {
1737 buffer_pool_free(NULL, allocated_buffer, packet_len);
1738 }
1739 }
1740 }
1741 }
1742
1743 // Mark client as inactive and stop all threads
1744 // Must stop render threads when client disconnects.
1745 // OPTIMIZED: Use atomic operations for thread control flags (lock-free)
1746 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1747 log_debug("Setting active=false in receive_thread_fn (client_id=%u, exiting receive loop)", client_id_snapshot);
1748 atomic_store(&client->active, false);
1749 atomic_store(&client->send_thread_running, false);
1750 atomic_store(&client->video_render_thread_running, false);
1751 atomic_store(&client->audio_render_thread_running, false);
1752
1753 // Call remove_client() to trigger cleanup
1754 // Safe to call from receive thread now: remove_client() detects self-join via thread IDs
1755 // and skips the receive thread join when called from the receive thread itself
1756 log_debug("Receive thread for client %u calling remove_client() for cleanup", client_id_snapshot);
1757 server_context_t *server_ctx = (server_context_t *)client->server_ctx;
1758 if (server_ctx) {
1759 if (remove_client(server_ctx, client_id_snapshot) != 0) {
1760 log_warn("Failed to remove client %u from receive thread cleanup", client_id_snapshot);
1761 }
1762 } else {
1763 log_error("Receive thread for client %u: server_ctx is NULL, cannot call remove_client()", client_id_snapshot);
1764 }
1765
1766 log_debug("Receive thread for client %u terminated", client_id_snapshot);
1767
1768 // Clean up thread-local error context before exit
1770
1771 return NULL;
1772}
1773
1774// Thread function to handle sending data to a specific client
1775void *client_send_thread_func(void *arg) {
1776 client_info_t *client = (client_info_t *)arg;
1777
1778 // Validate client pointer immediately before any access.
1779 // This prevents crashes if remove_client() has zeroed the client struct
1780 // while the thread was still starting at RtlUserThreadStart.
1781 if (!client) {
1782 log_error("Invalid client info in send thread (NULL pointer)");
1783 return NULL;
1784 }
1785
1786 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1787 // This must be checked BEFORE accessing any client fields
1788 if (atomic_load(&client->client_id) == 0) {
1789 log_debug_every(100 * US_PER_MS_INT, "Send thread: client_id is 0, client struct may have been zeroed, exiting");
1790 return NULL;
1791 }
1792
1793 // Additional validation: check socket OR transport is valid
1794 // For TCP clients: socket is valid
1795 // For WebRTC clients: socket is INVALID_SOCKET_VALUE but transport is valid
1796 mutex_lock(&client->send_mutex);
1797 bool has_socket = (client->socket != INVALID_SOCKET_VALUE);
1798 bool has_transport = (client->transport != NULL);
1799 mutex_unlock(&client->send_mutex);
1800
1801 log_info("SEND_THREAD_VALIDATION: client_id=%u socket_valid=%d transport_valid=%d transport_ptr=%p",
1802 atomic_load(&client->client_id), has_socket, has_transport, (void *)client->transport);
1803
1804 if (!has_socket && !has_transport) {
1805 log_error("Invalid client connection in send thread (no socket or transport)");
1806 return NULL;
1807 }
1808
1809 log_info("Started send thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1810
1811 // Mark thread as running
1812 atomic_store(&client->send_thread_running, true);
1813
1814 log_info("SEND_THREAD_LOOP_START: client_id=%u active=%d shutting_down=%d running=%d",
1815 atomic_load(&client->client_id), atomic_load(&client->active), atomic_load(&client->shutting_down),
1816 atomic_load(&client->send_thread_running));
1817
1818 // Track timing for video frame sends
1819 uint64_t last_video_send_time = 0;
1820 const uint64_t video_send_interval_us = 16666; // 60fps = ~16.67ms
1821
1822 // High-frequency audio loop - separate from video frame loop
1823 // to ensure audio packets are sent immediately, not rate-limited by video
1824#define MAX_AUDIO_BATCH 8
1825 int loop_iteration_count = 0;
1826 while (!atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down) && atomic_load(&client->active) &&
1827 atomic_load(&client->send_thread_running)) {
1828 loop_iteration_count++;
1829 bool sent_something = false;
1830 uint64_t loop_start_ns = time_get_ns();
1831 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] START: client=%u", loop_iteration_count,
1832 atomic_load(&client->client_id));
1833
1834 // PRIORITY: Drain all queued audio packets before video
1835 // Audio must not be rate-limited by video frame sending (16.67ms)
1836 queued_packet_t *audio_packets[MAX_AUDIO_BATCH];
1837 int audio_packet_count = 0;
1838
1839 if (client->audio_queue) {
1840 // Try to dequeue multiple audio packets
1841 for (int i = 0; i < MAX_AUDIO_BATCH; i++) {
1842 audio_packets[i] = packet_queue_try_dequeue(client->audio_queue);
1843 if (audio_packets[i]) {
1844 audio_packet_count++;
1845 } else {
1846 break; // No more packets available
1847 }
1848 }
1849 if (audio_packet_count > 0) {
1850 log_dev_every(4500 * US_PER_MS_INT, "SEND_AUDIO: client=%u dequeued=%d packets",
1851 atomic_load(&client->client_id), audio_packet_count);
1852 }
1853 } else {
1854 log_warn("Send thread: audio_queue is NULL for client %u", atomic_load(&client->client_id));
1855 }
1856
1857 // Send batched audio if we have packets
1858 if (audio_packet_count > 0) {
1859 // Protect crypto field access with mutex
1860 mutex_lock(&client->client_state_mutex);
1861 bool crypto_ready = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1862 crypto_handshake_is_ready(&client->crypto_handshake_ctx);
1863 mutex_unlock(&client->client_state_mutex);
1864 (void)crypto_ready; // Currently unused - kept for potential future encryption support
1865
1866 asciichat_error_t result = ASCIICHAT_OK;
1867
1868 if (audio_packet_count == 1) {
1869 // Single packet - send directly for low latency using ACIP transport
1870 packet_type_t pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1871
1872 // Get transport reference while holding mutex briefly (prevents deadlock on TCP buffer full)
1873 mutex_lock(&client->send_mutex);
1874 if (atomic_load(&client->shutting_down) || !client->transport) {
1875 mutex_unlock(&client->send_mutex);
1876 log_warn("BREAK_AUDIO_SINGLE: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
1877 atomic_load(&client->shutting_down), (void *)client->transport);
1878 break; // Client is shutting down, exit thread
1879 }
1880 acip_transport_t *transport = client->transport;
1881 mutex_unlock(&client->send_mutex);
1882
1883 // Network I/O happens OUTSIDE the mutex
1884 result = packet_send_via_transport(transport, pkt_type, audio_packets[0]->data, audio_packets[0]->data_len,
1885 atomic_load(&client->client_id));
1886 if (result != ASCIICHAT_OK) {
1887 log_error("AUDIO SEND FAIL: client=%u, len=%zu, result=%d", client->client_id, audio_packets[0]->data_len,
1888 result);
1889 }
1890 } else {
1891 // Multiple packets - batch them together and send via transport (works for all client types)
1892 packet_type_t first_pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1893
1894 // Get transport reference
1895 mutex_lock(&client->send_mutex);
1896 if (atomic_load(&client->shutting_down) || !client->transport) {
1897 mutex_unlock(&client->send_mutex);
1898 log_warn("BREAK_AUDIO_BATCH: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
1899 atomic_load(&client->shutting_down), (void *)client->transport);
1900 result = ERROR_NETWORK;
1901 } else {
1902 acip_transport_t *transport = client->transport;
1903 mutex_unlock(&client->send_mutex);
1904
1905 if (first_pkt_type == PACKET_TYPE_AUDIO_OPUS_BATCH) {
1906 // Opus packets - batch and send via transport
1907 size_t total_opus_size = 0;
1908 for (int i = 0; i < audio_packet_count; i++) {
1909 total_opus_size += audio_packets[i]->data_len;
1910 }
1911
1912 uint8_t *batched_opus = SAFE_MALLOC(total_opus_size, uint8_t *);
1913 uint16_t *frame_sizes = SAFE_MALLOC((size_t)audio_packet_count * sizeof(uint16_t), uint16_t *);
1914
1915 if (batched_opus && frame_sizes) {
1916 size_t offset = 0;
1917 for (int i = 0; i < audio_packet_count; i++) {
1918 frame_sizes[i] = (uint16_t)audio_packets[i]->data_len;
1919 memcpy(batched_opus + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1920 offset += audio_packets[i]->data_len;
1921 }
1922 result = acip_send_audio_opus_batch(transport, batched_opus, total_opus_size, frame_sizes,
1923 (uint32_t)audio_packet_count, AUDIO_SAMPLE_RATE, 20);
1924 if (result != ASCIICHAT_OK) {
1925 log_error("AUDIO SEND FAIL (opus batch): client=%u, frames=%d, total_size=%zu, result=%d",
1926 atomic_load(&client->client_id), audio_packet_count, total_opus_size, result);
1927 }
1928 } else {
1929 log_error("Failed to allocate buffer for Opus batch");
1930 result = ERROR_MEMORY;
1931 }
1932 SAFE_FREE(batched_opus);
1933 SAFE_FREE(frame_sizes);
1934 } else {
1935 // Raw float audio - batch and send via transport
1936 size_t total_samples = 0;
1937 for (int i = 0; i < audio_packet_count; i++) {
1938 total_samples += audio_packets[i]->data_len / sizeof(float);
1939 }
1940
1941 float *batched_audio = SAFE_MALLOC(total_samples * sizeof(float), float *);
1942 if (batched_audio) {
1943 size_t offset = 0;
1944 for (int i = 0; i < audio_packet_count; i++) {
1945 size_t packet_samples = audio_packets[i]->data_len / sizeof(float);
1946 memcpy(batched_audio + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1947 offset += packet_samples;
1948 }
1949 result = acip_send_audio_batch(transport, batched_audio, (uint32_t)total_samples, AUDIO_SAMPLE_RATE);
1950 if (result != ASCIICHAT_OK) {
1951 log_error("AUDIO SEND FAIL (raw batch): client=%u, packets=%d, samples=%zu, result=%d",
1952 atomic_load(&client->client_id), audio_packet_count, total_samples, result);
1953 }
1954 } else {
1955 log_error("Failed to allocate buffer for audio batch");
1956 result = ERROR_MEMORY;
1957 }
1958 SAFE_FREE(batched_audio);
1959 }
1960 }
1961 }
1962
1963 // Free all audio packets
1964 for (int i = 0; i < audio_packet_count; i++) {
1965 packet_queue_free_packet(audio_packets[i]);
1966 }
1967
1968 if (result != ASCIICHAT_OK) {
1969 if (!atomic_load(&g_server_should_exit)) {
1970 log_error("Failed to send audio to client %u: %s", client->client_id, asciichat_error_string(result));
1971 }
1972 log_warn("SKIP_AUDIO_ERROR: client_id=%u result=%d (continuing to send video)", atomic_load(&client->client_id),
1973 result);
1974 // Continue sending video even if audio fails - audio is optional for browser clients
1975 }
1976
1977 sent_something = true;
1978 uint64_t audio_done_ns = time_get_ns();
1979 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] AUDIO_SENT: took %.2fms", loop_iteration_count,
1980 (audio_done_ns - loop_start_ns) / 1e6);
1981
1982 // Small sleep to let more audio packets queue (helps batching efficiency)
1983 if (audio_packet_count > 0) {
1984 platform_sleep_us(100); // 0.1ms - minimal delay
1985 }
1986 } else {
1987 // No audio packets - brief sleep to avoid busy-looping, then check for other tasks
1988 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] NO_AUDIO: sleeping 1ms", loop_iteration_count);
1989 platform_sleep_us(1 * US_PER_MS_INT); // 1ms - enough for audio render thread to queue more packets
1990
1991 // Check if session rekeying should be triggered
1992 mutex_lock(&client->client_state_mutex);
1993 bool should_rekey = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1994 crypto_handshake_is_ready(&client->crypto_handshake_ctx) &&
1995 crypto_handshake_should_rekey(&client->crypto_handshake_ctx);
1996 mutex_unlock(&client->client_state_mutex);
1997
1998 if (should_rekey) {
1999 log_debug("Rekey threshold reached for client %u, initiating session rekey", client->client_id);
2000 mutex_lock(&client->client_state_mutex);
2001 // Get socket reference briefly to avoid deadlock on TCP buffer full
2002 mutex_lock(&client->send_mutex);
2003 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
2004 mutex_unlock(&client->send_mutex);
2005 mutex_unlock(&client->client_state_mutex);
2006 log_warn("BREAK_REKEY: client_id=%u shutting_down=%d socket=%d", atomic_load(&client->client_id),
2007 atomic_load(&client->shutting_down), (int)client->socket);
2008 break; // Client is shutting down, exit thread
2009 }
2010 socket_t rekey_socket = client->socket;
2011 mutex_unlock(&client->send_mutex);
2012
2013 // Network I/O happens OUTSIDE the send_mutex (client_state_mutex still held for crypto state)
2014 asciichat_error_t result = crypto_handshake_rekey_request(&client->crypto_handshake_ctx, rekey_socket);
2015 mutex_unlock(&client->client_state_mutex);
2016
2017 if (result != ASCIICHAT_OK) {
2018 log_error("Failed to send REKEY_REQUEST to client %u: %d", client->client_id, result);
2019 } else {
2020 log_debug("Sent REKEY_REQUEST to client %u", client->client_id);
2021 // Notify client that session rekeying has been initiated (old keys still active)
2022 log_info_client(client, "Session rekey initiated - rotating encryption keys");
2023 }
2024 }
2025 }
2026
2027 // Always consume frames from the buffer to prevent accumulation
2028 // Rate-limit the actual sending, but always mark frames as consumed
2029 uint64_t video_check_ns = time_get_ns();
2030 if (!client->outgoing_video_buffer) {
2031 // Buffer has been destroyed (client is shutting down).
2032 // Exit cleanly instead of looping forever trying to access freed memory.
2033 log_warn("⚠️ Send thread exiting: outgoing_video_buffer is NULL for client %u (client shutting down?)",
2034 client->client_id);
2035 break;
2036 }
2037
2038 // Get latest frame from double buffer (lock-free operation)
2039 // This marks the frame as consumed even if we don't send it yet
2040 const video_frame_t *frame = video_frame_get_latest(client->outgoing_video_buffer);
2041 uint64_t frame_get_ns = time_get_ns();
2042 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] VIDEO_GET_FRAME: took %.3fms, frame=%p", loop_iteration_count,
2043 (frame_get_ns - video_check_ns) / 1e6, (void *)frame);
2044 log_dev_every(4500 * US_PER_MS_INT, "Send thread: video_frame_get_latest returned %p for client %u", (void *)frame,
2045 client->client_id);
2046
2047 // Check if get_latest failed (buffer might have been destroyed)
2048 if (!frame) {
2049 log_warn("⚠️ Send thread exiting: video_frame_get_latest returned NULL for client %u (buffer destroyed?)",
2050 client->client_id);
2051 break; // Exit thread if buffer is invalid
2052 }
2053
2054 // Check if it's time to send a video frame (60fps rate limiting)
2055 // Only rate-limit the SEND operation, not frame consumption
2056 uint64_t current_time_ns = time_get_ns();
2057 uint64_t current_time_us = time_ns_to_us(current_time_ns);
2058 uint64_t time_since_last_send_us = current_time_us - last_video_send_time;
2059 log_dev_every(4500 * US_PER_MS_INT,
2060 "Send thread timing check: time_since_last=%llu us, interval=%llu us, should_send=%d",
2061 (unsigned long long)time_since_last_send_us, (unsigned long long)video_send_interval_us,
2062 (time_since_last_send_us >= video_send_interval_us));
2063
2064 if (current_time_us - last_video_send_time >= video_send_interval_us) {
2065 log_info_every(5000 * US_PER_MS_INT, "✓ SEND_TIME_READY: client_id=%u time_since=%llu interval=%llu",
2066 atomic_load(&client->client_id), (unsigned long long)time_since_last_send_us,
2067 (unsigned long long)video_send_interval_us);
2068 uint64_t frame_start_ns = time_get_ns();
2069
2070 // GRID LAYOUT CHANGE: Check if render thread has buffered a frame with different source count
2071 // If so, send CLEAR_CONSOLE before sending the new frame
2072 int rendered_sources = atomic_load(&client->last_rendered_grid_sources);
2073 int sent_sources = atomic_load(&client->last_sent_grid_sources);
2074
2075 if (rendered_sources != sent_sources && rendered_sources > 0) {
2076 // Grid layout changed! Send CLEAR_CONSOLE before next frame using ACIP transport
2077 // Get transport reference briefly to avoid deadlock on TCP buffer full
2078 mutex_lock(&client->send_mutex);
2079 if (atomic_load(&client->shutting_down) || !client->transport) {
2080 mutex_unlock(&client->send_mutex);
2081 log_warn("BREAK_CLEAR_CONSOLE: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
2082 atomic_load(&client->shutting_down), (void *)client->transport);
2083 break; // Client is shutting down, exit thread
2084 }
2085 acip_transport_t *clear_transport = client->transport;
2086 mutex_unlock(&client->send_mutex);
2087
2088 // Network I/O happens OUTSIDE the mutex
2089 acip_send_clear_console(clear_transport);
2090 log_debug_every(LOG_RATE_FAST, "Client %u: Sent CLEAR_CONSOLE (grid changed %d → %d sources)",
2091 client->client_id, sent_sources, rendered_sources);
2092 atomic_store(&client->last_sent_grid_sources, rendered_sources);
2093 sent_something = true;
2094 }
2095
2096 log_dev_every(4500 * US_PER_MS_INT, "Send thread: frame validation - frame=%p, frame->data=%p, frame->size=%zu",
2097 (void *)frame, (void *)frame->data, frame->size);
2098
2099 if (!frame->data) {
2100 log_dev("✗ SKIP_NO_DATA: client_id=%u frame=%p data=%p",
2101 atomic_load(&client->client_id), (void *)frame, (void *)frame->data);
2102 continue;
2103 }
2104 log_dev("✓ FRAME_DATA_OK: client_id=%u data=%p", atomic_load(&client->client_id),
2105 (void *)frame->data);
2106
2107 if (frame->data && frame->size == 0) {
2108 log_dev("✗ SKIP_ZERO_SIZE: client_id=%u size=%zu", atomic_load(&client->client_id),
2109 frame->size);
2110 platform_sleep_us(1 * US_PER_MS_INT); // 1ms sleep
2111 continue;
2112 }
2113 log_dev("✓ FRAME_SIZE_OK: client_id=%u size=%zu", atomic_load(&client->client_id),
2114 frame->size);
2115
2116 // Snapshot frame metadata (safe with double-buffer system)
2117 const char *frame_data = (const char *)frame->data; // Pointer snapshot - data is stable in front buffer
2118 size_t frame_size = frame->size; // Size snapshot - prevent race condition with render thread
2119 uint32_t width = atomic_load(&client->width);
2120 uint32_t height = atomic_load(&client->height);
2121 uint64_t step1_ns = time_get_ns();
2122 uint64_t step2_ns = time_get_ns();
2123 uint64_t step3_ns = time_get_ns();
2124 uint64_t step4_ns = time_get_ns();
2125
2126 // Check if crypto handshake is complete before sending (prevents sending to unauthenticated clients)
2127 mutex_lock(&client->client_state_mutex);
2128 bool crypto_ready = GET_OPTION(no_encrypt) ||
2129 (client->crypto_initialized && crypto_handshake_is_ready(&client->crypto_handshake_ctx));
2130 mutex_unlock(&client->client_state_mutex);
2131
2132 if (!crypto_ready) {
2133 log_dev("⚠️ SKIP_SEND_CRYPTO: client_id=%u crypto_initialized=%d no_encrypt=%d",
2134 atomic_load(&client->client_id), client->crypto_initialized, GET_OPTION(no_encrypt));
2135 continue; // Skip this frame, will try again on next loop iteration
2136 }
2137 log_dev("✓ CRYPTO_READY: client_id=%u about to send frame",
2138 atomic_load(&client->client_id));
2139
2140 // Get transport reference briefly to avoid deadlock on TCP buffer full
2141 // ACIP transport handles header building, CRC32, encryption internally
2142 log_dev_every(4500 * US_PER_MS_INT,
2143 "Send thread: About to send frame to client %u (width=%u, height=%u, size=%zu, data=%p)",
2144 client->client_id, width, height, frame_size, (void *)frame_data);
2145
2146 // Log first 32 bytes of frame data to verify we can access it
2147 if (frame_data && frame_size > 0) {
2148 log_info_every(5000 * US_PER_MS_INT,
2149 "FRAME_DATA_HEX: client=%u first_bytes=[%02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x "
2150 "%02x %02x %02x %02x %02x]",
2151 atomic_load(&client->client_id), ((uint8_t *)frame_data)[0], ((uint8_t *)frame_data)[1],
2152 ((uint8_t *)frame_data)[2], ((uint8_t *)frame_data)[3], ((uint8_t *)frame_data)[4],
2153 ((uint8_t *)frame_data)[5], ((uint8_t *)frame_data)[6], ((uint8_t *)frame_data)[7],
2154 ((uint8_t *)frame_data)[8], ((uint8_t *)frame_data)[9], ((uint8_t *)frame_data)[10],
2155 ((uint8_t *)frame_data)[11], ((uint8_t *)frame_data)[12], ((uint8_t *)frame_data)[13],
2156 ((uint8_t *)frame_data)[14], ((uint8_t *)frame_data)[15]);
2157 }
2158
2159 mutex_lock(&client->send_mutex);
2160 if (atomic_load(&client->shutting_down) || !client->transport) {
2161 mutex_unlock(&client->send_mutex);
2162 log_warn("BREAK_FRAME_SEND: client_id=%u shutting_down=%d transport=%p loop_iter=%d",
2163 atomic_load(&client->client_id), atomic_load(&client->shutting_down), (void *)client->transport,
2164 loop_iteration_count);
2165 break; // Client is shutting down, exit thread
2166 }
2167 acip_transport_t *frame_transport = client->transport;
2168 mutex_unlock(&client->send_mutex);
2169
2170 // Network I/O happens OUTSIDE the mutex
2171 log_dev_every(4500 * US_PER_MS_INT, "SEND_ASCII_FRAME: client_id=%u size=%zu width=%u height=%u",
2172 atomic_load(&client->client_id), frame_size, width, height);
2173 uint64_t send_start_ns = time_get_ns();
2174 log_dev("[SEND_LOOP_%d] FRAME_SEND_START: size=%zu", loop_iteration_count,
2175 frame_size);
2176 asciichat_error_t send_result = acip_send_ascii_frame(frame_transport, frame_data, frame_size, width, height,
2177 atomic_load(&client->client_id));
2178 uint64_t send_end_ns = time_get_ns();
2179 uint64_t send_ms = (send_end_ns - send_start_ns) / 1e6;
2180 log_dev("[SEND_LOOP_%d] FRAME_SEND_END: took %.2fms, result=%d",
2181 loop_iteration_count, send_ms, send_result);
2182 uint64_t step5_ns = time_get_ns();
2183
2184 if (send_result != ASCIICHAT_OK) {
2185 if (!atomic_load(&g_server_should_exit)) {
2186 SET_ERRNO(ERROR_NETWORK, "Failed to send video frame to client %u: %s", client->client_id,
2187 asciichat_error_string(send_result));
2188 }
2189 log_error("SEND_FRAME_FAILED: client_id=%u result=%d message=%s loop_iter=%d", atomic_load(&client->client_id),
2190 send_result, asciichat_error_string(send_result), loop_iteration_count);
2191 log_warn("BREAK_SEND_ERROR: client_id=%u (frame send failed)", atomic_load(&client->client_id));
2192 break;
2193 }
2194
2195 log_dev_every(4500 * US_PER_MS_INT, "SEND_FRAME_SUCCESS: client_id=%u size=%zu", atomic_load(&client->client_id),
2196 frame_size);
2197
2198 // Increment frame counter and log
2199 unsigned long frame_count = atomic_fetch_add(&client->frames_sent_count, 1) + 1;
2200 log_info("🎬 FRAME_SENT: client_id=%u frame_num=%lu size=%zu",
2201 atomic_load(&client->client_id), frame_count, frame_size);
2202
2203 sent_something = true;
2204 last_video_send_time = current_time_us;
2205
2206 uint64_t frame_end_ns = time_get_ns();
2207 uint64_t frame_time_us = time_ns_to_us(time_elapsed_ns(frame_start_ns, frame_end_ns));
2208 if (frame_time_us > 15 * US_PER_MS_INT) { // Log if sending a frame takes > 15ms (encryption adds ~5-6ms)
2209 uint64_t step1_us = time_ns_to_us(time_elapsed_ns(frame_start_ns, step1_ns));
2210 uint64_t step2_us = time_ns_to_us(time_elapsed_ns(step1_ns, step2_ns));
2211 uint64_t step3_us = time_ns_to_us(time_elapsed_ns(step2_ns, step3_ns));
2212 uint64_t step4_us = time_ns_to_us(time_elapsed_ns(step3_ns, step4_ns));
2213 uint64_t step5_us = time_ns_to_us(time_elapsed_ns(step4_ns, step5_ns));
2214 log_warn_every(
2215 LOG_RATE_DEFAULT,
2216 "SEND_THREAD: Frame send took %.2fms for client %u | Snapshot: %.2fms | Memcpy: %.2fms | CRC32: %.2fms | "
2217 "Header: %.2fms | send_packet_secure: %.2fms",
2218 frame_time_us / 1000.0, client->client_id, step1_us / 1000.0, step2_us / 1000.0, step3_us / 1000.0,
2219 step4_us / 1000.0, step5_us / 1000.0);
2220 }
2221 }
2222
2223 // If we didn't send anything, sleep briefly to prevent busy waiting
2224 if (!sent_something) {
2225 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] IDLE_SLEEP: nothing sent", loop_iteration_count);
2226 platform_sleep_us(1 * US_PER_MS_INT); // 1ms sleep
2227 }
2228 uint64_t loop_end_ns = time_get_ns();
2229 uint64_t loop_ms = (loop_end_ns - loop_start_ns) / 1e6;
2230 if (loop_ms > 10) {
2231 log_warn("[SEND_LOOP_%d] SLOW_ITERATION: took %.2fms (client=%u)", loop_iteration_count, loop_ms,
2232 atomic_load(&client->client_id));
2233 }
2234 }
2235
2236 // Log why the send thread exited
2237 log_warn("Send thread exit conditions - client_id=%u g_server_should_exit=%d shutting_down=%d active=%d "
2238 "send_thread_running=%d",
2239 atomic_load(&client->client_id), atomic_load(&g_server_should_exit), atomic_load(&client->shutting_down),
2240 atomic_load(&client->active), atomic_load(&client->send_thread_running));
2241
2242 // Mark thread as stopped
2243 atomic_store(&client->send_thread_running, false);
2244 log_debug("Send thread for client %u terminated", client->client_id);
2245
2246 // Clean up thread-local error context before exit
2248
2249 return NULL;
2250}
2251
2252/* ============================================================================
2253 * Broadcast Functions
2254 * ============================================================================
2255 */
2256
2257// Broadcast server state to all connected clients
2259 // SNAPSHOT PATTERN: Collect client data while holding lock, then release before network I/O
2260 typedef struct {
2261 socket_t socket;
2262 uint32_t client_id;
2263 const crypto_context_t *crypto_ctx;
2264 } client_snapshot_t;
2265
2266 client_snapshot_t client_snapshots[MAX_CLIENTS];
2267 int snapshot_count = 0;
2268 int active_video_count = 0;
2269
2270 uint64_t lock_start_ns = time_get_ns();
2271 rwlock_rdlock(&g_client_manager_rwlock);
2272 uint64_t lock_end_ns = time_get_ns();
2273 uint64_t lock_time_ns = time_elapsed_ns(lock_start_ns, lock_end_ns);
2274 if (lock_time_ns > 1 * NS_PER_MS_INT) {
2275 char duration_str[32];
2276 format_duration_ns((double)lock_time_ns, duration_str, sizeof(duration_str));
2277 log_warn("broadcast_server_state: rwlock_rdlock took %s", duration_str);
2278 }
2279
2280 // Count active clients and snapshot client data while holding lock
2281 // Use atomic_load for all atomic fields to prevent data races.
2282 for (int i = 0; i < MAX_CLIENTS; i++) {
2283 bool is_active = atomic_load(&g_client_manager.clients[i].active);
2284 if (is_active && atomic_load(&g_client_manager.clients[i].is_sending_video)) {
2285 active_video_count++;
2286 }
2287 if (is_active && g_client_manager.clients[i].socket != INVALID_SOCKET_VALUE) {
2288 // Skip clients that haven't completed crypto handshake yet
2289 // Check both crypto_initialized AND crypto context (defense in depth)
2290 if (!GET_OPTION(no_encrypt) && !g_client_manager.clients[i].crypto_initialized) {
2291 log_debug("Skipping server_state broadcast to client %u: crypto handshake not complete",
2292 atomic_load(&g_client_manager.clients[i].client_id));
2293 continue;
2294 }
2295
2296 // Get crypto context (may be NULL if --no-encrypt)
2297 const crypto_context_t *crypto_ctx =
2298 crypto_handshake_get_context(&g_client_manager.clients[i].crypto_handshake_ctx);
2299 if (!GET_OPTION(no_encrypt) && !crypto_ctx) {
2300 // Skip clients without valid crypto context
2301 log_debug("Skipping server_state broadcast to client %u: no crypto context",
2302 atomic_load(&g_client_manager.clients[i].client_id));
2303 continue;
2304 }
2305
2306 client_snapshots[snapshot_count].socket = g_client_manager.clients[i].socket;
2307 client_snapshots[snapshot_count].client_id = atomic_load(&g_client_manager.clients[i].client_id);
2308 client_snapshots[snapshot_count].crypto_ctx = crypto_ctx;
2309 snapshot_count++;
2310 }
2311 }
2312
2313 // Prepare server state packet while still holding lock (fast operation)
2314 server_state_packet_t state;
2315 state.connected_client_count = g_client_manager.client_count;
2316 state.active_client_count = active_video_count;
2317 memset(state.reserved, 0, sizeof(state.reserved));
2318
2319 // Convert to network byte order
2320 server_state_packet_t net_state;
2321 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
2322 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
2323 memset(net_state.reserved, 0, sizeof(net_state.reserved));
2324
2325 // Release lock BEFORE sending (snapshot pattern)
2326 // Sending while holding lock blocks all client operations
2327 uint64_t lock_held_final_ns = time_get_ns();
2328 uint64_t lock_held_ns = time_elapsed_ns(lock_start_ns, lock_held_final_ns);
2329 rwlock_rdunlock(&g_client_manager_rwlock);
2330
2331 // Send to all clients AFTER releasing the lock
2332 // This prevents blocking other threads during network I/O
2333 for (int i = 0; i < snapshot_count; i++) {
2334 log_debug_every(5000 * US_PER_MS_INT,
2335 "BROADCAST_DEBUG: Sending SERVER_STATE to client %u (socket %d) with crypto_ctx=%p",
2336 client_snapshots[i].client_id, client_snapshots[i].socket, (void *)client_snapshots[i].crypto_ctx);
2337
2338 // Protect socket write with per-client send_mutex.
2339 client_info_t *target = find_client_by_id(client_snapshots[i].client_id);
2340 if (target) {
2341 // IMPORTANT: Verify client_id matches expected value - prevents use-after-free
2342 // if client was removed and replaced with another client in same slot
2343 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
2344 log_warn("Client %u ID mismatch during broadcast (found %u), skipping send", client_snapshots[i].client_id,
2345 atomic_load(&target->client_id));
2346 continue;
2347 }
2348
2349 mutex_lock(&target->send_mutex);
2350
2351 // Double-check client_id again after acquiring mutex (stronger protection)
2352 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
2353 mutex_unlock(&target->send_mutex);
2354 log_warn("Client %u was removed during broadcast send (now %u), skipping", client_snapshots[i].client_id,
2355 atomic_load(&target->client_id));
2356 continue;
2357 }
2358
2359 // Send via ACIP transport
2360 asciichat_error_t result = acip_send_server_state(target->transport, &net_state);
2361 mutex_unlock(&target->send_mutex);
2362
2363 if (result != ASCIICHAT_OK) {
2364 log_error("Failed to send server state to client %u: %s", client_snapshots[i].client_id,
2365 asciichat_error_string(result));
2366 } else {
2367 log_debug_every(5000 * US_PER_MS_INT, "Sent server state to client %u: %u connected, %u active",
2368 client_snapshots[i].client_id, state.connected_client_count, state.active_client_count);
2369 }
2370 } else {
2371 log_warn("Client %u removed before broadcast send could complete", client_snapshots[i].client_id);
2372 }
2373 }
2374
2375 if (lock_held_ns > 1 * NS_PER_MS_INT) {
2376 char duration_str[32];
2377 format_duration_ns((double)lock_held_ns, duration_str, sizeof(duration_str));
2378 log_warn("broadcast_server_state: rwlock held for %s (includes network I/O)", duration_str);
2379 }
2380}
2381
2382/* ============================================================================
2383 * Helper Functions
2384 * ============================================================================
2385 */
2386
2397int start_webrtc_client_threads(server_context_t *server_ctx, uint32_t client_id) {
2398 if (!server_ctx) {
2399 SET_ERRNO(ERROR_INVALID_PARAM, "Server context is NULL");
2400 return -1;
2401 }
2402
2403 client_info_t *client = find_client_by_id(client_id);
2404 if (!client) {
2405 SET_ERRNO(ERROR_NOT_FOUND, "Client %u not found", client_id);
2406 return -1;
2407 }
2408
2409 log_debug("Starting threads for WebRTC client %u...", client_id);
2410 return start_client_threads(server_ctx, client, false);
2411}
2412
2413void stop_client_threads(client_info_t *client) {
2414 if (!client) {
2415 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
2416 return;
2417 }
2418
2419 // Signal threads to stop
2420 log_debug("Setting active=false in stop_client_threads (client_id=%u)", atomic_load(&client->client_id));
2421 atomic_store(&client->active, false);
2422 atomic_store(&client->send_thread_running, false);
2423
2424 // Wait for threads to finish
2425 if (asciichat_thread_is_initialized(&client->send_thread)) {
2426 asciichat_thread_join(&client->send_thread, NULL);
2427 }
2428 if (asciichat_thread_is_initialized(&client->receive_thread)) {
2429 asciichat_thread_join(&client->receive_thread, NULL);
2430 }
2431 // For async dispatch: stop dispatch thread if running
2432 if (asciichat_thread_is_initialized(&client->dispatch_thread)) {
2433 atomic_store(&client->dispatch_thread_running, false);
2434 asciichat_thread_join(&client->dispatch_thread, NULL);
2435 }
2436}
2437
2438void cleanup_client_media_buffers(client_info_t *client) {
2439 if (!client) {
2440 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
2441 return;
2442 }
2443
2444 if (client->incoming_video_buffer) {
2445 video_frame_buffer_destroy(client->incoming_video_buffer);
2446 client->incoming_video_buffer = NULL;
2447 }
2448
2449 // Clean up outgoing video buffer (for ASCII frames)
2450 if (client->outgoing_video_buffer) {
2451 video_frame_buffer_destroy(client->outgoing_video_buffer);
2452 client->outgoing_video_buffer = NULL;
2453 }
2454
2455 // Clean up pre-allocated send buffer
2456 if (client->send_buffer) {
2457 SAFE_FREE(client->send_buffer);
2458 client->send_buffer = NULL;
2459 client->send_buffer_size = 0;
2460 }
2461
2462 if (client->incoming_audio_buffer) {
2463 audio_ring_buffer_destroy(client->incoming_audio_buffer);
2464 client->incoming_audio_buffer = NULL;
2465 }
2466
2467 // Clean up Opus decoder
2468 if (client->opus_decoder) {
2469 opus_codec_destroy((opus_codec_t *)client->opus_decoder);
2470 client->opus_decoder = NULL;
2471 }
2472}
2473
2474void cleanup_client_packet_queues(client_info_t *client) {
2475 if (!client)
2476 return;
2477
2478 if (client->audio_queue) {
2479 packet_queue_destroy(client->audio_queue);
2480 client->audio_queue = NULL;
2481 }
2482
2483 // Async dispatch: clean up received packet queue
2484 if (client->received_packet_queue) {
2485 packet_queue_destroy(client->received_packet_queue);
2486 client->received_packet_queue = NULL;
2487 }
2488
2489 // Video now uses double buffer, cleaned up in cleanup_client_media_buffers
2490}
2491
// NOTE(review): this listing omits the function body (its numbering jumps from 2501 to 2504).
// Presumably it calls cleanup_client_media_buffers() and cleanup_client_packet_queues() on
// `client` - confirm against the real source before relying on this helper.
2501static inline void cleanup_client_all_buffers(client_info_t *client) {
2504}
2505
2516int process_encrypted_packet(client_info_t *client, packet_type_t *type, void **data, size_t *len,
2517 uint32_t *sender_id) {
2518 if (!crypto_server_is_ready(client->client_id)) {
2519 log_error("Received encrypted packet but crypto not ready for client %u", client->client_id);
2520 buffer_pool_free(NULL, *data, *len);
2521 *data = NULL;
2522 return -1;
2523 }
2524
2525 // Store original allocation size before it gets modified
2526 size_t original_alloc_size = *len;
2527 void *decrypted_data = buffer_pool_alloc(NULL, original_alloc_size);
2528 size_t decrypted_len;
2529 int decrypt_result = crypto_server_decrypt_packet(client->client_id, (const uint8_t *)*data, *len,
2530 (uint8_t *)decrypted_data, original_alloc_size, &decrypted_len);
2531
2532 if (decrypt_result != 0) {
2533 SET_ERRNO(ERROR_CRYPTO, "Failed to process encrypted packet from client %u (result=%d)", client->client_id,
2534 decrypt_result);
2535 buffer_pool_free(NULL, *data, original_alloc_size);
2536 buffer_pool_free(NULL, decrypted_data, original_alloc_size);
2537 *data = NULL;
2538 return -1;
2539 }
2540
2541 // Replace encrypted data with decrypted data
2542 // Use original allocation size for freeing the encrypted buffer
2543 buffer_pool_free(NULL, *data, original_alloc_size);
2544
2545 *data = decrypted_data;
2546 *len = decrypted_len;
2547
2548 // Now process the decrypted packet by parsing its header
2549 if (*len < sizeof(packet_header_t)) {
2550 SET_ERRNO(ERROR_CRYPTO, "Decrypted packet too small for header from client %u", client->client_id);
2551 buffer_pool_free(NULL, *data, *len);
2552 *data = NULL;
2553 return -1;
2554 }
2555
2556 packet_header_t *header = (packet_header_t *)*data;
2557 *type = (packet_type_t)NET_TO_HOST_U16(header->type);
2558 *sender_id = NET_TO_HOST_U32(header->client_id);
2559
2560 // Adjust data pointer to skip header
2561 *data = (uint8_t *)*data + sizeof(packet_header_t);
2562 *len -= sizeof(packet_header_t);
2563
2564 return 0;
2565}
2566
2567/* ============================================================================
2568 * ACIP Server Callback Wrappers
2569 * ============================================================================ */
2570
2571// Forward declarations for ACIP server callbacks
2572static void acip_server_on_protocol_version(const protocol_version_packet_t *version, void *client_ctx, void *app_ctx);
2573static void acip_server_on_image_frame(const image_frame_packet_t *header, const void *pixel_data, size_t data_len,
2574 void *client_ctx, void *app_ctx);
2575static void acip_server_on_audio(const void *audio_data, size_t audio_len, void *client_ctx, void *app_ctx);
2576static void acip_server_on_audio_batch(const audio_batch_packet_t *header, const float *samples, size_t num_samples,
2577 void *client_ctx, void *app_ctx);
2578static void acip_server_on_audio_opus(const void *opus_data, size_t opus_len, void *client_ctx, void *app_ctx);
2579static void acip_server_on_audio_opus_batch(const void *batch_data, size_t batch_len, void *client_ctx, void *app_ctx);
2580static void acip_server_on_client_join(const void *join_data, size_t data_len, void *client_ctx, void *app_ctx);
2581static void acip_server_on_client_leave(void *client_ctx, void *app_ctx);
2582static void acip_server_on_stream_start(uint32_t stream_types, void *client_ctx, void *app_ctx);
2583static void acip_server_on_stream_stop(uint32_t stream_types, void *client_ctx, void *app_ctx);
2584static void acip_server_on_capabilities(const void *cap_data, size_t data_len, void *client_ctx, void *app_ctx);
2585static void acip_server_on_ping(void *client_ctx, void *app_ctx);
2586static void acip_server_on_pong(void *client_ctx, void *app_ctx);
2587static void acip_server_on_error(const error_packet_t *header, const char *message, void *client_ctx, void *app_ctx);
2588static void acip_server_on_remote_log(const remote_log_packet_t *header, const char *message, void *client_ctx,
2589 void *app_ctx);
2590static void acip_server_on_crypto_rekey_request(const void *payload, size_t payload_len, void *client_ctx,
2591 void *app_ctx);
2592static void acip_server_on_crypto_rekey_response(const void *payload, size_t payload_len, void *client_ctx,
2593 void *app_ctx);
2594static void acip_server_on_crypto_rekey_complete(const void *payload, size_t payload_len, void *client_ctx,
2595 void *app_ctx);
2596static void acip_server_on_crypto_key_exchange_resp(packet_type_t type, const void *payload, size_t payload_len,
2597 void *client_ctx, void *app_ctx);
2598static void acip_server_on_crypto_auth_response(packet_type_t type, const void *payload, size_t payload_len,
2599 void *client_ctx, void *app_ctx);
2600static void acip_server_on_crypto_no_encryption(packet_type_t type, const void *payload, size_t payload_len,
2601 void *client_ctx, void *app_ctx);
2602
2609static const acip_server_callbacks_t g_acip_server_callbacks = {
2610 .on_protocol_version = acip_server_on_protocol_version,
2611 .on_image_frame = acip_server_on_image_frame,
2612 .on_audio = acip_server_on_audio,
2613 .on_audio_batch = acip_server_on_audio_batch,
2614 .on_audio_opus = acip_server_on_audio_opus,
2615 .on_audio_opus_batch = acip_server_on_audio_opus_batch,
2616 .on_client_join = acip_server_on_client_join,
2617 .on_client_leave = acip_server_on_client_leave,
2618 .on_stream_start = acip_server_on_stream_start,
2619 .on_stream_stop = acip_server_on_stream_stop,
2620 .on_capabilities = acip_server_on_capabilities,
2621 .on_ping = acip_server_on_ping,
2622 .on_pong = acip_server_on_pong,
2623 .on_error = acip_server_on_error,
2624 .on_remote_log = acip_server_on_remote_log,
2625 .on_crypto_rekey_request = acip_server_on_crypto_rekey_request,
2626 .on_crypto_rekey_response = acip_server_on_crypto_rekey_response,
2627 .on_crypto_rekey_complete = acip_server_on_crypto_rekey_complete,
2628 .on_crypto_key_exchange_resp = acip_server_on_crypto_key_exchange_resp,
2629 .on_crypto_auth_response = acip_server_on_crypto_auth_response,
2630 .on_crypto_no_encryption = acip_server_on_crypto_no_encryption,
2631 .app_ctx = NULL // Not used - client context passed per-call
2632};
2633
2634// Callback implementations (delegate to existing handlers)
2635
2636static void acip_server_on_protocol_version(const protocol_version_packet_t *version, void *client_ctx, void *app_ctx) {
2637 // TODO: Use app_ctx for context-aware protocol handling or metrics collection in future versions
2638 (void)app_ctx;
2639 client_info_t *client = (client_info_t *)client_ctx;
2640 handle_protocol_version_packet(client, (void *)version, sizeof(*version));
2641}
2642
2643static void acip_server_on_image_frame(const image_frame_packet_t *header, const void *pixel_data, size_t data_len,
2644 void *client_ctx, void *app_ctx) {
2645 (void)app_ctx;
2646 uint64_t callback_start_ns = time_get_ns();
2647 client_info_t *client = (client_info_t *)client_ctx;
2648
2649 log_info("CALLBACK_IMAGE_FRAME: client_id=%u, width=%u, height=%u, pixel_format=%u, compressed_size=%u, data_len=%zu",
2650 atomic_load(&client->client_id), header->width, header->height, header->pixel_format,
2651 header->compressed_size, data_len);
2652
2653 // Validate frame dimensions to prevent DoS and buffer overflow attacks
2654 if (header->width == 0 || header->height == 0) {
2655 log_error("Invalid image dimensions: %ux%u (width and height must be > 0)", header->width, header->height);
2656 disconnect_client_for_bad_data(client, "IMAGE_FRAME invalid dimensions");
2657 return;
2658 }
2659
2660 const uint32_t MAX_WIDTH = 8192;
2661 const uint32_t MAX_HEIGHT = 8192;
2662 if (header->width > MAX_WIDTH || header->height > MAX_HEIGHT) {
2663 log_error("Image dimensions too large: %ux%u (max: %ux%u)", header->width, header->height, MAX_WIDTH, MAX_HEIGHT);
2664 disconnect_client_for_bad_data(client, "IMAGE_FRAME dimensions too large");
2665 return;
2666 }
2667
2668 // Auto-set dimensions from IMAGE_FRAME if not already set (fallback for missing CLIENT_CAPABILITIES)
2669 // This ensures render thread can start even if CLIENT_CAPABILITIES was never sent
2670 if (atomic_load(&client->width) == 0 || atomic_load(&client->height) == 0) {
2671 atomic_store(&client->width, header->width);
2672 atomic_store(&client->height, header->height);
2673 log_info("Client %u: Auto-set dimensions from IMAGE_FRAME: %ux%u (CLIENT_CAPABILITIES not received)",
2674 atomic_load(&client->client_id), header->width, header->height);
2675 }
2676
2677 // Auto-enable video stream if not already enabled
2678 bool was_sending_video = atomic_load(&client->is_sending_video);
2679 if (!was_sending_video) {
2680 if (atomic_compare_exchange_strong(&client->is_sending_video, &was_sending_video, true)) {
2681 log_info("Client %u auto-enabled video stream (received IMAGE_FRAME)", atomic_load(&client->client_id));
2682 log_info_client(client, "First video frame received - streaming active");
2683 }
2684 } else {
2685 // Log periodically
2686 mutex_lock(&client->client_state_mutex);
2687 client->frames_received_logged++;
2688 if (client->frames_received_logged % 25000 == 0) {
2689 char pretty[64];
2690 format_bytes_pretty(data_len, pretty, sizeof(pretty));
2691 log_debug("Client %u has sent %u IMAGE_FRAME packets (%s)", atomic_load(&client->client_id),
2692 client->frames_received_logged, pretty);
2693 }
2694 mutex_unlock(&client->client_state_mutex);
2695 }
2696
2697 // Compute hash of incoming pixel data to detect duplicates
2698 uint32_t incoming_pixel_hash = 0;
2699 for (size_t i = 0; i < data_len && i < 1000; i++) {
2700 incoming_pixel_hash = (uint32_t)((uint64_t)incoming_pixel_hash * 31 + ((unsigned char *)pixel_data)[i]);
2701 }
2702
2703 // Per-client hash tracking to detect duplicate frames
2704 uint32_t client_id = atomic_load(&client->client_id);
2705 bool is_new_frame = (incoming_pixel_hash != client->last_received_frame_hash);
2706
2707 // Inspect first few pixels of incoming frame
2708 uint32_t first_pixel_rgb = 0;
2709 if (data_len >= 3) {
2710 first_pixel_rgb = ((uint32_t)((unsigned char *)pixel_data)[0] << 16) |
2711 ((uint32_t)((unsigned char *)pixel_data)[1] << 8) | (uint32_t)((unsigned char *)pixel_data)[2];
2712 }
2713
2714 if (is_new_frame) {
2715 log_info("RECV_FRAME #%u NEW: Client %u dimensions=%ux%u pixel_size=%zu hash=0x%08x first_rgb=0x%06x (prev=0x%08x)",
2716 client->frames_received, client_id, header->width, header->height, data_len, incoming_pixel_hash,
2717 first_pixel_rgb, client->last_received_frame_hash);
2718 client->last_received_frame_hash = incoming_pixel_hash;
2719 } else {
2720 log_info("RECV_FRAME #%u DUP: Client %u dimensions=%ux%u pixel_size=%zu hash=0x%08x first_rgb=0x%06x",
2721 client->frames_received, client_id, header->width, header->height, data_len, incoming_pixel_hash,
2722 first_pixel_rgb);
2723 }
2724
2725 // Store frame data directly to incoming_video_buffer (don't wait for legacy handler)
2726 // This ensures frame data is available immediately for the render thread
2727 if (client->incoming_video_buffer) {
2728 video_frame_t *frame = video_frame_begin_write(client->incoming_video_buffer);
2729 log_info("STORE_FRAME: client_id=%u, frame_ptr=%p, frame->data=%p", atomic_load(&client->client_id), (void *)frame,
2730 frame ? frame->data : NULL);
2731 if (frame && frame->data && data_len > 0) {
2732 // Store frame data: [width:4][height:4][pixel_data]
2733 uint32_t width_net = HOST_TO_NET_U32(header->width);
2734 uint32_t height_net = HOST_TO_NET_U32(header->height);
2735 size_t total_size = sizeof(uint32_t) * 2 + data_len;
2736
2737 log_info("STORE_FRAME_DATA: total_size=%zu, max_allowed=2097152, fits=%d", total_size,
2738 total_size <= 2 * 1024 * 1024);
2739
2740 if (total_size <= 2 * 1024 * 1024) { // Max 2MB
2741 memcpy(frame->data, &width_net, sizeof(uint32_t));
2742 memcpy((char *)frame->data + sizeof(uint32_t), &height_net, sizeof(uint32_t));
2743 memcpy((char *)frame->data + sizeof(uint32_t) * 2, pixel_data, data_len);
2744 frame->size = total_size;
2745 frame->width = header->width;
2746 frame->height = header->height;
2747 frame->capture_timestamp_ns = (uint64_t)time(NULL) * NS_PER_SEC_INT;
2748 frame->sequence_number = ++client->frames_received;
2749 video_frame_commit(client->incoming_video_buffer);
2750 log_info("FRAME_COMMITTED: client_id=%u, seq=%u, size=%zu hash=0x%08x", atomic_load(&client->client_id),
2751 frame->sequence_number, total_size, incoming_pixel_hash);
2752 } else {
2753 log_warn("FRAME_TOO_LARGE: client_id=%u, size=%zu > max 2MB", atomic_load(&client->client_id), total_size);
2754 }
2755 } else {
2756 log_warn("STORE_FRAME_FAILED: frame_ptr=%p, frame->data=%p, data_len=%zu", (void *)frame,
2757 frame ? frame->data : NULL, data_len);
2758 }
2759 } else {
2760 log_warn("NO_INCOMING_VIDEO_BUFFER: client_id=%u", atomic_load(&client->client_id));
2761 }
2762
2763 uint64_t callback_end_ns = time_get_ns();
2764 char cb_duration_str[32];
2765 format_duration_ns((double)(callback_end_ns - callback_start_ns), cb_duration_str, sizeof(cb_duration_str));
2766 log_info("[WS_TIMING] on_image_frame callback took %s (data_len=%zu)", cb_duration_str, data_len);
2767}
2768
2769static void acip_server_on_audio(const void *audio_data, size_t audio_len, void *client_ctx, void *app_ctx) {
2770 (void)app_ctx;
2771 client_info_t *client = (client_info_t *)client_ctx;
2772 handle_audio_packet(client, (void *)audio_data, audio_len);
2773}
2774
2775static void acip_server_on_audio_batch(const audio_batch_packet_t *header, const float *samples, size_t num_samples,
2776 void *client_ctx, void *app_ctx) {
2777 (void)app_ctx;
2778 (void)header; // Header info not needed - ACIP already validated
2779 client_info_t *client = (client_info_t *)client_ctx;
2780
2781 // ACIP handler already dequantized samples - write directly to audio buffer
2782 // This is more efficient than calling the existing handler which would re-dequantize
2783 log_debug_every(LOG_RATE_DEFAULT, "Received audio batch from client %u (samples=%zu, is_sending_audio=%d)",
2784 atomic_load(&client->client_id), num_samples, atomic_load(&client->is_sending_audio));
2785
2786 if (!atomic_load(&client->is_sending_audio)) {
2787 log_debug("Ignoring audio batch - client %u not in audio streaming mode", client->client_id);
2788 return;
2789 }
2790
2791 if (client->incoming_audio_buffer) {
2792 asciichat_error_t write_result =
2793 audio_ring_buffer_write(client->incoming_audio_buffer, (float *)samples, num_samples);
2794 if (write_result != ASCIICHAT_OK) {
2795 log_error("Failed to write decoded audio batch to buffer: %s", asciichat_error_string(write_result));
2796 }
2797 }
2798}
2799
2800static void acip_server_on_audio_opus(const void *opus_data, size_t opus_len, void *client_ctx, void *app_ctx) {
2801 (void)app_ctx;
2802 client_info_t *client = (client_info_t *)client_ctx;
2803
2804 // Special handling: Convert single-frame Opus to batch format
2805 // This maintains compatibility with existing server-side Opus batch processing
2806
2807 if (opus_len < 16) {
2808 log_warn("AUDIO_OPUS packet too small: %zu bytes", opus_len);
2809 return;
2810 }
2811
2812 const uint8_t *payload = (const uint8_t *)opus_data;
2813 // Use unaligned read helpers - network data may not be aligned
2814 int sample_rate = (int)NET_TO_HOST_U32(read_u32_unaligned(payload));
2815 int frame_duration = (int)NET_TO_HOST_U32(read_u32_unaligned(payload + 4));
2816 // Reserved bytes at offset 8-15
2817 size_t actual_opus_size = opus_len - 16;
2818
2819 if (actual_opus_size > 0 && actual_opus_size <= 1024 && sample_rate == 48000 && frame_duration == 20) {
2820 // Create a synthetic Opus batch packet (frame_count=1)
2821 uint8_t batch_buffer[1024 + 20]; // Max Opus + header
2822 uint8_t *batch_ptr = batch_buffer;
2823
2824 // Write batch header (batch_buffer is stack-aligned, writes are safe)
2825 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)sample_rate));
2826 batch_ptr += 4;
2827 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)frame_duration));
2828 batch_ptr += 4;
2829 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32(1)); // frame_count = 1
2830 batch_ptr += 4;
2831 memset(batch_ptr, 0, 4); // reserved
2832 batch_ptr += 4;
2833
2834 // Write frame size
2835 write_u16_unaligned(batch_ptr, HOST_TO_NET_U16((uint16_t)actual_opus_size));
2836 batch_ptr += 2;
2837
2838 // Write Opus data
2839 memcpy(batch_ptr, payload + 16, actual_opus_size);
2840 batch_ptr += actual_opus_size;
2841
2842 // Process as batch packet
2843 size_t batch_size = (size_t)(batch_ptr - batch_buffer);
2844 handle_audio_opus_batch_packet(client, batch_buffer, batch_size);
2845 }
2846}
2847
2848static void acip_server_on_audio_opus_batch(const void *batch_data, size_t batch_len, void *client_ctx, void *app_ctx) {
2849 (void)app_ctx;
2850 client_info_t *client = (client_info_t *)client_ctx;
2851 handle_audio_opus_batch_packet(client, (void *)batch_data, batch_len);
2852}
2853
2854static void acip_server_on_client_join(const void *join_data, size_t data_len, void *client_ctx, void *app_ctx) {
2855 (void)app_ctx;
2856 client_info_t *client = (client_info_t *)client_ctx;
2857 handle_client_join_packet(client, (void *)join_data, data_len);
2858}
2859
2860static void acip_server_on_client_leave(void *client_ctx, void *app_ctx) {
2861 (void)app_ctx;
2862 client_info_t *client = (client_info_t *)client_ctx;
2863 handle_client_leave_packet(client, NULL, 0);
2864}
2865
2866static void acip_server_on_stream_start(uint32_t stream_types, void *client_ctx, void *app_ctx) {
2867 (void)app_ctx;
2868 client_info_t *client = (client_info_t *)client_ctx;
2869 // ACIP layer provides stream_types in host byte order, but handle_stream_start_packet()
2870 // expects network byte order (it does NET_TO_HOST_U32 internally)
2871 uint32_t stream_types_net = HOST_TO_NET_U32(stream_types);
2872 handle_stream_start_packet(client, &stream_types_net, sizeof(stream_types_net));
2873}
2874
2875static void acip_server_on_stream_stop(uint32_t stream_types, void *client_ctx, void *app_ctx) {
2876 (void)app_ctx;
2877 client_info_t *client = (client_info_t *)client_ctx;
2878 // ACIP layer provides stream_types in host byte order, but handle_stream_stop_packet()
2879 // expects network byte order (it does NET_TO_HOST_U32 internally)
2880 uint32_t stream_types_net = HOST_TO_NET_U32(stream_types);
2881 handle_stream_stop_packet(client, &stream_types_net, sizeof(stream_types_net));
2882}
2883
2884static void acip_server_on_capabilities(const void *cap_data, size_t data_len, void *client_ctx, void *app_ctx) {
2885 (void)app_ctx;
2886 client_info_t *client = (client_info_t *)client_ctx;
2887 handle_client_capabilities_packet(client, (void *)cap_data, data_len);
2888}
2889
2890static void acip_server_on_ping(void *client_ctx, void *app_ctx) {
2891 (void)app_ctx;
2892 client_info_t *client = (client_info_t *)client_ctx;
2893
2894 // Respond with PONG using ACIP transport
2895 // Get transport reference briefly to avoid deadlock on TCP buffer full
2896 mutex_lock(&client->send_mutex);
2897 if (atomic_load(&client->shutting_down) || !client->transport) {
2898 mutex_unlock(&client->send_mutex);
2899 return; // Client is shutting down, skip pong
2900 }
2901 acip_transport_t *pong_transport = client->transport;
2902 mutex_unlock(&client->send_mutex);
2903
2904 // Network I/O happens OUTSIDE the mutex
2905 asciichat_error_t pong_result = acip_send_pong(pong_transport);
2906
2907 if (pong_result != ASCIICHAT_OK) {
2908 SET_ERRNO(ERROR_NETWORK, "Failed to send PONG response to client %u: %s", client->client_id,
2909 asciichat_error_string(pong_result));
2910 }
2911}
2912
/**
 * @brief ACIP callback: a client acknowledged one of our PINGs.
 *
 * Intentionally a no-op; both arguments are ignored.
 *
 * @param client_ctx Unused.
 * @param app_ctx    Unused.
 */
static void acip_server_on_pong(void *client_ctx, void *app_ctx) {
  // Nothing to do for a PONG - just mark the parameters as deliberately unused.
  (void)app_ctx;
  (void)client_ctx;
}
2918
2919static void acip_server_on_error(const error_packet_t *header, const char *message, void *client_ctx, void *app_ctx) {
2920 (void)app_ctx;
2921 client_info_t *client = (client_info_t *)client_ctx;
2922
2923 // Reconstruct full packet for existing handler
2924 size_t msg_len = strlen(message);
2925 size_t total_len = sizeof(*header) + msg_len;
2926 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2927 if (!full_packet) {
2928 log_error("Failed to allocate buffer for ERROR_MESSAGE reconstruction");
2929 return;
2930 }
2931
2932 memcpy(full_packet, header, sizeof(*header));
2933 memcpy(full_packet + sizeof(*header), message, msg_len);
2934
2935 handle_client_error_packet(client, full_packet, total_len);
2936 SAFE_FREE(full_packet);
2937}
2938
2939static void acip_server_on_remote_log(const remote_log_packet_t *header, const char *message, void *client_ctx,
2940 void *app_ctx) {
2941 (void)app_ctx;
2942 client_info_t *client = (client_info_t *)client_ctx;
2943
2944 // Reconstruct full packet for existing handler
2945 size_t msg_len = strlen(message);
2946 size_t total_len = sizeof(*header) + msg_len;
2947 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2948 if (!full_packet) {
2949 log_error("Failed to allocate buffer for REMOTE_LOG reconstruction");
2950 return;
2951 }
2952
2953 memcpy(full_packet, header, sizeof(*header));
2954 memcpy(full_packet + sizeof(*header), message, msg_len);
2955
2956 handle_remote_log_packet_from_client(client, full_packet, total_len);
2957 SAFE_FREE(full_packet);
2958}
2959
2960static void acip_server_on_crypto_rekey_request(const void *payload, size_t payload_len, void *client_ctx,
2961 void *app_ctx) {
2962 (void)app_ctx;
2963 client_info_t *client = (client_info_t *)client_ctx;
2964
2965 log_debug("Received REKEY_REQUEST from client %u", client->client_id);
2966
2967 // Process the client's rekey request
2968 mutex_lock(&client->client_state_mutex);
2969 asciichat_error_t crypto_result =
2970 crypto_handshake_process_rekey_request(&client->crypto_handshake_ctx, (void *)payload, payload_len);
2971 mutex_unlock(&client->client_state_mutex);
2972
2973 if (crypto_result != ASCIICHAT_OK) {
2974 log_error("Failed to process REKEY_REQUEST from client %u: %d", client->client_id, crypto_result);
2975 return;
2976 }
2977
2978 // Send REKEY_RESPONSE
2979 mutex_lock(&client->client_state_mutex);
2980 // Get socket reference briefly to avoid deadlock on TCP buffer full
2981 mutex_lock(&client->send_mutex);
2982 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
2983 mutex_unlock(&client->send_mutex);
2984 mutex_unlock(&client->client_state_mutex);
2985 return; // Client is shutting down
2986 }
2987 socket_t rekey_socket = client->socket;
2988 mutex_unlock(&client->send_mutex);
2989
2990 // Network I/O happens OUTSIDE the send_mutex (client_state_mutex still held for crypto state)
2991 crypto_result = crypto_handshake_rekey_response(&client->crypto_handshake_ctx, rekey_socket);
2992 mutex_unlock(&client->client_state_mutex);
2993
2994 if (crypto_result != ASCIICHAT_OK) {
2995 log_error("Failed to send REKEY_RESPONSE to client %u: %d", client->client_id, crypto_result);
2996 } else {
2997 log_debug("Sent REKEY_RESPONSE to client %u", client->client_id);
2998 }
2999}
3000
3001static void acip_server_on_crypto_rekey_response(const void *payload, size_t payload_len, void *client_ctx,
3002 void *app_ctx) {
3003 (void)app_ctx;
3004 client_info_t *client = (client_info_t *)client_ctx;
3005
3006 log_debug("Received REKEY_RESPONSE from client %u", client->client_id);
3007
3008 // Process the client's rekey response
3009 mutex_lock(&client->client_state_mutex);
3010 asciichat_error_t crypto_result =
3011 crypto_handshake_process_rekey_response(&client->crypto_handshake_ctx, (void *)payload, payload_len);
3012 mutex_unlock(&client->client_state_mutex);
3013
3014 if (crypto_result != ASCIICHAT_OK) {
3015 log_error("Failed to process REKEY_RESPONSE from client %u: %d", client->client_id, crypto_result);
3016 return;
3017 }
3018
3019 // Send REKEY_COMPLETE to confirm and activate new key
3020 mutex_lock(&client->client_state_mutex);
3021 // Get socket reference briefly to avoid deadlock on TCP buffer full
3022 mutex_lock(&client->send_mutex);
3023 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
3024 mutex_unlock(&client->send_mutex);
3025 mutex_unlock(&client->client_state_mutex);
3026 return; // Client is shutting down
3027 }
3028 socket_t complete_socket = client->socket;
3029 mutex_unlock(&client->send_mutex);
3030
3031 // Network I/O happens OUTSIDE the send_mutex (client_state_mutex still held for crypto state)
3032 crypto_result = crypto_handshake_rekey_complete(&client->crypto_handshake_ctx, complete_socket);
3033 mutex_unlock(&client->client_state_mutex);
3034
3035 if (crypto_result != ASCIICHAT_OK) {
3036 log_error("Failed to send REKEY_COMPLETE to client %u: %d", client->client_id, crypto_result);
3037 } else {
3038 log_debug("Sent REKEY_COMPLETE to client %u - session rekeying complete", client->client_id);
3039 }
3040}
3041
3042static void acip_server_on_crypto_rekey_complete(const void *payload, size_t payload_len, void *client_ctx,
3043 void *app_ctx) {
3044 (void)app_ctx;
3045 client_info_t *client = (client_info_t *)client_ctx;
3046
3047 log_debug("Received REKEY_COMPLETE from client %u", client->client_id);
3048
3049 // Process and commit to new key
3050 mutex_lock(&client->client_state_mutex);
3051 asciichat_error_t crypto_result =
3052 crypto_handshake_process_rekey_complete(&client->crypto_handshake_ctx, (void *)payload, payload_len);
3053 mutex_unlock(&client->client_state_mutex);
3054
3055 if (crypto_result != ASCIICHAT_OK) {
3056 log_error("Failed to process REKEY_COMPLETE from client %u: %d", client->client_id, crypto_result);
3057 } else {
3058 log_debug("Session rekeying completed successfully with client %u", client->client_id);
3059 // Notify client that rekeying is complete (new keys now active on both sides)
3060 log_info_client(client, "Session rekey complete - new encryption keys active");
3061 }
3062}
3063
3064static void acip_server_on_crypto_key_exchange_resp(packet_type_t type, const void *payload, size_t payload_len,
3065 void *client_ctx, void *app_ctx) {
3066 (void)app_ctx;
3067 client_info_t *client = (client_info_t *)client_ctx;
3068
3069 log_debug("Received CRYPTO_KEY_EXCHANGE_RESP from client %u", client->client_id);
3070
3071 // Call refactored handshake function from Phase 2
3072 asciichat_error_t result = crypto_handshake_server_auth_challenge(&client->crypto_handshake_ctx, client->transport,
3073 type, payload, payload_len);
3074
3075 if (result != ASCIICHAT_OK) {
3076 log_error("Crypto handshake auth challenge failed for client %u", client->client_id);
3077 disconnect_client_for_bad_data(client, "Crypto handshake auth challenge failed");
3078 } else {
3079 // Check if handshake completed (no-auth flow) or if AUTH_CHALLENGE was sent (with-auth flow)
3080 if (client->crypto_handshake_ctx.state == CRYPTO_HANDSHAKE_READY) {
3081 // No-auth flow: handshake complete, HANDSHAKE_COMPLETE was sent
3082 log_info("Crypto handshake completed successfully for client %u (no authentication)", client->client_id);
3083 client->crypto_initialized = true;
3084 client->transport->crypto_ctx = &client->crypto_handshake_ctx.crypto_ctx;
3085 } else {
3086 // With-auth flow: AUTH_CHALLENGE was sent, waiting for AUTH_RESPONSE
3087 log_debug("Sent AUTH_CHALLENGE to client %u", client->client_id);
3088 }
3089 }
3090}
3091
3092static void acip_server_on_crypto_auth_response(packet_type_t type, const void *payload, size_t payload_len,
3093 void *client_ctx, void *app_ctx) {
3094 (void)app_ctx;
3095 client_info_t *client = (client_info_t *)client_ctx;
3096
3097 log_debug("Received CRYPTO_AUTH_RESPONSE from client %u", client->client_id);
3098
3099 // Call refactored handshake function from Phase 2
3100 asciichat_error_t result =
3101 crypto_handshake_server_complete(&client->crypto_handshake_ctx, client->transport, type, payload, payload_len);
3102
3103 if (result != ASCIICHAT_OK) {
3104 log_error("Crypto handshake complete failed for client %u", client->client_id);
3105 disconnect_client_for_bad_data(client, "Crypto handshake complete failed");
3106 } else {
3107 log_info("Crypto handshake completed successfully for client %u", client->client_id);
3108 log_error("[CRYPTO_SETUP] Setting crypto context for client %u: transport=%p, crypto_ctx=%p", client->client_id,
3109 (void *)client->transport, (void *)&client->crypto_handshake_ctx.crypto_ctx);
3110 client->crypto_initialized = true;
3111 client->transport->crypto_ctx = &client->crypto_handshake_ctx.crypto_ctx;
3112 log_error("[CRYPTO_SETUP] Crypto context SET: transport->crypto_ctx=%p", (void *)client->transport->crypto_ctx);
3113 }
3114}
3115
3116static void acip_server_on_crypto_no_encryption(packet_type_t type, const void *payload, size_t payload_len,
3117 void *client_ctx, void *app_ctx) {
3118 (void)app_ctx;
3119 (void)type;
3120 (void)payload;
3121 (void)payload_len;
3122 client_info_t *client = (client_info_t *)client_ctx;
3123
3124 log_error("Client %u sent NO_ENCRYPTION - encryption mode mismatch", client->client_id);
3125 disconnect_client_for_bad_data(client, "Encryption mode mismatch - server requires encryption");
3126}
3127
3136void process_decrypted_packet(client_info_t *client, packet_type_t type, void *data, size_t len) {
3137 if (type == 5000) { // CLIENT_CAPABILITIES
3138 log_debug("CLIENT: client_id=%u, data=%p, len=%zu", atomic_load(&client->client_id), data, len);
3139 }
3140
3141 // Rate limiting: Check and record packet-specific rate limits
3142 if (g_rate_limiter) {
3143 if (!check_and_record_packet_rate_limit(g_rate_limiter, client->client_ip, client->socket, type)) {
3144 // Rate limit exceeded - error response already sent by utility function
3145 return;
3146 }
3147 }
3148
3149 // O(1) dispatch via hash table lookup
3150 int idx = client_dispatch_hash_lookup(g_client_dispatch_hash, type);
3151 if (type == 5000 || type == 3001) {
3152 log_error("DISPATCH_LOOKUP: type=%d, idx=%d (len=%zu)", type, idx, len);
3153 }
3154 if (idx < 0) {
3155 disconnect_client_for_bad_data(client, "Unknown packet type: %d (len=%zu)", type, len);
3156 return;
3157 }
3158
3159 if (type == 5000 || type == 3001) {
3160 log_error("DISPATCH_HANDLER: type=%d, calling handler[%d]...", type, idx);
3161 }
3162 g_client_dispatch_handlers[idx](client, data, len);
3163 if (type == 5000 || type == 3001) {
3164 log_error("DISPATCH_HANDLER: type=%d, handler returned", type);
3165 }
3166}
void asciichat_errno_destroy(void)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
Per-client state management and lifecycle orchestration.
asciichat_error_t crypto_handshake_rekey_complete(crypto_handshake_context_t *ctx, socket_t socket)
const crypto_context_t * crypto_handshake_get_context(const crypto_handshake_context_t *ctx)
void crypto_handshake_destroy(crypto_handshake_context_t *ctx)
asciichat_error_t crypto_handshake_process_rekey_request(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
bool crypto_handshake_is_ready(const crypto_handshake_context_t *ctx)
bool crypto_handshake_should_rekey(const crypto_handshake_context_t *ctx)
asciichat_error_t crypto_handshake_rekey_response(crypto_handshake_context_t *ctx, socket_t socket)
asciichat_error_t crypto_handshake_process_rekey_response(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
asciichat_error_t crypto_handshake_process_rekey_complete(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
asciichat_error_t crypto_handshake_rekey_request(crypto_handshake_context_t *ctx, socket_t socket)
bool check_and_record_packet_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, socket_t client_socket, packet_type_t packet_type)
Definition errors.c:54
void handle_audio_opus_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_OPUS_BATCH packet - efficient Opus-encoded audio batch from client.
uint32_t session_host_add_client(session_host_t *host, socket_t socket, const char *ip, int port)
Definition host.c:928
asciichat_error_t session_host_remove_client(session_host_t *host, uint32_t client_id)
Definition host.c:1160
int socket_t
asciichat_error_t audio_ring_buffer_write(audio_ring_buffer_t *rb, const float *data, int samples)
void audio_ring_buffer_destroy(audio_ring_buffer_t *rb)
audio_ring_buffer_t * audio_ring_buffer_create_for_capture(void)
const char * crypto_result_to_string(crypto_result_t result)
crypto_result_t crypto_decrypt(crypto_context_t *ctx, const uint8_t *ciphertext, size_t ciphertext_len, uint8_t *plaintext_out, size_t plaintext_out_size, size_t *plaintext_len_out)
asciichat_error_t crypto_handshake_server_complete(crypto_handshake_context_t *ctx, acip_transport_t *transport, packet_type_t packet_type, const uint8_t *payload, size_t payload_len)
asciichat_error_t crypto_handshake_server_auth_challenge(crypto_handshake_context_t *ctx, acip_transport_t *transport, packet_type_t packet_type, const uint8_t *payload, size_t payload_len)
asciichat_error_t acip_send_server_state(acip_transport_t *transport, const server_state_packet_t *state)
asciichat_error_t acip_server_receive_and_dispatch(acip_transport_t *transport, void *client_ctx, const acip_server_callbacks_t *callbacks)
asciichat_error_t acip_send_clear_console(acip_transport_t *transport)
asciichat_error_t acip_send_ascii_frame(acip_transport_t *transport, const char *frame_data, size_t frame_size, uint32_t width, uint32_t height, uint32_t client_id)
asciichat_error_t tcp_server_spawn_thread(tcp_server_t *server, socket_t client_socket, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
asciichat_error_t tcp_server_stop_client_threads(tcp_server_t *server, socket_t client_socket)
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
int mixer_add_source(mixer_t *mixer, uint32_t client_id, audio_ring_buffer_t *buffer)
Definition mixer.c:364
void mixer_remove_source(mixer_t *mixer, uint32_t client_id)
Definition mixer.c:401
asciichat_error_t acip_handle_server_packet(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, void *client_ctx, const acip_server_callbacks_t *callbacks)
asciichat_error_t set_socket_timeout(socket_t sockfd, uint64_t timeout_ns)
Set socket timeout.
asciichat_error_t set_socket_keepalive(socket_t sockfd)
Set socket keepalive.
const char * network_error_string()
Get human-readable error string for network errors.
void opus_codec_destroy(opus_codec_t *codec)
Definition opus_codec.c:215
asciichat_error_t packet_parse_error_message(const void *data, size_t len, asciichat_error_t *out_error_code, char *message_buffer, size_t message_buffer_size, size_t *out_message_length)
Definition packet.c:851
packet_recv_result_t receive_packet_secure(socket_t sockfd, void *crypto_ctx, bool enforce_encryption, packet_envelope_t *envelope)
Receive a packet with decryption and decompression support.
Definition packet.c:566
void packet_queue_stop(packet_queue_t *queue)
packet_queue_t * packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool)
void packet_queue_free_packet(queued_packet_t *packet)
int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
queued_packet_t * packet_queue_try_dequeue(packet_queue_t *queue)
void packet_queue_destroy(packet_queue_t *queue)
void platform_sleep_us(unsigned int us)
void platform_sleep_ms(unsigned int ms)
Per-client rendering threads with rate limiting.
asciichat_error_t acip_send_pong(acip_transport_t *transport)
Definition send.c:212
asciichat_error_t acip_send_audio_opus_batch(acip_transport_t *transport, const void *opus_data, size_t opus_len, const uint16_t *frame_sizes, uint32_t frame_count, uint32_t sample_rate, uint32_t frame_duration)
Definition send.c:158
asciichat_error_t acip_send_audio_batch(acip_transport_t *transport, const float *samples, uint32_t num_samples, uint32_t batch_count)
Definition send.c:115
asciichat_error_t packet_send_via_transport(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, uint32_t client_id)
Send packet via transport with proper header (exported for generic wrappers)
Definition send.c:41
Server cryptographic operations and per-client handshake management.
rate_limiter_t * g_rate_limiter
Global rate limiter for connection attempts and packet processing.
mixer_t *volatile g_audio_mixer
Global audio mixer instance for multi-client audio processing.
atomic_bool g_server_should_exit
Global atomic shutdown flag shared across all threads.
ascii-chat Server Mode Entry Point Header
void handle_client_join_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_JOIN packet - client announces identity and capabilities.
void handle_pong_packet(client_info_t *client, const void *data, size_t len)
Handle PONG packet - client acknowledged our PING.
void handle_protocol_version_packet(client_info_t *client, const void *data, size_t len)
Process PROTOCOL_VERSION packet - validate protocol compatibility.
void handle_image_frame_packet(client_info_t *client, void *data, size_t len)
Process IMAGE_FRAME packet - store client's video data for rendering.
void handle_audio_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_BATCH packet - store efficiently batched audio samples.
void handle_audio_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO packet - store single audio sample batch (legacy format)
void handle_client_leave_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_LEAVE packet - handle clean client disconnect.
void handle_stream_stop_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_STOP packet - client requests to halt media transmission.
int send_server_state_to_client(client_info_t *client)
Send current server state to a specific client.
void handle_client_capabilities_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_CAPABILITIES packet - configure client-specific rendering.
void handle_stream_start_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_START packet - client requests to begin media transmission.
void handle_remote_log_packet_from_client(client_info_t *client, const void *data, size_t len)
void disconnect_client_for_bad_data(client_info_t *client, const char *format,...)
void handle_ping_packet(client_info_t *client, const void *data, size_t len)
Handle PING packet - respond with PONG.
Server packet processing and protocol implementation.
int start_webrtc_client_threads(server_context_t *server_ctx, uint32_t client_id)
Start threads for a WebRTC client after crypto initialization.
void * client_send_thread_func(void *arg)
Client packet send thread.
client_info_t * find_client_by_id(uint32_t client_id)
Fast O(1) client lookup by ID using hash table.
#define CLIENT_DISPATCH_HANDLER_COUNT
int add_client(server_context_t *server_ctx, socket_t socket, const char *client_ip, int port)
void cleanup_client_media_buffers(client_info_t *client)
void process_decrypted_packet(client_info_t *client, packet_type_t type, void *data, size_t len)
void cleanup_client_packet_queues(client_info_t *client)
rwlock_t g_client_manager_rwlock
Reader-writer lock protecting the global client manager.
void * client_receive_thread(void *arg)
int remove_client(server_context_t *server_ctx, uint32_t client_id)
void stop_client_threads(client_info_t *client)
void(* client_packet_handler_t)(client_info_t *client, const void *data, size_t len)
int add_webrtc_client(server_context_t *server_ctx, acip_transport_t *transport, const char *client_ip, bool start_threads)
Register a WebRTC client with the server.
client_info_t * find_client_by_socket(socket_t socket)
Find client by socket descriptor using linear search.
int process_encrypted_packet(client_info_t *client, packet_type_t *type, void **data, size_t *len, uint32_t *sender_id)
#define MAX_AUDIO_BATCH
void * client_dispatch_thread(void *arg)
Async dispatch thread for WebRTC clients.
client_manager_t g_client_manager
Global client manager singleton - central coordination point.
#define CLIENT_DISPATCH_HASH(type)
void broadcast_server_state_to_all_clients(void)
Notify all clients of state changes.
#define CLIENT_DISPATCH_HASH_SIZE
int crypto_server_decrypt_packet(uint32_t client_id, const uint8_t *ciphertext, size_t ciphertext_len, uint8_t *plaintext, size_t plaintext_size, size_t *plaintext_len)
const crypto_context_t * crypto_server_get_context(uint32_t client_id)
int server_crypto_init(void)
bool crypto_server_is_ready(uint32_t client_id)
int server_crypto_handshake(client_info_t *client)
int create_client_render_threads(server_context_t *server_ctx, client_info_t *client)
Create and initialize per-client rendering threads.
Multi-client video mixing and ASCII frame generation.
Hash table entry for client packet dispatch.
uint8_t handler_idx
Handler index (0-based)
packet_type_t key
Packet type (0 = empty slot)
Global client manager structure for server-side client coordination.
Definition client.h:63
_Atomic uint32_t next_client_id
Monotonic counter for unique client IDs (atomic for thread-safety)
Definition client.h:73
client_info_t * clients_by_id
uthash head pointer for O(1) client_id -> client_info_t* lookups
Definition client.h:67
client_info_t clients[MAX_CLIENTS]
Array of client_info_t structures (backing storage)
Definition client.h:65
int client_count
Current number of active clients.
Definition client.h:69
Server context - encapsulates all server state.
tcp_server_t * tcp_server
TCP server managing connections.
session_host_t * session_host
Session host for discovery mode support.
int safe_snprintf(char *buffer, size_t buffer_size, const char *format,...)
Safe formatted string printing to buffer.
Definition system.c:456
acip_transport_t * acip_tcp_transport_create(socket_t sockfd, crypto_context_t *crypto_ctx)
void acip_transport_destroy(acip_transport_t *transport)
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
Definition threading.c:42
asciichat_thread_t asciichat_thread_self(void)
Definition threading.c:54
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Definition threading.c:46
int asciichat_thread_equal(asciichat_thread_t t1, asciichat_thread_t t2)
Definition threading.c:58
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21
void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity)
Definition util/format.c:10
uint64_t time_get_ns(void)
Definition util/time.c:48
int format_duration_ns(double nanoseconds, char *buffer, size_t buffer_size)
Definition util/time.c:275
uint64_t time_elapsed_ns(uint64_t start_ns, uint64_t end_ns)
Definition util/time.c:90
void video_frame_buffer_destroy(video_frame_buffer_t *vfb)
Definition video_frame.c:84
video_frame_buffer_t * video_frame_buffer_create(uint32_t client_id)
Definition video_frame.c:16
video_frame_t * video_frame_begin_write(video_frame_buffer_t *vfb)
void video_frame_commit(video_frame_buffer_t *vfb)
const video_frame_t * video_frame_get_latest(video_frame_buffer_t *vfb)