ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
client.c File Reference

👥 Per-client lifecycle manager: threading coordination, state management, and client lifecycle orchestration


Data Structures

struct  client_dispatch_entry_t
 Hash table entry for client packet dispatch.
 

Macros

#define DEBUG_NETWORK   1
 
#define DEBUG_THREADS   1
 
#define DEBUG_MEMORY   1
 
#define CLIENT_DISPATCH_HASH_SIZE   32
 
#define CLIENT_DISPATCH_HANDLER_COUNT   12
 
#define CLIENT_DISPATCH_HASH(type)   ((type) % CLIENT_DISPATCH_HASH_SIZE)
 
#define MAX_AUDIO_BATCH   8
 

Typedefs

typedef void(* client_packet_handler_t) (client_info_t *client, const void *data, size_t len)
 

Functions

void * client_send_thread_func (void *arg)
 Client packet send thread.
 
void * client_dispatch_thread (void *arg)
 Async dispatch thread for WebRTC clients.
 
void broadcast_server_state_to_all_clients (void)
 Notify all clients of state changes.
 
client_info_t * find_client_by_id (uint32_t client_id)
 Fast O(1) client lookup by ID using hash table.
 
client_info_t * find_client_by_socket (socket_t socket)
 Find client by socket descriptor using linear search.
 
int add_client (server_context_t *server_ctx, socket_t socket, const char *client_ip, int port)
 
int add_webrtc_client (server_context_t *server_ctx, acip_transport_t *transport, const char *client_ip, bool start_threads)
 Register a WebRTC client with the server.
 
int remove_client (server_context_t *server_ctx, uint32_t client_id)
 
void * client_receive_thread (void *arg)
 
int start_webrtc_client_threads (server_context_t *server_ctx, uint32_t client_id)
 Start threads for a WebRTC client after crypto initialization.
 
void stop_client_threads (client_info_t *client)
 
void cleanup_client_media_buffers (client_info_t *client)
 
void cleanup_client_packet_queues (client_info_t *client)
 
int process_encrypted_packet (client_info_t *client, packet_type_t *type, void **data, size_t *len, uint32_t *sender_id)
 
void process_decrypted_packet (client_info_t *client, packet_type_t type, void *data, size_t len)
 

Variables

client_manager_t g_client_manager
 Global client manager singleton - central coordination point.
 
rwlock_t g_client_manager_rwlock = {0}
 Reader-writer lock protecting the global client manager.
 

Detailed Description

👥 Per-client lifecycle manager: threading coordination, state management, and client lifecycle orchestration

  1. Client connection establishment and initialization
  2. Per-client thread creation and management (receive, send, render)
  3. Client state management with thread-safe access patterns
  4. Client disconnection handling and resource cleanup
  5. Hash table management for O(1) client lookups
  6. Integration point between main.c and other modules

THREADING ARCHITECTURE PER CLIENT:

Each connected client spawns multiple dedicated threads:

  • Receive Thread: Handles incoming packets from client (calls protocol.c functions)
  • Send Thread: Manages outgoing packet delivery using packet queues
  • Video Render Thread: Generates ASCII frames at 60fps (render.c)
  • Audio Render Thread: Mixes audio streams at 172fps (render.c)

This per-client threading model provides several advantages:

  • Linear performance scaling (no shared bottlenecks)
  • Fault isolation (one client's issues don't affect others)
  • Simplified synchronization (each client owns its resources)
  • Real-time performance guarantees per client

DATA STRUCTURES AND SYNCHRONIZATION:

The module manages two primary data structures:

  1. client_manager_t (global singleton):
    • Array of client_info_t structs (backing storage)
    • Hash table for O(1) client_id -> client_info_t lookups
    • Protected by reader-writer lock (g_client_manager_rwlock)
    • Allows concurrent reads, exclusive writes
  2. client_info_t (per-client state):
    • Network connection details and capabilities
    • Thread handles and synchronization primitives
    • Media buffers (video/audio) and packet queues
    • Terminal capabilities and rendering preferences
    • Protected by per-client mutex (client_state_mutex)

Synchronization Patterns:

LOCK ORDERING PROTOCOL (prevents deadlocks):

  1. Always acquire g_client_manager_rwlock FIRST
  2. Then acquire per-client mutexes if needed
  3. Release in reverse order
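The ordering above can be sketched with POSIX primitives. This is a minimal illustration, not the module's code: demo_client_t, demo_clients, demo_manager_rwlock, demo_register() and demo_get_width() are hypothetical stand-ins for client_info_t, the client array, g_client_manager_rwlock and the real accessors, and pthread types are assumed in place of the project's own rwlock_t and mutex_t wrappers.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

#define DEMO_MAX_CLIENTS 4

/* Hypothetical stand-in for client_info_t. */
typedef struct {
  uint32_t client_id;          /* 0 means the slot is free */
  int width;                   /* example of mutable per-client state */
  pthread_mutex_t state_mutex; /* plays the role of client_state_mutex */
} demo_client_t;

static demo_client_t demo_clients[DEMO_MAX_CLIENTS];
static pthread_rwlock_t demo_manager_rwlock = PTHREAD_RWLOCK_INITIALIZER;

/* Register a client under the exclusive (write) lock. Returns 0 on success. */
int demo_register(uint32_t id, int width) {
  int rc = -1;
  pthread_rwlock_wrlock(&demo_manager_rwlock);
  for (int i = 0; i < DEMO_MAX_CLIENTS; i++) {
    if (demo_clients[i].client_id == 0) {
      pthread_mutex_init(&demo_clients[i].state_mutex, NULL);
      demo_clients[i].client_id = id;
      demo_clients[i].width = width;
      rc = 0;
      break;
    }
  }
  pthread_rwlock_unlock(&demo_manager_rwlock);
  return rc;
}

/* Read per-client state following the lock ordering. Returns -1 if the id
 * is not registered. */
int demo_get_width(uint32_t id) {
  int width = -1;
  assert(id != 0);
  pthread_rwlock_rdlock(&demo_manager_rwlock); /* 1. manager lock first */
  for (int i = 0; i < DEMO_MAX_CLIENTS; i++) {
    demo_client_t *c = &demo_clients[i];
    if (c->client_id == id) {
      pthread_mutex_lock(&c->state_mutex);     /* 2. then the per-client mutex */
      width = c->width;
      pthread_mutex_unlock(&c->state_mutex);   /* 3. release in reverse order */
      break;
    }
  }
  pthread_rwlock_unlock(&demo_manager_rwlock);
  return width;
}
```

Because every path takes the manager lock before any per-client mutex, two threads can never hold the two locks in opposite orders, which is the condition a lock-order deadlock requires.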

SNAPSHOT PATTERN (reduces lock contention):

  1. Acquire mutex
  2. Copy needed state variables to local copies
  3. Release mutex immediately
  4. Process using local copies without locks
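A minimal sketch of the snapshot pattern, assuming a pthread mutex in place of the project's mutex_t. The types demo_state_t and demo_snapshot_t and the functions demo_take_snapshot() and demo_frame_cells() are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical shared state guarded by a mutex. */
typedef struct {
  pthread_mutex_t mutex;
  int width;
  int height;
  bool wants_color;
} demo_state_t;

/* Plain local copy of the fields a worker needs. */
typedef struct {
  int width;
  int height;
  bool wants_color;
} demo_snapshot_t;

/* Steps 1-3: lock, copy, unlock. The critical section is only the copy. */
demo_snapshot_t demo_take_snapshot(demo_state_t *s) {
  demo_snapshot_t snap;
  assert(s != NULL);
  pthread_mutex_lock(&s->mutex);
  snap.width = s->width;
  snap.height = s->height;
  snap.wants_color = s->wants_color;
  pthread_mutex_unlock(&s->mutex);
  return snap;
}

/* Step 4: do the (potentially slow) work on the local copy, lock-free. */
long demo_frame_cells(demo_state_t *s) {
  demo_snapshot_t snap = demo_take_snapshot(s);
  return (long)snap.width * (long)snap.height;
}
```

The trade-off is that the worker may act on values that are one update stale, which is usually acceptable for per-frame rendering parameters.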

INTEGRATION WITH OTHER MODULES:

  • main.c: Calls add_client() and remove_client() from main loop
  • protocol.c: Functions called by client receive threads for packet processing
  • render.c: Render thread functions created per client
  • stream.c: Stream generation functions called by render threads
  • stats.c: Accesses client data for performance monitoring

THREAD LIFECYCLE MANAGEMENT:

Thread creation order (in add_client()):

  1. Initialize client data structures and mutexes
  2. Create receive thread (for incoming packet processing)
  3. Create render threads (video + audio generation)
  4. Create send thread last (for outgoing packet delivery), so it never reads an empty frame before the render threads have produced the first real one

Thread termination order (in remove_client()):

  1. Set shutdown flags (causes threads to exit main loops)
  2. Join send thread (cleanest exit, no blocking I/O)
  3. Join receive thread (may be blocked on network I/O)
  4. Join render threads (computational work, clean exit)
  5. Clean up resources (queues, buffers, mutexes)
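The flag-then-join sequence above can be sketched as follows. This is an illustration under stated assumptions, not the body of remove_client(): demo_threads_t, demo_worker(), demo_start() and demo_stop() are hypothetical, only two worker threads are shown, and raw pthreads and C11 atomics stand in for the project's threading wrappers.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical per-client thread bookkeeping. */
typedef struct {
  atomic_bool shutting_down; /* set once, polled by every worker loop */
  atomic_int iterations;     /* stand-in for real work */
  pthread_t send_thread;
  pthread_t receive_thread;
} demo_threads_t;

/* Worker loop: exits as soon as the shutdown flag becomes visible. */
static void *demo_worker(void *arg) {
  demo_threads_t *c = arg;
  while (!atomic_load(&c->shutting_down)) {
    atomic_fetch_add(&c->iterations, 1);
    sched_yield();
  }
  return NULL;
}

/* Start both workers. Returns 0 on success, -1 on failure. */
int demo_start(demo_threads_t *c) {
  assert(c != NULL);
  atomic_store(&c->shutting_down, false);
  atomic_store(&c->iterations, 0);
  if (pthread_create(&c->send_thread, NULL, demo_worker, c) != 0) {
    return -1;
  }
  if (pthread_create(&c->receive_thread, NULL, demo_worker, c) != 0) {
    /* Roll back the thread that did start. */
    atomic_store(&c->shutting_down, true);
    pthread_join(c->send_thread, NULL);
    return -1;
  }
  return 0;
}

/* Step 1: set the flag. Steps 2-3: join in a fixed order. Resource cleanup
 * (step 5) would follow only after every join has returned. */
int demo_stop(demo_threads_t *c) {
  assert(c != NULL);
  atomic_store(&c->shutting_down, true);
  if (pthread_join(c->send_thread, NULL) != 0) {
    return -1;
  }
  if (pthread_join(c->receive_thread, NULL) != 0) {
    return -1;
  }
  return 0;
}
```

Joining before freeing queues and buffers matters because a worker that is still running could otherwise touch memory that has already been released.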

WHY THIS MODULAR DESIGN:

The original server.c contained all client management code inline, making it:

  • Hard to understand the client lifecycle
  • Difficult to modify threading behavior
  • Impossible to isolate client-related bugs
  • Challenging to add new client features

This modular approach provides:

  • Clear separation of client vs. server concerns
  • Easier testing of client management logic
  • Better code reuse and maintenance
  • Future extensibility for new client types
Author
Zachary Fogg me@zfo.gg
Date
September 2025
Version
2.0 (Post-Modularization)
See also
main.c For overall server architecture
render.c For per-client rendering implementation
protocol.c For client Packet Types processing

Definition in file src/server/client.c.

Macro Definition Documentation

◆ CLIENT_DISPATCH_HANDLER_COUNT

#define CLIENT_DISPATCH_HANDLER_COUNT   12

Definition at line 161 of file src/server/client.c.

◆ CLIENT_DISPATCH_HASH

#define CLIENT_DISPATCH_HASH(type)   ((type) % CLIENT_DISPATCH_HASH_SIZE)

Definition at line 171 of file src/server/client.c.
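As an illustration of how a modulo hash like this can drive packet dispatch, the sketch below builds a small open-addressing table. Only the two macro definitions and the handler signature shape come from this page. The entry layout (demo_dispatch_entry_t), the use of 0 as the empty-slot marker, linear probing on collision, and the demo_* function names are assumptions, since the fields of client_dispatch_entry_t are not shown here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define CLIENT_DISPATCH_HASH_SIZE 32
#define CLIENT_DISPATCH_HASH(type) ((type) % CLIENT_DISPATCH_HASH_SIZE)

/* Same shape as client_packet_handler_t, with void* for the client. */
typedef void (*demo_handler_t)(void *client, const void *data, size_t len);

/* Hypothetical entry layout: key plus handler. type == 0 marks "empty". */
typedef struct {
  uint16_t type;
  demo_handler_t handler;
} demo_dispatch_entry_t;

static demo_dispatch_entry_t demo_table[CLIENT_DISPATCH_HASH_SIZE];

/* Insert or update a handler. Returns 0 on success, -1 if invalid or full. */
int demo_dispatch_register(uint16_t type, demo_handler_t handler) {
  if (type == 0 || handler == NULL) {
    return -1;
  }
  for (uint32_t probe = 0; probe < CLIENT_DISPATCH_HASH_SIZE; probe++) {
    uint32_t idx = (CLIENT_DISPATCH_HASH(type) + probe) % CLIENT_DISPATCH_HASH_SIZE;
    if (demo_table[idx].type == 0 || demo_table[idx].type == type) {
      demo_table[idx].type = type;
      demo_table[idx].handler = handler;
      return 0;
    }
  }
  return -1;
}

/* Find the handler for a packet type, or NULL if none is registered. */
demo_handler_t demo_dispatch_lookup(uint16_t type) {
  if (type == 0) {
    return NULL;
  }
  for (uint32_t probe = 0; probe < CLIENT_DISPATCH_HASH_SIZE; probe++) {
    uint32_t idx = (CLIENT_DISPATCH_HASH(type) + probe) % CLIENT_DISPATCH_HASH_SIZE;
    if (demo_table[idx].type == type) {
      return demo_table[idx].handler;
    }
    if (demo_table[idx].type == 0) {
      return NULL; /* an empty slot ends the probe sequence */
    }
  }
  return NULL;
}

/* Example handler that only counts invocations. */
int demo_ping_count = 0;
void demo_handle_ping(void *client, const void *data, size_t len) {
  (void)client;
  (void)data;
  (void)len;
  demo_ping_count++;
}
```

With 12 handlers in 32 slots the table stays under 40% full, so probe sequences remain short. Note that the macro only works as a function-like macro when no space separates its name from the opening parenthesis.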

◆ CLIENT_DISPATCH_HASH_SIZE

#define CLIENT_DISPATCH_HASH_SIZE   32

Definition at line 160 of file src/server/client.c.

◆ DEBUG_MEMORY

#define DEBUG_MEMORY   1

Definition at line 152 of file src/server/client.c.

◆ DEBUG_NETWORK

#define DEBUG_NETWORK   1

Definition at line 150 of file src/server/client.c.

◆ DEBUG_THREADS

#define DEBUG_THREADS   1

Definition at line 151 of file src/server/client.c.

◆ MAX_AUDIO_BATCH

#define MAX_AUDIO_BATCH   8

Typedef Documentation

◆ client_packet_handler_t

typedef void(* client_packet_handler_t) (client_info_t *client, const void *data, size_t len)

Definition at line 158 of file src/server/client.c.

Function Documentation

◆ add_client()

int add_client ( server_context_t *  server_ctx,
socket_t  socket,
const char *  client_ip,
int  port 
)

Definition at line 534 of file src/server/client.c.

534 {
535 // Find empty slot WITHOUT holding the global lock
536 // We'll re-verify under lock after allocations complete
537 int slot = -1;
538 int existing_count = 0;
539 for (int i = 0; i < MAX_CLIENTS; i++) {
540 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
541 slot = i; // Take first available slot
542 }
543 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
544 existing_count++;
545 }
546 }
547
548 // Quick pre-check before expensive allocations
549 if (existing_count >= GET_OPTION(max_clients) || slot == -1) {
550 const char *reject_msg = "SERVER_FULL: Maximum client limit reached\n";
551 ssize_t send_result = socket_send(socket, reject_msg, strlen(reject_msg), 0);
552 if (send_result < 0) {
553 log_warn("Failed to send rejection message to client: %s", SAFE_STRERROR(errno));
554 }
555 return -1;
556 }
557
558 // DO EXPENSIVE ALLOCATIONS OUTSIDE THE LOCK
559 // This prevents blocking frame processing during client initialization
560 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
561
562 video_frame_buffer_t *incoming_video_buffer = video_frame_buffer_create(new_client_id);
563 if (!incoming_video_buffer) {
564 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for client %u", new_client_id);
565 log_error("Failed to create video buffer for client %u", new_client_id);
566 return -1;
567 }
568
569 audio_ring_buffer_t *incoming_audio_buffer = audio_ring_buffer_create_for_capture();
570 if (!incoming_audio_buffer) {
571 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for client %u", new_client_id);
572 log_error("Failed to create audio buffer for client %u", new_client_id);
573 video_frame_buffer_destroy(incoming_video_buffer);
574 return -1;
575 }
576
577 packet_queue_t *audio_queue = packet_queue_create_with_pools(500, 1000, false);
578 if (!audio_queue) {
579 LOG_ERRNO_IF_SET("Failed to create audio queue for client");
580 audio_ring_buffer_destroy(incoming_audio_buffer);
581 video_frame_buffer_destroy(incoming_video_buffer);
582 return -1;
583 }
584
585 video_frame_buffer_t *outgoing_video_buffer = video_frame_buffer_create(new_client_id);
586 if (!outgoing_video_buffer) {
587 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for client");
588 packet_queue_destroy(audio_queue);
589 audio_ring_buffer_destroy(incoming_audio_buffer);
590 video_frame_buffer_destroy(incoming_video_buffer);
591 return -1;
592 }
593
594 void *send_buffer = SAFE_MALLOC_ALIGNED(MAX_FRAME_BUFFER_SIZE, 64, void *);
595 if (!send_buffer) {
596 log_error("Failed to allocate send buffer for client %u", new_client_id);
597 video_frame_buffer_destroy(outgoing_video_buffer);
598 packet_queue_destroy(audio_queue);
599 audio_ring_buffer_destroy(incoming_audio_buffer);
600 video_frame_buffer_destroy(incoming_video_buffer);
601 return -1;
602 }
603
604 // NOW acquire the lock only for the critical section: slot assignment + registration
605 rwlock_wrlock(&g_client_manager_rwlock);
606
607 // Re-check slot availability under lock (another thread might have taken it)
608 if (atomic_load(&g_client_manager.clients[slot].client_id) != 0) {
609 rwlock_wrunlock(&g_client_manager_rwlock);
610 SAFE_FREE(send_buffer);
611 video_frame_buffer_destroy(outgoing_video_buffer);
612 packet_queue_destroy(audio_queue);
613 audio_ring_buffer_destroy(incoming_audio_buffer);
614 video_frame_buffer_destroy(incoming_video_buffer);
615
616 const char *reject_msg = "SERVER_FULL: Slot reassigned, try again\n";
617 socket_send(socket, reject_msg, strlen(reject_msg), 0);
618 return -1;
619 }
620
621 // Now we have exclusive access to the slot - do the actual registration
622 client_info_t *client = &g_client_manager.clients[slot];
623 memset(client, 0, sizeof(client_info_t));
624
625 client->socket = socket;
626 client->is_tcp_client = true;
627 atomic_store(&client->client_id, new_client_id);
628 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
629 client->port = port;
630 atomic_store(&client->active, true);
631 client->server_ctx = server_ctx; // Store server context for cleanup
632 atomic_store(&client->shutting_down, false);
633 atomic_store(&client->last_rendered_grid_sources, 0);
634 atomic_store(&client->last_sent_grid_sources, 0);
635 client->connected_at = time(NULL);
636
637 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
638 client->crypto_initialized = false;
639
640 client->pending_packet_type = 0;
641 client->pending_packet_payload = NULL;
642 client->pending_packet_length = 0;
643
644 // Assign pre-allocated buffers
645 client->incoming_video_buffer = incoming_video_buffer;
646 client->incoming_audio_buffer = incoming_audio_buffer;
647 client->audio_queue = audio_queue;
648 client->outgoing_video_buffer = outgoing_video_buffer;
649 client->send_buffer = send_buffer;
650 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE;
651
652 safe_snprintf(client->display_name, sizeof(client->display_name), "Client%u", new_client_id);
653 log_info("Added new client ID=%u from %s:%d (socket=%d, slot=%d)", new_client_id, client_ip, port, socket, slot);
654 log_debug("Client slot assigned: client_id=%u assigned to slot %d, socket=%d", new_client_id, slot, socket);
655
656 // Register socket with tcp_server
657 asciichat_error_t reg_result = tcp_server_add_client(server_ctx->tcp_server, socket, client);
658 if (reg_result != ASCIICHAT_OK) {
659 SET_ERRNO(ERROR_INTERNAL, "Failed to register client socket with tcp_server");
660 log_error("Failed to register client %u socket with tcp_server", new_client_id);
661 // Don't unlock here - error_cleanup will do it
662 goto error_cleanup;
663 }
664
666 log_debug("Client count updated: now %d clients (added client_id=%u to slot %d)", g_client_manager.client_count,
667 new_client_id, slot);
668
669 uint32_t cid = atomic_load(&client->client_id);
670 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
671 log_debug("Added client %u to uthash table", cid);
672
673 rwlock_wrunlock(&g_client_manager_rwlock);
674
675 // Configure socket OUTSIDE lock
676 configure_client_socket(socket, new_client_id);
677
678 // Initialize mutexes OUTSIDE lock
679 if (mutex_init(&client->client_state_mutex) != 0) {
680 log_error("Failed to initialize client state mutex for client %u", new_client_id);
681 remove_client(server_ctx, new_client_id);
682 return -1;
683 }
684
685 if (mutex_init(&client->send_mutex) != 0) {
686 log_error("Failed to initialize send mutex for client %u", new_client_id);
687 remove_client(server_ctx, new_client_id);
688 return -1;
689 }
690
691 // Register with audio mixer OUTSIDE lock
692 if (g_audio_mixer && client->incoming_audio_buffer) {
693 if (mixer_add_source(g_audio_mixer, new_client_id, client->incoming_audio_buffer) < 0) {
694 log_warn("Failed to add client %u to audio mixer", new_client_id);
695 } else {
696 #ifdef DEBUG_AUDIO
697 log_debug("Added client %u to audio mixer", new_client_id);
698 #endif
699 }
700 }
701
702 // Perform crypto handshake before starting threads.
703 // This ensures the handshake uses the socket directly without interference from receive thread.
704 if (server_crypto_init() == 0) {
705 // Set timeout for crypto handshake to prevent indefinite blocking
706 // This prevents clients from connecting but never completing the handshake
707 const uint64_t HANDSHAKE_TIMEOUT_NS = 30ULL * NS_PER_SEC_INT;
708 asciichat_error_t timeout_result = set_socket_timeout(socket, HANDSHAKE_TIMEOUT_NS);
709 if (timeout_result != ASCIICHAT_OK) {
710 log_warn("Failed to set handshake timeout for client %u: %s", atomic_load(&client->client_id),
711 asciichat_error_string(timeout_result));
712 // Continue anyway - timeout is a safety feature, not critical
713 }
714
715 int crypto_result = server_crypto_handshake(client);
716 if (crypto_result != 0) {
717 log_error("Crypto handshake failed for client %u: %s", atomic_load(&client->client_id), network_error_string());
718 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
719 log_error("Failed to remove client after crypto handshake failure");
720 }
721 return -1;
722 }
723
724 // Clear socket timeout after handshake completes successfully
725 // This allows normal operation without timeouts on data transfer
726 asciichat_error_t clear_timeout_result = set_socket_timeout(socket, 0);
727 if (clear_timeout_result != ASCIICHAT_OK) {
728 log_warn("Failed to clear handshake timeout for client %u: %s", atomic_load(&client->client_id),
729 asciichat_error_string(clear_timeout_result));
730 // Continue anyway - we can still communicate even with timeout set
731 }
732
733 log_debug("Crypto handshake completed successfully for client %u", atomic_load(&client->client_id));
734
735 // Create ACIP transport for protocol-agnostic packet sending
736 // The transport wraps the socket with encryption context from the handshake
737 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
738 client->transport = acip_tcp_transport_create(socket, (crypto_context_t *)crypto_ctx);
739 if (!client->transport) {
740 log_error("Failed to create ACIP transport for client %u", atomic_load(&client->client_id));
741 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
742 log_error("Failed to remove client after transport creation failure");
743 }
744 return -1;
745 }
746 log_debug("Created ACIP transport for client %u with crypto context", atomic_load(&client->client_id));
747
748 // After handshake completes, the client immediately sends PACKET_TYPE_CLIENT_CAPABILITIES
749 // We must read and process this packet BEFORE starting the receive thread to avoid a race condition
750 // where the packet arrives but no thread is listening for it.
751 //
752 // SPECIAL CASE: If client used --no-encrypt, we already received this packet during the handshake
753 // attempt and stored it in pending_packet_*. Use that instead of receiving a new one.
754 packet_envelope_t envelope;
755 bool used_pending_packet = false;
756
757 if (client->pending_packet_payload) {
758 // Client used --no-encrypt mode - use the packet we already received
759 log_info("Client %u using --no-encrypt mode - processing pending packet type %u", atomic_load(&client->client_id),
760 client->pending_packet_type);
761 envelope.type = client->pending_packet_type;
762 envelope.data = client->pending_packet_payload;
763 envelope.len = client->pending_packet_length;
764 envelope.allocated_buffer = client->pending_packet_payload; // Will be freed below
765 envelope.allocated_size = client->pending_packet_length;
766 used_pending_packet = true;
767
768 // Clear pending packet fields
769 client->pending_packet_type = 0;
770 client->pending_packet_payload = NULL;
771 client->pending_packet_length = 0;
772 } else {
773 // Normal encrypted mode - receive capabilities packet
774 log_debug("Waiting for initial capabilities packet from client %u", atomic_load(&client->client_id));
775
776 // Protect crypto context access with client state mutex
777 mutex_lock(&client->client_state_mutex);
778 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
779
780 // Use per-client crypto state to determine enforcement
781 // At this point, handshake is complete, so crypto_initialized=true and handshake is ready
782 bool enforce_encryption = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
783 crypto_handshake_is_ready(&client->crypto_handshake_ctx);
784
785 packet_recv_result_t result = receive_packet_secure(socket, (void *)crypto_ctx, enforce_encryption, &envelope);
786 mutex_unlock(&client->client_state_mutex);
787
788 if (result != PACKET_RECV_SUCCESS) {
789 log_error("Failed to receive initial capabilities packet from client %u: result=%d",
790 atomic_load(&client->client_id), result);
791 if (envelope.allocated_buffer) {
792 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
793 }
794 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
795 log_error("Failed to remove client after crypto handshake failure");
796 }
797 return -1;
798 }
799 }
800
801 if (envelope.type != PACKET_TYPE_CLIENT_CAPABILITIES) {
802 log_error("Expected PACKET_TYPE_CLIENT_CAPABILITIES but got packet type %d from client %u", envelope.type,
803 atomic_load(&client->client_id));
804 if (envelope.allocated_buffer) {
805 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
806 }
807 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
808 log_error("Failed to remove client after crypto handshake failure");
809 }
810 return -1;
811 }
812
813 // Process the capabilities packet directly
814 log_debug("Processing initial capabilities packet from client %u (from %s)", atomic_load(&client->client_id),
815 used_pending_packet ? "pending packet" : "network");
816 handle_client_capabilities_packet(client, envelope.data, envelope.len);
817
818 // Free the packet data
819 if (envelope.allocated_buffer) {
820 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
821 }
822 log_debug("Successfully received and processed initial capabilities for client %u",
823 atomic_load(&client->client_id));
824 }
825
826 // Start all client threads in the correct order (unified path for TCP and WebRTC)
827 // This creates: receive thread -> render threads -> send thread
828 // The render threads MUST be created before send thread to avoid the race condition
829 // where send thread reads empty frames before render thread generates the first real frame
830 uint32_t client_id_snapshot = atomic_load(&client->client_id);
831 if (start_client_threads(server_ctx, client, true) != 0) {
832 log_error("Failed to start threads for TCP client %u", client_id_snapshot);
833 // Client is already in hash table - use remove_client for proper cleanup
834 remove_client(server_ctx, client_id_snapshot);
835 return -1;
836 }
837 log_debug("Successfully created render threads for client %u", client_id_snapshot);
838
839 // Register client with session_host (for discovery mode support)
840 if (server_ctx->session_host) {
841 uint32_t session_client_id = session_host_add_client(server_ctx->session_host, socket, client_ip, port);
842 if (session_client_id == 0) {
843 log_warn("Failed to register client %u with session_host", client_id_snapshot);
844 } else {
845 log_debug("Client %u registered with session_host as %u", client_id_snapshot, session_client_id);
846 }
847 }
848
849 // Broadcast server state to ALL clients AFTER the new client is fully set up
850 // This notifies all clients (including the new one) about the updated grid
852
853 return (int)client_id_snapshot;
854
855 error_cleanup:
856 // Clean up all partially allocated resources
857 // NOTE: This label is reached when allocation or initialization fails BEFORE
858 // the client is added to the hash table. Don't call remove_client() here.
859 cleanup_client_all_buffers(client);
860 rwlock_wrunlock(&g_client_manager_rwlock);
861 return -1;
862}

References acip_tcp_transport_create(), audio_ring_buffer_create_for_capture(), audio_ring_buffer_destroy(), broadcast_server_state_to_all_clients(), buffer_pool_free(), client_manager_t::client_count, client_manager_t::clients, client_manager_t::clients_by_id, crypto_handshake_is_ready(), crypto_server_get_context(), g_audio_mixer, g_client_manager, g_client_manager_rwlock, handle_client_capabilities_packet(), mixer_add_source(), mutex_init(), network_error_string(), client_manager_t::next_client_id, packet_queue_create_with_pools(), packet_queue_destroy(), receive_packet_secure(), remove_client(), safe_snprintf(), server_crypto_handshake(), server_crypto_init(), server_context_t::session_host, session_host_add_client(), set_socket_timeout(), server_context_t::tcp_server, tcp_server_add_client(), video_frame_buffer_create(), and video_frame_buffer_destroy().
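The listing above scans for a free slot without the lock, performs its allocations, and only then takes the write lock to re-verify the slot before registering. A reduced sketch of that pattern follows. It is not the real function: demo_slot_t, demo_slots, demo_next_id, demo_slots_lock and demo_add() are hypothetical, a plain pthread mutex stands in for g_client_manager_rwlock, a single malloc() stands in for the buffer and queue allocations, and failure is reported as 0 rather than -1.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

#define DEMO_SLOTS 4

/* Hypothetical slot: client_id == 0 means free. */
typedef struct {
  _Atomic uint32_t client_id;
  void *buffer;
} demo_slot_t;

static demo_slot_t demo_slots[DEMO_SLOTS];
static _Atomic uint32_t demo_next_id;
static pthread_mutex_t demo_slots_lock = PTHREAD_MUTEX_INITIALIZER;

/* Returns the new client id (>= 1), or 0 on failure. */
uint32_t demo_add(size_t buffer_size) {
  /* Phase 1: optimistic, lock-free scan for a free slot. */
  int slot = -1;
  for (int i = 0; i < DEMO_SLOTS; i++) {
    if (atomic_load(&demo_slots[i].client_id) == 0) {
      slot = i;
      break;
    }
  }
  if (slot < 0) {
    return 0; /* cheap rejection before any allocation */
  }

  /* Phase 2: expensive work outside the lock. */
  void *buffer = malloc(buffer_size);
  if (buffer == NULL) {
    return 0;
  }
  uint32_t id = atomic_fetch_add(&demo_next_id, 1) + 1;

  /* Phase 3: short critical section, re-verify and publish. */
  pthread_mutex_lock(&demo_slots_lock);
  if (atomic_load(&demo_slots[slot].client_id) != 0) {
    /* Another thread claimed the slot in the meantime: undo and fail. */
    pthread_mutex_unlock(&demo_slots_lock);
    free(buffer);
    return 0;
  }
  demo_slots[slot].buffer = buffer;
  atomic_store(&demo_slots[slot].client_id, id);
  pthread_mutex_unlock(&demo_slots_lock);
  return id;
}
```

The cost of this design is that a losing thread throws away its allocations and the caller must retry, in exchange for keeping the lock hold time independent of allocation latency.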

◆ add_webrtc_client()

int add_webrtc_client ( server_context_t *  server_ctx,
acip_transport_t *  transport,
const char *  client_ip,
bool  start_threads 
)

Register a WebRTC client with the server.

Registers a client that connected via WebRTC data channel instead of TCP socket. This function reuses most of add_client() logic but skips:

  • Crypto handshake (already done via ACDS signaling)
  • Socket-specific configuration
  • TCP thread pool registration

DIFFERENCES FROM add_client():

  • Takes an already-created acip_transport_t* instead of socket
  • No crypto handshake (WebRTC signaling handled authentication)
  • No socket configuration (WebRTC handles buffering)
  • Uses generic thread spawning instead of tcp_server thread pool
Parameters
server_ctx    Server context
transport     WebRTC transport (already created and connected)
client_ip     Client IP address for logging (may be empty for P2P)
Returns
Client ID on success, -1 on failure
Note
The transport must be fully initialized and ready to send/receive
Client capabilities are still expected as first packet

Definition at line 887 of file src/server/client.c.

888 {
889 if (!server_ctx || !transport || !client_ip) {
890 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters to add_webrtc_client");
891 return -1;
892 }
893
894 rwlock_wrlock(&g_client_manager_rwlock);
895
896 // Find empty slot - this is the authoritative check
897 int slot = -1;
898 int existing_count = 0;
899 for (int i = 0; i < MAX_CLIENTS; i++) {
900 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
901 slot = i; // Take first available slot
902 }
903 // Count only active clients
904 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
905 existing_count++;
906 }
907 }
908
909 // Check if we've hit the configured max-clients limit (not the array size)
910 if (existing_count >= GET_OPTION(max_clients)) {
911 rwlock_wrunlock(&g_client_manager_rwlock);
912 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "Maximum client limit reached (%d/%d active clients)", existing_count,
913 GET_OPTION(max_clients));
914 log_error("Maximum client limit reached (%d/%d active clients)", existing_count, GET_OPTION(max_clients));
915 return -1;
916 }
917
918 if (slot == -1) {
919 rwlock_wrunlock(&g_client_manager_rwlock);
920 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "No available client slots (all %d array slots are in use)", MAX_CLIENTS);
921 log_error("No available client slots (all %d array slots are in use)", MAX_CLIENTS);
922 return -1;
923 }
924
925 // Update client_count to match actual count before adding new client
926 g_client_manager.client_count = existing_count;
927
928 // Initialize client
929 client_info_t *client = &g_client_manager.clients[slot];
930
931 // Free any existing buffers from previous client in this slot
932 if (client->incoming_video_buffer) {
933 video_frame_buffer_destroy(client->incoming_video_buffer);
934 client->incoming_video_buffer = NULL;
935 }
936 if (client->outgoing_video_buffer) {
937 video_frame_buffer_destroy(client->outgoing_video_buffer);
938 client->outgoing_video_buffer = NULL;
939 }
940 if (client->incoming_audio_buffer) {
941 audio_ring_buffer_destroy(client->incoming_audio_buffer);
942 client->incoming_audio_buffer = NULL;
943 }
944 if (client->send_buffer) {
945 SAFE_FREE(client->send_buffer);
946 client->send_buffer = NULL;
947 }
948
949 memset(client, 0, sizeof(client_info_t));
950
951 // Set up WebRTC-specific fields
952 client->socket = INVALID_SOCKET_VALUE; // WebRTC has no traditional socket
953 client->is_tcp_client = false; // WebRTC client - threads managed directly
954 client->transport = transport; // Use provided transport
955 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
956 atomic_store(&client->client_id, new_client_id);
957 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
958 client->port = 0; // WebRTC doesn't use port numbers
959 atomic_store(&client->active, true);
960 client->server_ctx = server_ctx; // Store server context for receive thread cleanup
961 log_info("Added new WebRTC client ID=%u from %s (transport=%p, slot=%d)", new_client_id, client_ip, transport, slot);
962 atomic_store(&client->shutting_down, false);
963 atomic_store(&client->last_rendered_grid_sources, 0); // Render thread updates this
964 atomic_store(&client->last_sent_grid_sources, 0); // Send thread updates this
965 log_debug("WebRTC client slot assigned: client_id=%u assigned to slot %d", atomic_load(&client->client_id), slot);
966 client->connected_at = time(NULL);
967
968 // Initialize crypto context for this client
969 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
970 log_debug("[ADD_WEBRTC_CLIENT] Setting crypto_initialized=false (will be set to true after handshake completes)");
971 // Do NOT set crypto_initialized = true here! The handshake must complete first.
972 // For WebSocket clients: websocket_client_handler will perform the handshake
973 // For WebRTC clients: ACDS signaling may have already done crypto (future enhancement)
974 client->crypto_initialized = false;
975
976 // Initialize pending packet storage (unused for WebRTC, but keep for consistency)
977 client->pending_packet_type = 0;
978 client->pending_packet_payload = NULL;
979 client->pending_packet_length = 0;
980
981 safe_snprintf(client->display_name, sizeof(client->display_name), "WebRTC%u", atomic_load(&client->client_id));
982
983 // Create individual video buffer for this client using modern double-buffering
984 client->incoming_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
985 if (!client->incoming_video_buffer) {
986 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
987 log_error("Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
988 goto error_cleanup_webrtc;
989 }
990
991 // Create individual audio buffer for this client
992 client->incoming_audio_buffer = audio_ring_buffer_create_for_capture();
993 if (!client->incoming_audio_buffer) {
994 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
995 log_error("Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
996 goto error_cleanup_webrtc;
997 }
998
999 // Create packet queues for outgoing data
1000 client->audio_queue = packet_queue_create_with_pools(500, 1000, false);
1001 if (!client->audio_queue) {
1002 LOG_ERRNO_IF_SET("Failed to create audio queue for WebRTC client");
1003 goto error_cleanup_webrtc;
1004 }
1005
1006 // Create packet queue for async dispatch: received packets waiting to be processed
1007 // Capacity of 100 packets allows buffering of ~10-50 frames (depending on fragmentation)
1008 client->received_packet_queue = packet_queue_create_with_pools(100, 500, false);
1009 if (!client->received_packet_queue) {
1010 LOG_ERRNO_IF_SET("Failed to create received packet queue for client");
1011 goto error_cleanup_webrtc;
1012 }
1013
1014 // Create outgoing video buffer for ASCII frames (double buffered, no dropping)
1015 client->outgoing_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
1016 if (!client->outgoing_video_buffer) {
1017 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for WebRTC client");
1018 goto error_cleanup_webrtc;
1019 }
1020
1021 // Pre-allocate send buffer to avoid malloc/free in send thread (prevents deadlocks)
1022 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE; // 2MB should handle largest frames
1023 client->send_buffer = SAFE_MALLOC_ALIGNED(client->send_buffer_size, 64, void *);
1024 if (!client->send_buffer) {
1025 log_error("Failed to allocate send buffer for WebRTC client %u", atomic_load(&client->client_id));
1026 goto error_cleanup_webrtc;
1027 }
1028
1029 g_client_manager.client_count = existing_count + 1; // We just added a client
1030 log_debug("Client count updated: now %d clients (added WebRTC client_id=%u to slot %d)",
1031 g_client_manager.client_count, atomic_load(&client->client_id), slot);
1032
1033 // Add client to uthash table for O(1) lookup
1034 uint32_t cid = atomic_load(&client->client_id);
1035 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
1036 log_debug("Added WebRTC client %u to uthash table", cid);
1037
1038 // Register this client's audio buffer with the mixer
1039 if (g_audio_mixer && client->incoming_audio_buffer) {
1040 if (mixer_add_source(g_audio_mixer, atomic_load(&client->client_id), client->incoming_audio_buffer) < 0) {
1041 log_warn("Failed to add WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1042 } else {
1043#ifdef DEBUG_AUDIO
1044 log_debug("Added WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1045#endif
1046 }
1047 }
1048
1049 // Initialize mutexes BEFORE creating any threads to prevent race conditions
1050 if (mutex_init(&client->client_state_mutex) != 0) {
1051 log_error("Failed to initialize client state mutex for WebRTC client %u", atomic_load(&client->client_id));
1052 // Client is already in hash table - use remove_client for proper cleanup
1053 rwlock_wrunlock(&g_client_manager_rwlock);
1054 remove_client(server_ctx, atomic_load(&client->client_id));
1055 return -1;
1056 }
1057
1058 // Initialize send mutex to protect concurrent socket writes
1059 if (mutex_init(&client->send_mutex) != 0) {
1060 log_error("Failed to initialize send mutex for WebRTC client %u", atomic_load(&client->client_id));
1061 // Client is already in hash table - use remove_client for proper cleanup
1062 rwlock_wrunlock(&g_client_manager_rwlock);
1063 remove_client(server_ctx, atomic_load(&client->client_id));
1064 return -1;
1065 }
1066
1067 rwlock_wrunlock(&g_client_manager_rwlock);
1068
1069 // For WebRTC clients, the capabilities packet will be received by the receive thread
1070 // when it starts. Unlike TCP clients where we handle it synchronously in add_client(),
1071 // WebRTC uses the transport abstraction which handles packet reception automatically.
1072 log_debug("WebRTC client %u initialized - receive thread will process capabilities", atomic_load(&client->client_id));
1073
1074 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1075
1076 // Conditionally start threads based on caller preference
1077 // WebSocket handler passes start_threads=false to defer thread startup until after crypto init
1078 // This ensures receive thread doesn't try to process packets before crypto context is ready
1079 if (start_threads) {
1080 log_debug("[ADD_WEBRTC_CLIENT] Starting client threads (receive, render) for client %u...", client_id_snapshot);
1081 if (start_client_threads(server_ctx, client, false) != 0) {
1082 log_error("Failed to start threads for WebRTC client %u", client_id_snapshot);
1083 return -1;
1084 }
1085 log_debug("Created receive thread for WebRTC client %u", client_id_snapshot);
1086 log_debug("[ADD_WEBRTC_CLIENT] Receive thread started for client %u - thread will now be processing packets", client_id_snapshot);
1087 } else {
1088 log_debug("[ADD_WEBRTC_CLIENT] Deferring thread startup for client %u (caller will start after crypto init)",
1089 client_id_snapshot);
1090 }
1091
1092 // Send initial server state to the new client
1093 if (send_server_state_to_client(client) != 0) {
1094 log_warn("Failed to send initial server state to WebRTC client %u", client_id_snapshot);
1095 } else {
1096#ifdef DEBUG_NETWORK
1097 log_info("Sent initial server state to WebRTC client %u", client_id_snapshot);
1098#endif
1099 }
1100
1101 // Send initial server state via ACIP transport
1102 rwlock_rdlock(&g_client_manager_rwlock);
1103 uint32_t connected_count = g_client_manager.client_count;
1104 rwlock_rdunlock(&g_client_manager_rwlock);
1105
1106 server_state_packet_t state;
1107 state.connected_client_count = connected_count;
1108 state.active_client_count = 0; // Will be updated by broadcast thread
1109 memset(state.reserved, 0, sizeof(state.reserved));
1110
1111 // Convert to network byte order
1112 server_state_packet_t net_state;
1113 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
1114 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
1115 memset(net_state.reserved, 0, sizeof(net_state.reserved));
1116
1117 asciichat_error_t packet_send_result = acip_send_server_state(client->transport, &net_state);
1118 if (packet_send_result != ASCIICHAT_OK) {
1119 log_warn("Failed to send initial server state to WebRTC client %u: %s", client_id_snapshot,
1120 asciichat_error_string(packet_send_result));
1121 } else {
1122 log_debug("Sent initial server state to WebRTC client %u: %u connected clients", client_id_snapshot,
1123 state.connected_client_count);
1124 }
1125
1126 // Register client with session_host (for discovery mode support)
1127 // WebRTC clients use INVALID_SOCKET_VALUE since they don't have a TCP socket
1128 if (server_ctx->session_host) {
1129 uint32_t session_client_id = session_host_add_client(server_ctx->session_host, INVALID_SOCKET_VALUE, client_ip, 0);
1130 if (session_client_id == 0) {
1131 log_warn("Failed to register WebRTC client %u with session_host", client_id_snapshot);
1132 } else {
1133 log_debug("WebRTC client %u registered with session_host as %u", client_id_snapshot, session_client_id);
1134 }
1135 }
1136
1137 // Broadcast server state to ALL clients AFTER the new client is fully set up
1138 // This notifies all clients (including the new one) about the updated grid
1139 broadcast_server_state_to_all_clients();
1140
1141 return (int)client_id_snapshot;
1142
1143error_cleanup_webrtc:
1144 // Clean up all partially allocated resources for WebRTC client
1145 // NOTE: This label is reached when allocation or initialization fails BEFORE
1146 // the client is added to the hash table. Don't call remove_client() here.
1147 cleanup_client_all_buffers(client);
1148 rwlock_wrunlock(&g_client_manager_rwlock);
1149 return -1;
1150}

References acip_send_server_state(), audio_ring_buffer_create_for_capture(), audio_ring_buffer_destroy(), broadcast_server_state_to_all_clients(), client_manager_t::client_count, client_manager_t::clients, client_manager_t::clients_by_id, g_audio_mixer, g_client_manager, g_client_manager_rwlock, mixer_add_source(), mutex_init(), client_manager_t::next_client_id, packet_queue_create_with_pools(), remove_client(), safe_snprintf(), send_server_state_to_client(), server_context_t::session_host, session_host_add_client(), video_frame_buffer_create(), and video_frame_buffer_destroy().
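
The failure handling in add_webrtc_client() sends every allocation error that occurs before the hash-table insertion to a single label. The following is a minimal sketch of that single-exit cleanup pattern, not the project's code: `demo_client_t`, `demo_client_init()`, `demo_client_cleanup()` and the `fail_at` parameter are hypothetical names used only to simulate a failure at a chosen step.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for client_info_t: three independently allocated resources. */
typedef struct {
  void *video_buffer;
  void *audio_buffer;
  void *send_buffer;
} demo_client_t;

/* Release whatever was allocated. Safe on a partially initialized struct
 * because free(NULL) is a no-op and every pointer is reset afterwards. */
static void demo_client_cleanup(demo_client_t *c) {
  assert(c != NULL);
  free(c->video_buffer);
  c->video_buffer = NULL;
  free(c->audio_buffer);
  c->audio_buffer = NULL;
  free(c->send_buffer);
  c->send_buffer = NULL;
}

/* fail_at simulates an allocation failure at step 1..3 (0 means no failure). */
static int demo_client_init(demo_client_t *c, int fail_at) {
  assert(c != NULL);
  c->video_buffer = NULL;
  c->audio_buffer = NULL;
  c->send_buffer = NULL;

  c->video_buffer = (fail_at == 1) ? NULL : malloc(64);
  if (!c->video_buffer)
    goto error_cleanup;

  c->audio_buffer = (fail_at == 2) ? NULL : malloc(64);
  if (!c->audio_buffer)
    goto error_cleanup;

  c->send_buffer = (fail_at == 3) ? NULL : malloc(64);
  if (!c->send_buffer)
    goto error_cleanup;

  return 0;

error_cleanup:
  /* One exit path releases everything acquired so far. */
  demo_client_cleanup(c);
  return -1;
}
```

Because the cleanup routine tolerates NULL members, the init function does not need one label per resource; this is presumably why the listing can use a single `error_cleanup_webrtc` label for all pre-registration failures.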

◆ broadcast_server_state_to_all_clients()

void broadcast_server_state_to_all_clients ( void  )

Notify all clients of state changes.

Definition at line 2258 of file src/server/client.c.

2258 {
2259 // SNAPSHOT PATTERN: Collect client data while holding lock, then release before network I/O
2260 typedef struct {
2261 socket_t socket;
2262 uint32_t client_id;
2263 const crypto_context_t *crypto_ctx;
2264 } client_snapshot_t;
2265
2266 client_snapshot_t client_snapshots[MAX_CLIENTS];
2267 int snapshot_count = 0;
2268 int active_video_count = 0;
2269
2270 uint64_t lock_start_ns = time_get_ns();
2271 rwlock_rdlock(&g_client_manager_rwlock);
2272 uint64_t lock_end_ns = time_get_ns();
2273 uint64_t lock_time_ns = time_elapsed_ns(lock_start_ns, lock_end_ns);
2274 if (lock_time_ns > 1 * NS_PER_MS_INT) {
2275 char duration_str[32];
2276 format_duration_ns((double)lock_time_ns, duration_str, sizeof(duration_str));
2277 log_warn("broadcast_server_state: rwlock_rdlock took %s", duration_str);
2278 }
2279
2280 // Count active clients and snapshot client data while holding lock
2281 // Use atomic_load for all atomic fields to prevent data races.
2282 for (int i = 0; i < MAX_CLIENTS; i++) {
2283 bool is_active = atomic_load(&g_client_manager.clients[i].active);
2284 if (is_active && atomic_load(&g_client_manager.clients[i].is_sending_video)) {
2285 active_video_count++;
2286 }
2287 if (is_active && g_client_manager.clients[i].socket != INVALID_SOCKET_VALUE) {
2288 // Skip clients that haven't completed crypto handshake yet
2289 // Check both crypto_initialized AND crypto context (defense in depth)
2290 if (!GET_OPTION(no_encrypt) && !g_client_manager.clients[i].crypto_initialized) {
2291 log_debug("Skipping server_state broadcast to client %u: crypto handshake not complete",
2292 atomic_load(&g_client_manager.clients[i].client_id));
2293 continue;
2294 }
2295
2296 // Get crypto context (may be NULL if --no-encrypt)
2297 const crypto_context_t *crypto_ctx =
2298 crypto_handshake_get_context(&g_client_manager.clients[i].crypto_handshake_ctx);
2299 if (!GET_OPTION(no_encrypt) && !crypto_ctx) {
2300 // Skip clients without valid crypto context
2301 log_debug("Skipping server_state broadcast to client %u: no crypto context",
2302 atomic_load(&g_client_manager.clients[i].client_id));
2303 continue;
2304 }
2305
2306 client_snapshots[snapshot_count].socket = g_client_manager.clients[i].socket;
2307 client_snapshots[snapshot_count].client_id = atomic_load(&g_client_manager.clients[i].client_id);
2308 client_snapshots[snapshot_count].crypto_ctx = crypto_ctx;
2309 snapshot_count++;
2310 }
2311 }
2312
2313 // Prepare server state packet while still holding lock (fast operation)
2314 server_state_packet_t state;
2315 state.connected_client_count = g_client_manager.client_count;
2316 state.active_client_count = active_video_count;
2317 memset(state.reserved, 0, sizeof(state.reserved));
2318
2319 // Convert to network byte order
2320 server_state_packet_t net_state;
2321 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
2322 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
2323 memset(net_state.reserved, 0, sizeof(net_state.reserved));
2324
2325 // Release lock BEFORE sending (snapshot pattern)
2326 // Sending while holding lock blocks all client operations
2327 uint64_t lock_held_final_ns = time_get_ns();
2328 uint64_t lock_held_ns = time_elapsed_ns(lock_start_ns, lock_held_final_ns);
2329 rwlock_rdunlock(&g_client_manager_rwlock);
2330
2331 // Send to all clients AFTER releasing the lock
2332 // This prevents blocking other threads during network I/O
2333 for (int i = 0; i < snapshot_count; i++) {
2334 log_debug_every(5000 * US_PER_MS_INT,
2335 "BROADCAST_DEBUG: Sending SERVER_STATE to client %u (socket %d) with crypto_ctx=%p",
2336 client_snapshots[i].client_id, client_snapshots[i].socket, (void *)client_snapshots[i].crypto_ctx);
2337
2338 // Protect socket write with per-client send_mutex.
2339 client_info_t *target = find_client_by_id(client_snapshots[i].client_id);
2340 if (target) {
2341 // IMPORTANT: Verify client_id matches expected value - prevents use-after-free
2342 // if client was removed and replaced with another client in same slot
2343 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
2344 log_warn("Client %u ID mismatch during broadcast (found %u), skipping send", client_snapshots[i].client_id,
2345 atomic_load(&target->client_id));
2346 continue;
2347 }
2348
2349 mutex_lock(&target->send_mutex);
2350
2351 // Double-check client_id again after acquiring mutex (stronger protection)
2352 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
2353 mutex_unlock(&target->send_mutex);
2354 log_warn("Client %u was removed during broadcast send (now %u), skipping", client_snapshots[i].client_id,
2355 atomic_load(&target->client_id));
2356 continue;
2357 }
2358
2359 // Send via ACIP transport
2360 asciichat_error_t result = acip_send_server_state(target->transport, &net_state);
2361 mutex_unlock(&target->send_mutex);
2362
2363 if (result != ASCIICHAT_OK) {
2364 log_error("Failed to send server state to client %u: %s", client_snapshots[i].client_id,
2365 asciichat_error_string(result));
2366 } else {
2367 log_debug_every(5000 * US_PER_MS_INT, "Sent server state to client %u: %u connected, %u active",
2368 client_snapshots[i].client_id, state.connected_client_count, state.active_client_count);
2369 }
2370 } else {
2371 log_warn("Client %u removed before broadcast send could complete", client_snapshots[i].client_id);
2372 }
2373 }
2374
2375 if (lock_held_ns > 1 * NS_PER_MS_INT) {
2376 char duration_str[32];
2377 format_duration_ns((double)lock_held_ns, duration_str, sizeof(duration_str));
2378 log_warn("broadcast_server_state: rwlock held for %s (excludes network I/O)", duration_str);
2379 }
2380}

References acip_send_server_state(), client_manager_t::client_count, client_manager_t::clients, crypto_handshake_get_context(), find_client_by_id(), format_duration_ns(), g_client_manager, g_client_manager_rwlock, time_elapsed_ns(), and time_get_ns().

Referenced by add_client(), add_webrtc_client(), and remove_client().
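
The snapshot pattern used by broadcast_server_state_to_all_clients() can be illustrated in isolation. This is a simplified sketch under stated assumptions: it uses a plain pthread mutex in place of the server's rwlock, and `demo_slot_t`, `g_demo_slots`, `demo_snapshot_active()`, `demo_broadcast()` and `demo_count_send()` are hypothetical names, not ascii-chat APIs.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_MAX_CLIENTS 4

/* Hypothetical client slot: only the fields the sketch needs. */
typedef struct {
  uint32_t id;
  bool active;
} demo_slot_t;

static demo_slot_t g_demo_slots[DEMO_MAX_CLIENTS];
static pthread_mutex_t g_demo_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned g_demo_sent; /* counts calls to demo_count_send() */

/* Step 1: copy the IDs of active slots while holding the lock. Returns the count. */
static size_t demo_snapshot_active(uint32_t *out, size_t cap) {
  size_t n = 0;
  assert(out != NULL);
  pthread_mutex_lock(&g_demo_lock);
  for (size_t i = 0; i < DEMO_MAX_CLIENTS && n < cap; i++) {
    if (g_demo_slots[i].active)
      out[n++] = g_demo_slots[i].id;
  }
  pthread_mutex_unlock(&g_demo_lock);
  return n;
}

/* Step 2: run the slow work (network I/O in the real server) on the copy,
 * after the lock has been released. */
static size_t demo_broadcast(void (*send_fn)(uint32_t id)) {
  uint32_t ids[DEMO_MAX_CLIENTS];
  size_t n = demo_snapshot_active(ids, DEMO_MAX_CLIENTS);
  for (size_t i = 0; i < n; i++)
    send_fn(ids[i]);
  return n;
}

/* Stand-in for a send routine: only counts how often it is called. */
static void demo_count_send(uint32_t id) {
  (void)id;
  g_demo_sent++;
}
```

Since the snapshot can go stale between the two steps, the real function looks each client up again by ID and re-checks the ID under `send_mutex` before sending; the sketch omits that revalidation.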

◆ cleanup_client_media_buffers()

void cleanup_client_media_buffers ( client_info_t *  client)

Definition at line 2438 of file src/server/client.c.

2438 {
2439 if (!client) {
2440 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
2441 return;
2442 }
2443
2444 if (client->incoming_video_buffer) {
2445 video_frame_buffer_destroy(client->incoming_video_buffer);
2446 client->incoming_video_buffer = NULL;
2447 }
2448
2449 // Clean up outgoing video buffer (for ASCII frames)
2450 if (client->outgoing_video_buffer) {
2451 video_frame_buffer_destroy(client->outgoing_video_buffer);
2452 client->outgoing_video_buffer = NULL;
2453 }
2454
2455 // Clean up pre-allocated send buffer
2456 if (client->send_buffer) {
2457 SAFE_FREE(client->send_buffer);
2458 client->send_buffer = NULL;
2459 client->send_buffer_size = 0;
2460 }
2461
2462 if (client->incoming_audio_buffer) {
2463 audio_ring_buffer_destroy(client->incoming_audio_buffer);
2464 client->incoming_audio_buffer = NULL;
2465 }
2466
2467 // Clean up Opus decoder
2468 if (client->opus_decoder) {
2469 opus_codec_destroy((opus_codec_t *)client->opus_decoder);
2470 client->opus_decoder = NULL;
2471 }
2472}

References audio_ring_buffer_destroy(), opus_codec_destroy(), and video_frame_buffer_destroy().

◆ cleanup_client_packet_queues()

void cleanup_client_packet_queues ( client_info_t *  client)

Definition at line 2474 of file src/server/client.c.

2474 {
2475 if (!client)
2476 return;
2477
2478 if (client->audio_queue) {
2479 packet_queue_destroy(client->audio_queue);
2480 client->audio_queue = NULL;
2481 }
2482
2483 // Async dispatch: clean up received packet queue
2484 if (client->received_packet_queue) {
2485 packet_queue_destroy(client->received_packet_queue);
2486 client->received_packet_queue = NULL;
2487 }
2488
2489 // Video now uses double buffer, cleaned up in cleanup_client_media_buffers
2490}

References packet_queue_destroy().

◆ client_dispatch_thread()

void * client_dispatch_thread ( void *  arg)

Async dispatch thread for WebRTC clients; processes queued received packets.

Decouples receive from dispatch to prevent backpressure on the network socket. Received packets are queued by the receive thread, and processed asynchronously here. This allows the receive thread to keep accepting packets while dispatch processes them.
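
The decoupling described above can be sketched as a small bounded queue with non-blocking operations on both sides. This is an illustration only: `demo_queue_t` and its functions are hypothetical and much simpler than the project's `packet_queue_t`, whose internals are not shown on this page.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_QUEUE_CAP 4

/* Hypothetical fixed-capacity FIFO of opaque packet pointers. */
typedef struct {
  void *items[DEMO_QUEUE_CAP];
  size_t head;  /* index of the next item to dequeue */
  size_t count; /* number of queued items */
  pthread_mutex_t lock;
} demo_queue_t;

static void demo_queue_init(demo_queue_t *q) {
  assert(q != NULL);
  q->head = 0;
  q->count = 0;
  pthread_mutex_init(&q->lock, NULL);
}

/* Receive side: never blocks. Returns false when the queue is full,
 * in which case the caller drops (and frees) the packet. */
static bool demo_queue_try_enqueue(demo_queue_t *q, void *item) {
  bool ok = false;
  assert(q != NULL);
  pthread_mutex_lock(&q->lock);
  if (q->count < DEMO_QUEUE_CAP) {
    q->items[(q->head + q->count) % DEMO_QUEUE_CAP] = item;
    q->count++;
    ok = true;
  }
  pthread_mutex_unlock(&q->lock);
  return ok;
}

/* Dispatch side: never blocks. Returns NULL when the queue is empty,
 * so the caller can re-check its exit flag and sleep briefly. */
static void *demo_queue_try_dequeue(demo_queue_t *q) {
  void *item = NULL;
  assert(q != NULL);
  pthread_mutex_lock(&q->lock);
  if (q->count > 0) {
    item = q->items[q->head];
    q->head = (q->head + 1) % DEMO_QUEUE_CAP;
    q->count--;
  }
  pthread_mutex_unlock(&q->lock);
  return item;
}
```

With this shape the receive thread only pays for a short critical section per packet, while slow handlers run on the dispatch thread. The trade-off is that a full queue means dropped packets, which matches the "packet dropped" warning in client_receive_thread().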

Definition at line 1449 of file src/server/client.c.

1449 {
1450 client_info_t *client = (client_info_t *)arg;
1451
1452 if (!client) {
1453 log_error("Invalid client info in dispatch thread (NULL pointer)");
1454 return NULL;
1455 }
1456
1457 uint32_t client_id = atomic_load(&client->client_id);
1458 log_info("DISPATCH_THREAD: Started for client %u", client_id);
1459
1460 uint64_t dispatch_loop_count = 0;
1461 uint64_t last_dequeue_attempt = time_get_ns();
1462
1463 while (!atomic_load(&g_server_should_exit) && atomic_load(&client->dispatch_thread_running)) {
1464 dispatch_loop_count++;
1465 // Try to dequeue next packet (non-blocking)
1466 // Use try_dequeue to avoid blocking - allows checking exit flag frequently
1467 uint64_t dequeue_start = time_get_ns();
1468 queued_packet_t *queued_pkt = packet_queue_try_dequeue(client->received_packet_queue);
1469 uint64_t dequeue_end = time_get_ns();
1470
1471 if (!queued_pkt) {
1472 // Queue was empty, sleep briefly to avoid busy-waiting
1473 log_dev_every(5 * US_PER_MS_INT, "🔄 DISPATCH_LOOP[%llu]: Queue empty after %.1fμs, sleeping 10ms",
1474 (unsigned long long)dispatch_loop_count, (dequeue_end - dequeue_start) / 1000.0);
1475 usleep(10 * US_PER_MS_INT); // 10ms sleep
1476 last_dequeue_attempt = dequeue_end;
1477 continue;
1478 }
1479
1480 // Frame received! Log it immediately
1481 log_info("🚀 DISPATCH_LOOP[%llu]: 📦 DEQUEUED %zu byte packet (dequeue took %.1fμs) for client %u",
1482 (unsigned long long)dispatch_loop_count, queued_pkt->data_len,
1483 (dequeue_end - dequeue_start) / 1000.0, client_id);
1484
1485 // Process the dequeued packet
1486 // The queued packet contains the complete ACIP packet (header + payload) from websocket_recv()
1487 const packet_header_t *header = (const packet_header_t *)queued_pkt->data;
1488 size_t total_len = queued_pkt->data_len;
1489 uint8_t *payload = (uint8_t *)header + sizeof(packet_header_t);
1490 size_t payload_len = 0;
1491
1492 log_info("DISPATCH_THREAD: Processing %zu byte packet for client %u", total_len, client_id);
1493
1494 if (total_len >= sizeof(packet_header_t)) {
1495 packet_type_t packet_type = (packet_type_t)NET_TO_HOST_U16(header->type);
1496 payload_len = NET_TO_HOST_U32(header->length);
1497
1498 log_info("🎯 DISPATCH_THREAD: Packet type=%d, payload_len=%zu (will dispatch now)", packet_type, payload_len);
1499
1500 // Handle PACKET_TYPE_ENCRYPTED from WebSocket clients that encrypt at application layer
1501 // This mirrors the decryption logic in acip_server_receive_and_dispatch()
1502 if (packet_type == PACKET_TYPE_ENCRYPTED && client->transport && client->transport->crypto_ctx) {
1503 log_info("DISPATCH_THREAD: Decrypting PACKET_TYPE_ENCRYPTED for client %u", client_id);
1504
1505 uint8_t *ciphertext = payload;
1506 size_t ciphertext_len = payload_len;
1507
1508 // Decrypt to get inner plaintext packet (header + payload)
1509 size_t plaintext_size = ciphertext_len + 1024;
1510 uint8_t *plaintext = SAFE_MALLOC(plaintext_size, uint8_t *);
1511 if (!plaintext) {
1512 log_error("DISPATCH_THREAD: Failed to allocate plaintext buffer for decryption");
1513 packet_queue_free_packet(queued_pkt);
1514 continue;
1515 }
1516
1517 size_t plaintext_len;
1518 crypto_result_t crypto_result = crypto_decrypt(client->transport->crypto_ctx, ciphertext, ciphertext_len,
1519 plaintext, plaintext_size, &plaintext_len);
1520
1521 if (crypto_result != CRYPTO_OK) {
1522 log_error("DISPATCH_THREAD: Failed to decrypt packet: %s", crypto_result_to_string(crypto_result));
1523 SAFE_FREE(plaintext);
1524 packet_queue_free_packet(queued_pkt);
1525 continue;
1526 }
1527
1528 if (plaintext_len < sizeof(packet_header_t)) {
1529 log_error("DISPATCH_THREAD: Decrypted packet too small: %zu < %zu", plaintext_len, sizeof(packet_header_t));
1530 SAFE_FREE(plaintext);
1531 packet_queue_free_packet(queued_pkt);
1532 continue;
1533 }
1534
1535 // Parse the inner (decrypted) header
1536 const packet_header_t *inner_header = (const packet_header_t *)plaintext;
1537 packet_type = (packet_type_t)NET_TO_HOST_U16(inner_header->type);
1538 payload_len = NET_TO_HOST_U32(inner_header->length);
1539 payload = plaintext + sizeof(packet_header_t);
1540
1541 log_info("DISPATCH_THREAD: Decrypted inner packet type=%d, payload_len=%zu", packet_type, payload_len);
1542
1543 // Dispatch the decrypted packet
1544 if (client->transport) {
1545 asciichat_error_t dispatch_result = acip_handle_server_packet(client->transport, packet_type, payload,
1546 payload_len, client, &g_acip_server_callbacks);
1547
1548 if (dispatch_result != ASCIICHAT_OK) {
1549 log_error("DISPATCH_THREAD: Handler failed for decrypted packet type=%d: %s", packet_type,
1550 asciichat_error_string(dispatch_result));
1551 }
1552 } else {
1553 log_error("DISPATCH_THREAD: Cannot dispatch decrypted packet - transport is NULL for client %u", client_id);
1554 }
1555
1556 // Free the decrypted buffer
1557 SAFE_FREE(plaintext);
1558 } else {
1559 // Not encrypted or no crypto context - dispatch as-is
1560 if (client->transport) {
1561 asciichat_error_t dispatch_result = acip_handle_server_packet(client->transport, packet_type, payload,
1562 payload_len, client, &g_acip_server_callbacks);
1563
1564 if (dispatch_result != ASCIICHAT_OK) {
1565 log_error("DISPATCH_THREAD: Handler failed for packet type=%d: %s", packet_type,
1566 asciichat_error_string(dispatch_result));
1567 }
1568 } else {
1569 log_error("DISPATCH_THREAD: Cannot dispatch packet - transport is NULL for client %u", client_id);
1570 }
1571 }
1572 }
1573
1574 // Free the queued packet
1575 packet_queue_free_packet(queued_pkt);
1576 }
1577
1578 log_info("DISPATCH_THREAD: Exiting for client %u", client_id);
1579 return NULL;
1580}

References acip_handle_server_packet(), crypto_decrypt(), crypto_result_to_string(), g_server_should_exit, packet_queue_free_packet(), packet_queue_try_dequeue(), and time_get_ns().
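
The dispatch thread reads the packet type and payload length from a header in network byte order. The listing checks that the buffer holds at least a full header, but it does not show a comparison of the declared payload length against the number of bytes actually received; that check may happen elsewhere, for example in the transport's receive path. The sketch below shows one way to validate both, assuming a hypothetical 6-byte wire layout (2-byte type, 4-byte length) that is not the real `packet_header_t`; `demo_packet_view_t` and `demo_parse_packet()` are likewise hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical wire layout: 2-byte type, then 4-byte payload length, both big-endian. */
#define DEMO_HEADER_SIZE 6u

typedef struct {
  uint16_t type;
  uint32_t payload_len;
  const uint8_t *payload; /* points into the caller's buffer, not a copy */
} demo_packet_view_t;

/* Returns 0 and fills *out if buf holds a complete, self-consistent packet;
 * returns -1 if the header is truncated or the declared length overruns the buffer. */
static int demo_parse_packet(const uint8_t *buf, size_t total_len, demo_packet_view_t *out) {
  assert(buf != NULL && out != NULL);

  if (total_len < DEMO_HEADER_SIZE)
    return -1;

  /* Decode big-endian fields byte by byte to stay independent of host endianness. */
  uint16_t type = (uint16_t)(((unsigned)buf[0] << 8) | (unsigned)buf[1]);
  uint32_t len = ((uint32_t)buf[2] << 24) | ((uint32_t)buf[3] << 16) |
                 ((uint32_t)buf[4] << 8) | (uint32_t)buf[5];

  /* The declared payload must fit in the bytes that follow the header. */
  if (len > total_len - DEMO_HEADER_SIZE)
    return -1;

  out->type = type;
  out->payload_len = len;
  out->payload = buf + DEMO_HEADER_SIZE;
  return 0;
}
```

Writing the comparison as `len > total_len - DEMO_HEADER_SIZE`, after the header-size check, avoids the overflow that `DEMO_HEADER_SIZE + len > total_len` could hit for very large declared lengths on platforms with a 32-bit `size_t`.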

◆ client_receive_thread()

void * client_receive_thread ( void *  arg)

Definition at line 1582 of file src/server/client.c.

1582 {
1583 // Log thread startup
1584 log_debug("RECV_THREAD: Thread function entered, arg=%p", arg);
1585
1586 client_info_t *client = (client_info_t *)arg;
1587
1588 // Validate client pointer immediately before any access.
1589 // This prevents crashes if remove_client() has zeroed the client struct
1590 // while the thread was still starting at RtlUserThreadStart.
1591 if (!client) {
1592 log_error("Invalid client info in receive thread (NULL pointer)");
1593 return NULL;
1594 }
1595
1596 // Save this thread's ID so remove_client() can detect self-joins
1597 client->receive_thread_id = asciichat_thread_self();
1598
1599 log_debug("RECV_THREAD_DEBUG: Thread started, client=%p, client_id=%u, is_tcp=%d", (void *)client,
1600 atomic_load(&client->client_id), client->is_tcp_client);
1601
1602 if (atomic_load(&client->protocol_disconnect_requested)) {
1603 log_debug("Receive thread for client %u exiting before start (protocol disconnect requested)",
1604 atomic_load(&client->client_id));
1605 return NULL;
1606 }
1607
1608 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1609 // This must be checked BEFORE accessing any client fields
1610 if (atomic_load(&client->client_id) == 0) {
1611 log_debug("Receive thread: client_id is 0, client struct may have been zeroed, exiting");
1612 return NULL;
1613 }
1614
1615 // Additional validation: check socket is valid
1616 // For TCP clients, validate socket. WebRTC clients use DataChannel (no socket)
1617 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1618 log_error("Invalid client socket in receive thread");
1619 return NULL;
1620 }
1621
1622 // Thread cancellation is not available in the platform abstraction,
1623 // so this thread is never cancelled for shutdown. Instead, the receive
1624 // loop exits once g_server_should_exit is set or the client goes inactive.
1625
1626 log_debug("Started receive thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1627
1628 // Main receive loop - processes packets from transport
1629 // For TCP clients: receives from socket
1630 // For WebRTC clients: receives from transport ringbuffer (via ACDS signaling)
1631
1632 log_info("RECV_THREAD_LOOP_START: client_id=%u, is_tcp=%d, transport=%p, active=%d", atomic_load(&client->client_id),
1633 client->is_tcp_client, (void *)client->transport, atomic_load(&client->active));
1634
1635 while (!atomic_load(&g_server_should_exit) && atomic_load(&client->active)) {
1636 // For TCP clients, check socket validity
1637 // For WebRTC clients, continue even if no socket (transport handles everything)
1638 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1639 log_debug("TCP client %u has invalid socket, exiting receive thread", atomic_load(&client->client_id));
1640 break;
1641 }
1642
1643 // Check client_id is still valid before accessing transport.
1644 // This prevents accessing freed memory if remove_client() has zeroed the client struct.
1645 if (atomic_load(&client->client_id) == 0) {
1646 log_debug("Client client_id reset, exiting receive thread");
1647 break;
1648 }
1649
1650 // Receive packet (without dispatching) - decouple from dispatch for async processing
1651 // For WebRTC clients with async dispatch, we queue packets instead of processing immediately
1652 // This prevents backpressure on the network socket
1653
1654 if (client->is_tcp_client) {
1655 // TCP clients: use original synchronous dispatch
1656 asciichat_error_t acip_result =
1657 acip_server_receive_and_dispatch(client->transport, client, &g_acip_server_callbacks);
1658
1659 // Check if shutdown was requested during the network call
1660 if (atomic_load(&g_server_should_exit)) {
1661 log_debug("RECV_EXIT: Server shutdown requested, breaking loop");
1662 break;
1663 }
1664
1665 // Handle receive errors
1666 if (acip_result != ASCIICHAT_OK) {
1667 asciichat_error_context_t err_ctx;
1668 if (HAS_ERRNO(&err_ctx)) {
1669 log_error("🔴 ACIP error for client %u: code=%u msg=%s", client->client_id, err_ctx.code,
1670 err_ctx.context_message);
1671 if (err_ctx.code == ERROR_NETWORK) {
1672 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1673 break;
1674 } else if (err_ctx.code == ERROR_CRYPTO) {
1675 log_error_client(
1676 client, "SECURITY VIOLATION: Unencrypted packet when encryption required - terminating connection");
1677 atomic_store(&g_server_should_exit, true);
1678 break;
1679 }
1680 }
1681 log_warn("ACIP error for TCP client %u: %s (disconnecting)", client->client_id,
1682 asciichat_error_string(acip_result));
1683 break;
1684 }
1685 } else {
1686 // WebRTC/WebSocket clients: async dispatch - receive packet and queue for async processing
1687 void *packet_data = NULL;
1688 void *allocated_buffer = NULL;
1689 size_t packet_len = 0;
1690
1691 asciichat_error_t recv_result =
1692 client->transport->methods->recv(client->transport, &packet_data, &packet_len, &allocated_buffer);
1693
1694 if (recv_result != ASCIICHAT_OK) {
1695 asciichat_error_context_t err_ctx;
1696 if (HAS_ERRNO(&err_ctx)) {
1697 // Check for reassembly timeout (fragments arriving slowly)
1698 // This is NOT a connection failure - safe to retry
1699 if ((err_ctx.code == ERROR_NETWORK) && err_ctx.context_message &&
1700 strstr(err_ctx.context_message, "reassembly timeout")) {
1701 // Fragments are arriving slowly - this is normal, retry without disconnecting
1702 log_dev_every(100000, "Client %u: fragment reassembly timeout, retrying in 10ms", client->client_id);
1703 platform_sleep_ms(10); // Sleep 10ms to allow fragments to arrive
1704 continue; // Retry without disconnecting
1705 }
1706
1707 if (err_ctx.code == ERROR_NETWORK) {
1708 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1709 break;
1710 }
1711 }
1712 log_warn("Receive failed for WebRTC client %u: %s (disconnecting)", client->client_id,
1713 asciichat_error_string(recv_result));
1714 break;
1715 }
1716
1717 // Queue the received packet for async dispatch
1718 // This prevents the receive thread from blocking on dispatch
1719 log_info("RECV_THREAD: Queuing %zu byte packet for async dispatch (client %u)", packet_len, client->client_id);
1720
1721 // Extract packet type from the header to preserve it when queueing
1722 packet_type_t pkt_type = PACKET_TYPE_PROTOCOL_VERSION;
1723 if (packet_len >= sizeof(packet_header_t)) {
1724 const packet_header_t *pkt_header = (const packet_header_t *)allocated_buffer;
1725 pkt_type = (packet_type_t)NET_TO_HOST_U16(pkt_header->type);
1726 }
1727
1728 // Build a complete packet to queue (header + payload)
1729 // The entire buffer (allocated_buffer) contains the full packet
1730 // copy_data=false because we want to transfer ownership to the queue
1731 int enqueue_result = packet_queue_enqueue(client->received_packet_queue, pkt_type, allocated_buffer, packet_len,
1732 atomic_load(&client->client_id), false);
1733
1734 if (enqueue_result < 0) {
1735 log_warn("Failed to queue received packet for client %u (queue full?) - packet dropped", client->client_id);
1736 if (allocated_buffer) {
1737 buffer_pool_free(NULL, allocated_buffer, packet_len);
1738 }
1739 }
1740 }
1741 }
1742
1743 // Mark client as inactive and stop all threads
1744 // Must stop render threads when client disconnects.
1745 // OPTIMIZED: Use atomic operations for thread control flags (lock-free)
1746 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1747 log_debug("Setting active=false in receive_thread_fn (client_id=%u, exiting receive loop)", client_id_snapshot);
1748 atomic_store(&client->active, false);
1749 atomic_store(&client->send_thread_running, false);
1750 atomic_store(&client->video_render_thread_running, false);
1751 atomic_store(&client->audio_render_thread_running, false);
1752
1753 // Call remove_client() to trigger cleanup
1754 // Safe to call from receive thread now: remove_client() detects self-join via thread IDs
1755 // and skips the receive thread join when called from the receive thread itself
1756 log_debug("Receive thread for client %u calling remove_client() for cleanup", client_id_snapshot);
1757 server_context_t *server_ctx = (server_context_t *)client->server_ctx;
1758 if (server_ctx) {
1759 if (remove_client(server_ctx, client_id_snapshot) != 0) {
1760 log_warn("Failed to remove client %u from receive thread cleanup", client_id_snapshot);
1761 }
1762 } else {
1763 log_error("Receive thread for client %u: server_ctx is NULL, cannot call remove_client()", client_id_snapshot);
1764 }
1765
1766 log_debug("Receive thread for client %u terminated", client_id_snapshot);
1767
1768 // Clean up thread-local error context before exit
1769 asciichat_errno_destroy();
1770
1771 return NULL;
1772}

References acip_server_receive_and_dispatch(), asciichat_errno_destroy(), asciichat_thread_self(), buffer_pool_free(), g_server_should_exit, packet_queue_enqueue(), platform_sleep_ms(), and remove_client().
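
The receive loop above enqueues each buffer with copy_data=false, so the queue owns the buffer on success and the caller has to free it when the enqueue fails. A minimal sketch of that ownership contract follows. The names `demo_queue_t`, `demo_enqueue`, `demo_receive_one`, and `demo_queue_drain` are hypothetical and not part of ascii-chat, and `malloc`/`free` stand in for the buffer pool.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_QUEUE_CAP 2 /* hypothetical capacity, small so overflow is easy to trigger */

typedef struct {
  void *items[DEMO_QUEUE_CAP]; /* buffers owned by the queue */
  size_t count;
} demo_queue_t;

/* Takes ownership of `buf` on success (returns 0). Returns -1 when the queue
 * is full, in which case ownership stays with the caller. */
static int demo_enqueue(demo_queue_t *q, void *buf) {
  if (q->count >= DEMO_QUEUE_CAP) {
    return -1;
  }
  q->items[q->count++] = buf;
  return 0;
}

/* Mirrors the receive-thread logic: allocate, try to enqueue, free on failure.
 * Returns true if the packet was queued, false if it was dropped. */
static bool demo_receive_one(demo_queue_t *q, const void *payload, size_t len) {
  void *buf = malloc(len);
  if (!buf) {
    return false;
  }
  memcpy(buf, payload, len);
  if (demo_enqueue(q, buf) < 0) {
    free(buf); /* the queue rejected it, so this code still owns it */
    return false;
  }
  return true; /* the queue owns buf now; do not touch or free it here */
}

/* Releases every buffer still owned by the queue. */
static void demo_queue_drain(demo_queue_t *q) {
  for (size_t i = 0; i < q->count; i++) {
    free(q->items[i]);
  }
  q->count = 0;
}
```

Keeping the free on the failure path next to the enqueue call makes the hand-off point explicit: after a successful enqueue the producer never touches the buffer again.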

◆ client_send_thread_func()

void * client_send_thread_func ( void *  arg)

Client packet send thread.

Definition at line 1775 of file src/server/client.c.

1775 {
1776 client_info_t *client = (client_info_t *)arg;
1777
1778 // Validate client pointer immediately before any access.
1779 // This prevents crashes if remove_client() has zeroed the client struct
1780 // while the thread was still starting at RtlUserThreadStart.
1781 if (!client) {
1782 log_error("Invalid client info in send thread (NULL pointer)");
1783 return NULL;
1784 }
1785
1786 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1787 // This must be checked BEFORE accessing any client fields
1788 if (atomic_load(&client->client_id) == 0) {
1789 log_debug_every(100 * US_PER_MS_INT, "Send thread: client_id is 0, client struct may have been zeroed, exiting");
1790 return NULL;
1791 }
1792
1793 // Additional validation: check socket OR transport is valid
1794 // For TCP clients: socket is valid
1795 // For WebRTC clients: socket is INVALID_SOCKET_VALUE but transport is valid
1796 mutex_lock(&client->send_mutex);
1797 bool has_socket = (client->socket != INVALID_SOCKET_VALUE);
1798 bool has_transport = (client->transport != NULL);
1799 mutex_unlock(&client->send_mutex);
1800
1801 log_info("SEND_THREAD_VALIDATION: client_id=%u socket_valid=%d transport_valid=%d transport_ptr=%p",
1802 atomic_load(&client->client_id), has_socket, has_transport, (void *)client->transport);
1803
1804 if (!has_socket && !has_transport) {
1805 log_error("Invalid client connection in send thread (no socket or transport)");
1806 return NULL;
1807 }
1808
1809 log_info("Started send thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1810
1811 // Mark thread as running
1812 atomic_store(&client->send_thread_running, true);
1813
1814 log_info("SEND_THREAD_LOOP_START: client_id=%u active=%d shutting_down=%d running=%d",
1815 atomic_load(&client->client_id), atomic_load(&client->active), atomic_load(&client->shutting_down),
1816 atomic_load(&client->send_thread_running));
1817
1818 // Track timing for video frame sends
1819 uint64_t last_video_send_time = 0;
1820 const uint64_t video_send_interval_us = 16666; // 60fps = ~16.67ms
1821
1822 // High-frequency audio loop - separate from video frame loop
1823 // to ensure audio packets are sent immediately, not rate-limited by video
1824#define MAX_AUDIO_BATCH 8
1825 int loop_iteration_count = 0;
1826 while (!atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down) && atomic_load(&client->active) &&
1827 atomic_load(&client->send_thread_running)) {
1828 loop_iteration_count++;
1829 bool sent_something = false;
1830 uint64_t loop_start_ns = time_get_ns();
1831 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] START: client=%u", loop_iteration_count,
1832 atomic_load(&client->client_id));
1833
1834 // PRIORITY: Drain all queued audio packets before video
1835 // Audio must not be rate-limited by video frame sending (16.67ms)
1836 queued_packet_t *audio_packets[MAX_AUDIO_BATCH];
1837 int audio_packet_count = 0;
1838
1839 if (client->audio_queue) {
1840 // Try to dequeue multiple audio packets
1841 for (int i = 0; i < MAX_AUDIO_BATCH; i++) {
1842 audio_packets[i] = packet_queue_try_dequeue(client->audio_queue);
1843 if (audio_packets[i]) {
1844 audio_packet_count++;
1845 } else {
1846 break; // No more packets available
1847 }
1848 }
1849 if (audio_packet_count > 0) {
1850 log_dev_every(4500 * US_PER_MS_INT, "SEND_AUDIO: client=%u dequeued=%d packets",
1851 atomic_load(&client->client_id), audio_packet_count);
1852 }
1853 } else {
1854 log_warn("Send thread: audio_queue is NULL for client %u", atomic_load(&client->client_id));
1855 }
1856
1857 // Send batched audio if we have packets
1858 if (audio_packet_count > 0) {
1859 // Protect crypto field access with mutex
1860 mutex_lock(&client->client_state_mutex);
1861 bool crypto_ready = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1862 crypto_handshake_is_ready(&client->crypto_handshake_ctx);
1863 mutex_unlock(&client->client_state_mutex);
1864 (void)crypto_ready; // Currently unused - kept for potential future encryption support
1865
1866 asciichat_error_t result = ASCIICHAT_OK;
1867
1868 if (audio_packet_count == 1) {
1869 // Single packet - send directly for low latency using ACIP transport
1870 packet_type_t pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1871
1872 // Get transport reference while holding mutex briefly (prevents deadlock on TCP buffer full)
1873 mutex_lock(&client->send_mutex);
1874 if (atomic_load(&client->shutting_down) || !client->transport) {
1875 mutex_unlock(&client->send_mutex);
1876 log_warn("BREAK_AUDIO_SINGLE: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
1877 atomic_load(&client->shutting_down), (void *)client->transport);
1878 break; // Client is shutting down, exit thread
1879 }
1880 acip_transport_t *transport = client->transport;
1881 mutex_unlock(&client->send_mutex);
1882
1883 // Network I/O happens OUTSIDE the mutex
1884 result = packet_send_via_transport(transport, pkt_type, audio_packets[0]->data, audio_packets[0]->data_len,
1885 atomic_load(&client->client_id));
1886 if (result != ASCIICHAT_OK) {
1887 log_error("AUDIO SEND FAIL: client=%u, len=%zu, result=%d", client->client_id, audio_packets[0]->data_len,
1888 result);
1889 }
1890 } else {
1891 // Multiple packets - batch them together and send via transport (works for all client types)
1892 packet_type_t first_pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1893
1894 // Get transport reference
1895 mutex_lock(&client->send_mutex);
1896 if (atomic_load(&client->shutting_down) || !client->transport) {
1897 mutex_unlock(&client->send_mutex);
1898 log_warn("BREAK_AUDIO_BATCH: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
1899 atomic_load(&client->shutting_down), (void *)client->transport);
1900 result = ERROR_NETWORK;
1901 } else {
1902 acip_transport_t *transport = client->transport;
1903 mutex_unlock(&client->send_mutex);
1904
1905 if (first_pkt_type == PACKET_TYPE_AUDIO_OPUS_BATCH) {
1906 // Opus packets - batch and send via transport
1907 size_t total_opus_size = 0;
1908 for (int i = 0; i < audio_packet_count; i++) {
1909 total_opus_size += audio_packets[i]->data_len;
1910 }
1911
1912 uint8_t *batched_opus = SAFE_MALLOC(total_opus_size, uint8_t *);
1913 uint16_t *frame_sizes = SAFE_MALLOC((size_t)audio_packet_count * sizeof(uint16_t), uint16_t *);
1914
1915 if (batched_opus && frame_sizes) {
1916 size_t offset = 0;
1917 for (int i = 0; i < audio_packet_count; i++) {
1918 frame_sizes[i] = (uint16_t)audio_packets[i]->data_len;
1919 memcpy(batched_opus + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1920 offset += audio_packets[i]->data_len;
1921 }
1922 result = acip_send_audio_opus_batch(transport, batched_opus, total_opus_size, frame_sizes,
1923 (uint32_t)audio_packet_count, AUDIO_SAMPLE_RATE, 20);
1924 if (result != ASCIICHAT_OK) {
1925 log_error("AUDIO SEND FAIL (opus batch): client=%u, frames=%d, total_size=%zu, result=%d",
1926 atomic_load(&client->client_id), audio_packet_count, total_opus_size, result);
1927 }
1928 } else {
1929 log_error("Failed to allocate buffer for Opus batch");
1930 result = ERROR_MEMORY;
1931 }
1932 SAFE_FREE(batched_opus);
1933 SAFE_FREE(frame_sizes);
1934 } else {
1935 // Raw float audio - batch and send via transport
1936 size_t total_samples = 0;
1937 for (int i = 0; i < audio_packet_count; i++) {
1938 total_samples += audio_packets[i]->data_len / sizeof(float);
1939 }
1940
1941 float *batched_audio = SAFE_MALLOC(total_samples * sizeof(float), float *);
1942 if (batched_audio) {
1943 size_t offset = 0;
1944 for (int i = 0; i < audio_packet_count; i++) {
1945 size_t packet_samples = audio_packets[i]->data_len / sizeof(float);
1946 memcpy(batched_audio + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1947 offset += packet_samples;
1948 }
1949 result = acip_send_audio_batch(transport, batched_audio, (uint32_t)total_samples, AUDIO_SAMPLE_RATE);
1950 if (result != ASCIICHAT_OK) {
1951 log_error("AUDIO SEND FAIL (raw batch): client=%u, packets=%d, samples=%zu, result=%d",
1952 atomic_load(&client->client_id), audio_packet_count, total_samples, result);
1953 }
1954 } else {
1955 log_error("Failed to allocate buffer for audio batch");
1956 result = ERROR_MEMORY;
1957 }
1958 SAFE_FREE(batched_audio);
1959 }
1960 }
1961 }
1962
1963 // Free all audio packets
1964 for (int i = 0; i < audio_packet_count; i++) {
1965 packet_queue_free_packet(audio_packets[i]);
1966 }
1967
1968 if (result != ASCIICHAT_OK) {
1969 if (!atomic_load(&g_server_should_exit)) {
1970 log_error("Failed to send audio to client %u: %s", client->client_id, asciichat_error_string(result));
1971 }
1972 log_warn("SKIP_AUDIO_ERROR: client_id=%u result=%d (continuing to send video)", atomic_load(&client->client_id),
1973 result);
1974 // Continue sending video even if audio fails - audio is optional for browser clients
1975 }
1976
1977 sent_something = true;
1978 uint64_t audio_done_ns = time_get_ns();
1979 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] AUDIO_SENT: took %.2fms", loop_iteration_count,
1980 (audio_done_ns - loop_start_ns) / 1e6);
1981
1982 // Small sleep to let more audio packets queue (helps batching efficiency)
1983 if (audio_packet_count > 0) {
1984 platform_sleep_us(100); // 0.1ms - minimal delay
1985 }
1986 } else {
1987 // No audio packets - brief sleep to avoid busy-looping, then check for other tasks
1988 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] NO_AUDIO: sleeping 1ms", loop_iteration_count);
1989 platform_sleep_us(1 * US_PER_MS_INT); // 1ms - enough for audio render thread to queue more packets
1990
1991 // Check if session rekeying should be triggered
1992 mutex_lock(&client->client_state_mutex);
1993 bool should_rekey = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1994 crypto_handshake_is_ready(&client->crypto_handshake_ctx) &&
1995 crypto_handshake_should_rekey(&client->crypto_handshake_ctx);
1996 mutex_unlock(&client->client_state_mutex);
1997
1998 if (should_rekey) {
1999 log_debug("Rekey threshold reached for client %u, initiating session rekey", client->client_id);
2000 mutex_lock(&client->client_state_mutex);
2001 // Get socket reference briefly to avoid deadlock on TCP buffer full
2002 mutex_lock(&client->send_mutex);
2003 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
2004 mutex_unlock(&client->send_mutex);
2005 mutex_unlock(&client->client_state_mutex);
2006 log_warn("BREAK_REKEY: client_id=%u shutting_down=%d socket=%d", atomic_load(&client->client_id),
2007 atomic_load(&client->shutting_down), (int)client->socket);
2008 break; // Client is shutting down, exit thread
2009 }
2010 socket_t rekey_socket = client->socket;
2011 mutex_unlock(&client->send_mutex);
2012
2013 // Network I/O happens OUTSIDE the send_mutex (client_state_mutex still held for crypto state)
2014 asciichat_error_t result = crypto_handshake_rekey_request(&client->crypto_handshake_ctx, rekey_socket);
2015 mutex_unlock(&client->client_state_mutex);
2016
2017 if (result != ASCIICHAT_OK) {
2018 log_error("Failed to send REKEY_REQUEST to client %u: %d", client->client_id, result);
2019 } else {
2020 log_debug("Sent REKEY_REQUEST to client %u", client->client_id);
2021 // Notify client that session rekeying has been initiated (old keys still active)
2022 log_info_client(client, "Session rekey initiated - rotating encryption keys");
2023 }
2024 }
2025 }
2026
2027 // Always consume frames from the buffer to prevent accumulation
2028 // Rate-limit the actual sending, but always mark frames as consumed
2029 uint64_t video_check_ns = time_get_ns();
2030 if (!client->outgoing_video_buffer) {
2031 // Buffer has been destroyed (client is shutting down).
2032 // Exit cleanly instead of looping forever trying to access freed memory.
2033 log_warn("⚠️ Send thread exiting: outgoing_video_buffer is NULL for client %u (client shutting down?)",
2034 client->client_id);
2035 break;
2036 }
2037
2038 // Get latest frame from double buffer (lock-free operation)
2039 // This marks the frame as consumed even if we don't send it yet
2040 const video_frame_t *frame = video_frame_get_latest(client->outgoing_video_buffer);
2041 uint64_t frame_get_ns = time_get_ns();
2042 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] VIDEO_GET_FRAME: took %.3fms, frame=%p", loop_iteration_count,
2043 (frame_get_ns - video_check_ns) / 1e6, (void *)frame);
2044 log_dev_every(4500 * US_PER_MS_INT, "Send thread: video_frame_get_latest returned %p for client %u", (void *)frame,
2045 client->client_id);
2046
2047 // Check if get_latest failed (buffer might have been destroyed)
2048 if (!frame) {
2049 log_warn("⚠️ Send thread exiting: video_frame_get_latest returned NULL for client %u (buffer destroyed?)",
2050 client->client_id);
2051 break; // Exit thread if buffer is invalid
2052 }
2053
2054 // Check if it's time to send a video frame (60fps rate limiting)
2055 // Only rate-limit the SEND operation, not frame consumption
2056 uint64_t current_time_ns = time_get_ns();
2057 uint64_t current_time_us = time_ns_to_us(current_time_ns);
2058 uint64_t time_since_last_send_us = current_time_us - last_video_send_time;
2059 log_dev_every(4500 * US_PER_MS_INT,
2060 "Send thread timing check: time_since_last=%llu us, interval=%llu us, should_send=%d",
2061 (unsigned long long)time_since_last_send_us, (unsigned long long)video_send_interval_us,
2062 (time_since_last_send_us >= video_send_interval_us));
2063
2064 if (current_time_us - last_video_send_time >= video_send_interval_us) {
2065 log_info_every(5000 * US_PER_MS_INT, "✓ SEND_TIME_READY: client_id=%u time_since=%llu interval=%llu",
2066 atomic_load(&client->client_id), (unsigned long long)time_since_last_send_us,
2067 (unsigned long long)video_send_interval_us);
2068 uint64_t frame_start_ns = time_get_ns();
2069
2070 // GRID LAYOUT CHANGE: Check if render thread has buffered a frame with different source count
2071 // If so, send CLEAR_CONSOLE before sending the new frame
2072 int rendered_sources = atomic_load(&client->last_rendered_grid_sources);
2073 int sent_sources = atomic_load(&client->last_sent_grid_sources);
2074
2075 if (rendered_sources != sent_sources && rendered_sources > 0) {
2076 // Grid layout changed! Send CLEAR_CONSOLE before next frame using ACIP transport
2077 // Get transport reference briefly to avoid deadlock on TCP buffer full
2078 mutex_lock(&client->send_mutex);
2079 if (atomic_load(&client->shutting_down) || !client->transport) {
2080 mutex_unlock(&client->send_mutex);
2081 log_warn("BREAK_CLEAR_CONSOLE: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
2082 atomic_load(&client->shutting_down), (void *)client->transport);
2083 break; // Client is shutting down, exit thread
2084 }
2085 acip_transport_t *clear_transport = client->transport;
2086 mutex_unlock(&client->send_mutex);
2087
2088 // Network I/O happens OUTSIDE the mutex
2089 acip_send_clear_console(clear_transport);
2090 log_debug_every(LOG_RATE_FAST, "Client %u: Sent CLEAR_CONSOLE (grid changed %d → %d sources)",
2091 client->client_id, sent_sources, rendered_sources);
2092 atomic_store(&client->last_sent_grid_sources, rendered_sources);
2093 sent_something = true;
2094 }
2095
2096 log_dev_every(4500 * US_PER_MS_INT, "Send thread: frame validation - frame=%p, frame->data=%p, frame->size=%zu",
2097 (void *)frame, (void *)frame->data, frame->size);
2098
2099 if (!frame->data) {
2100 log_dev("✗ SKIP_NO_DATA: client_id=%u frame=%p data=%p",
2101 atomic_load(&client->client_id), (void *)frame, (void *)frame->data);
2102 continue;
2103 }
2104 log_dev("✓ FRAME_DATA_OK: client_id=%u data=%p", atomic_load(&client->client_id),
2105 (void *)frame->data);
2106
2107 if (frame->data && frame->size == 0) {
2108 log_dev("✗ SKIP_ZERO_SIZE: client_id=%u size=%zu", atomic_load(&client->client_id),
2109 frame->size);
2110 platform_sleep_us(1 * US_PER_MS_INT); // 1ms sleep
2111 continue;
2112 }
2113 log_dev("✓ FRAME_SIZE_OK: client_id=%u size=%zu", atomic_load(&client->client_id),
2114 frame->size);
2115
2116 // Snapshot frame metadata (safe with double-buffer system)
2117 const char *frame_data = (const char *)frame->data; // Pointer snapshot - data is stable in front buffer
2118 size_t frame_size = frame->size; // Size snapshot - prevent race condition with render thread
2119 uint32_t width = atomic_load(&client->width);
2120 uint32_t height = atomic_load(&client->height);
2121 uint64_t step1_ns = time_get_ns();
2122 uint64_t step2_ns = time_get_ns();
2123 uint64_t step3_ns = time_get_ns();
2124 uint64_t step4_ns = time_get_ns();
2125
2126 // Check if crypto handshake is complete before sending (prevents sending to unauthenticated clients)
2127 mutex_lock(&client->client_state_mutex);
2128 bool crypto_ready = GET_OPTION(no_encrypt) ||
2129 (client->crypto_initialized && crypto_handshake_is_ready(&client->crypto_handshake_ctx));
2130 mutex_unlock(&client->client_state_mutex);
2131
2132 if (!crypto_ready) {
2133 log_dev("⚠️ SKIP_SEND_CRYPTO: client_id=%u crypto_initialized=%d no_encrypt=%d",
2134 atomic_load(&client->client_id), client->crypto_initialized, GET_OPTION(no_encrypt));
2135 continue; // Skip this frame, will try again on next loop iteration
2136 }
2137 log_dev("✓ CRYPTO_READY: client_id=%u about to send frame",
2138 atomic_load(&client->client_id));
2139
2140 // Get transport reference briefly to avoid deadlock on TCP buffer full
2141 // ACIP transport handles header building, CRC32, encryption internally
2142 log_dev_every(4500 * US_PER_MS_INT,
2143 "Send thread: About to send frame to client %u (width=%u, height=%u, size=%zu, data=%p)",
2144 client->client_id, width, height, frame_size, (void *)frame_data);
2145
2146 // Log first 32 bytes of frame data to verify we can access it
2147 if (frame_data && frame_size > 0) {
2148 log_info_every(5000 * US_PER_MS_INT,
2149 "FRAME_DATA_HEX: client=%u first_bytes=[%02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x "
2150 "%02x %02x %02x %02x %02x]",
2151 atomic_load(&client->client_id), ((uint8_t *)frame_data)[0], ((uint8_t *)frame_data)[1],
2152 ((uint8_t *)frame_data)[2], ((uint8_t *)frame_data)[3], ((uint8_t *)frame_data)[4],
2153 ((uint8_t *)frame_data)[5], ((uint8_t *)frame_data)[6], ((uint8_t *)frame_data)[7],
2154 ((uint8_t *)frame_data)[8], ((uint8_t *)frame_data)[9], ((uint8_t *)frame_data)[10],
2155 ((uint8_t *)frame_data)[11], ((uint8_t *)frame_data)[12], ((uint8_t *)frame_data)[13],
2156 ((uint8_t *)frame_data)[14], ((uint8_t *)frame_data)[15]);
2157 }
2158
2159 mutex_lock(&client->send_mutex);
2160 if (atomic_load(&client->shutting_down) || !client->transport) {
2161 mutex_unlock(&client->send_mutex);
2162 log_warn("BREAK_FRAME_SEND: client_id=%u shutting_down=%d transport=%p loop_iter=%d",
2163 atomic_load(&client->client_id), atomic_load(&client->shutting_down), (void *)client->transport,
2164 loop_iteration_count);
2165 break; // Client is shutting down, exit thread
2166 }
2167 acip_transport_t *frame_transport = client->transport;
2168 mutex_unlock(&client->send_mutex);
2169
2170 // Network I/O happens OUTSIDE the mutex
2171 log_dev_every(4500 * US_PER_MS_INT, "SEND_ASCII_FRAME: client_id=%u size=%zu width=%u height=%u",
2172 atomic_load(&client->client_id), frame_size, width, height);
2173 uint64_t send_start_ns = time_get_ns();
2174 log_dev("[SEND_LOOP_%d] FRAME_SEND_START: size=%zu", loop_iteration_count,
2175 frame_size);
2176 asciichat_error_t send_result = acip_send_ascii_frame(frame_transport, frame_data, frame_size, width, height,
2177 atomic_load(&client->client_id));
2178 uint64_t send_end_ns = time_get_ns();
2179 double send_ms = (double)(send_end_ns - send_start_ns) / 1e6; // double to match the %.2f format below
2180 log_dev("[SEND_LOOP_%d] FRAME_SEND_END: took %.2fms, result=%d",
2181 loop_iteration_count, send_ms, send_result);
2182 uint64_t step5_ns = time_get_ns();
2183
2184 if (send_result != ASCIICHAT_OK) {
2185 if (!atomic_load(&g_server_should_exit)) {
2186 SET_ERRNO(ERROR_NETWORK, "Failed to send video frame to client %u: %s", client->client_id,
2187 asciichat_error_string(send_result));
2188 }
2189 log_error("SEND_FRAME_FAILED: client_id=%u result=%d message=%s loop_iter=%d", atomic_load(&client->client_id),
2190 send_result, asciichat_error_string(send_result), loop_iteration_count);
2191 log_warn("BREAK_SEND_ERROR: client_id=%u (frame send failed)", atomic_load(&client->client_id));
2192 break;
2193 }
2194
2195 log_dev_every(4500 * US_PER_MS_INT, "SEND_FRAME_SUCCESS: client_id=%u size=%zu", atomic_load(&client->client_id),
2196 frame_size);
2197
2198 // Increment frame counter and log
2199 unsigned long frame_count = atomic_fetch_add(&client->frames_sent_count, 1) + 1;
2200 log_info("🎬 FRAME_SENT: client_id=%u frame_num=%lu size=%zu",
2201 atomic_load(&client->client_id), frame_count, frame_size);
2202
2203 sent_something = true;
2204 last_video_send_time = current_time_us;
2205
2206 uint64_t frame_end_ns = time_get_ns();
2207 uint64_t frame_time_us = time_ns_to_us(time_elapsed_ns(frame_start_ns, frame_end_ns));
2208 if (frame_time_us > 15 * US_PER_MS_INT) { // Log if sending a frame takes > 15ms (encryption adds ~5-6ms)
2209 uint64_t step1_us = time_ns_to_us(time_elapsed_ns(frame_start_ns, step1_ns));
2210 uint64_t step2_us = time_ns_to_us(time_elapsed_ns(step1_ns, step2_ns));
2211 uint64_t step3_us = time_ns_to_us(time_elapsed_ns(step2_ns, step3_ns));
2212 uint64_t step4_us = time_ns_to_us(time_elapsed_ns(step3_ns, step4_ns));
2213 uint64_t step5_us = time_ns_to_us(time_elapsed_ns(step4_ns, step5_ns));
2214 log_warn_every(
2215 LOG_RATE_DEFAULT,
2216 "SEND_THREAD: Frame send took %.2fms for client %u | Snapshot: %.2fms | Memcpy: %.2fms | CRC32: %.2fms | "
2217 "Header: %.2fms | send_packet_secure: %.2fms",
2218 frame_time_us / 1000.0, client->client_id, step1_us / 1000.0, step2_us / 1000.0, step3_us / 1000.0,
2219 step4_us / 1000.0, step5_us / 1000.0);
2220 }
2221 }
2222
2223 // If we didn't send anything, sleep briefly to prevent busy waiting
2224 if (!sent_something) {
2225 log_info_every(5000 * US_PER_MS_INT, "[SEND_LOOP_%d] IDLE_SLEEP: nothing sent", loop_iteration_count);
2226 platform_sleep_us(1 * US_PER_MS_INT); // 1ms sleep
2227 }
2228 uint64_t loop_end_ns = time_get_ns();
2229 double loop_ms = (double)(loop_end_ns - loop_start_ns) / 1e6; // double to match the %.2f format below
2230 if (loop_ms > 10) {
2231 log_warn("[SEND_LOOP_%d] SLOW_ITERATION: took %.2fms (client=%u)", loop_iteration_count, loop_ms,
2232 atomic_load(&client->client_id));
2233 }
2234 }
2235
2236 // Log why the send thread exited
2237 log_warn("Send thread exit conditions - client_id=%u g_server_should_exit=%d shutting_down=%d active=%d "
2238 "send_thread_running=%d",
2239 atomic_load(&client->client_id), atomic_load(&g_server_should_exit), atomic_load(&client->shutting_down),
2240 atomic_load(&client->active), atomic_load(&client->send_thread_running));
2241
2242 // Mark thread as stopped
2243 atomic_store(&client->send_thread_running, false);
2244 log_debug("Send thread for client %u terminated", client->client_id);
2245
2246 // Clean up thread-local error context before exit
2247 asciichat_errno_destroy();
2248
2249 return NULL;
2250}
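
The Opus branch of the listing above (lines 1905-1933) concatenates the dequeued frame payloads into one buffer and records each frame's length in a parallel uint16_t array. The packing step can be sketched on its own as follows. `demo_frame_t` and `demo_pack_frames` are hypothetical names, `malloc` stands in for SAFE_MALLOC, and the UINT16_MAX check is an addition of this sketch (the listing casts the length without checking it).

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
  const uint8_t *data; /* encoded frame bytes */
  size_t len;          /* number of bytes in data */
} demo_frame_t;

/* Concatenates `count` frames into a newly allocated buffer.
 * On success returns the buffer (caller frees it), stores the total size in
 * *out_total, and writes the per-frame lengths to sizes[0..count-1].
 * Returns NULL if allocation fails or a frame length does not fit in uint16_t. */
static uint8_t *demo_pack_frames(const demo_frame_t *frames, size_t count,
                                 uint16_t *sizes, size_t *out_total) {
  size_t total = 0;
  for (size_t i = 0; i < count; i++) {
    if (frames[i].len > UINT16_MAX) {
      return NULL; /* would be truncated by the uint16_t size field */
    }
    total += frames[i].len;
  }

  uint8_t *packed = malloc(total > 0 ? total : 1);
  if (!packed) {
    return NULL;
  }

  size_t offset = 0;
  for (size_t i = 0; i < count; i++) {
    sizes[i] = (uint16_t)frames[i].len;
    if (frames[i].len > 0) {
      memcpy(packed + offset, frames[i].data, frames[i].len);
    }
    offset += frames[i].len;
  }
  *out_total = total;
  return packed;
}
```

The receiver can split the buffer again by walking the sizes array, which is why both arrays have to be sent together.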

References acip_send_ascii_frame(), acip_send_audio_batch(), acip_send_audio_opus_batch(), acip_send_clear_console(), asciichat_errno_destroy(), crypto_handshake_is_ready(), crypto_handshake_rekey_request(), crypto_handshake_should_rekey(), g_server_should_exit, MAX_AUDIO_BATCH, packet_queue_free_packet(), packet_queue_try_dequeue(), packet_send_via_transport(), platform_sleep_us(), time_elapsed_ns(), time_get_ns(), and video_frame_get_latest().
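
Every send path in client_send_thread_func() copies client->transport while holding send_mutex, releases the mutex, and only then performs network I/O, so a send that blocks on a full TCP buffer cannot stall other threads waiting on the mutex. A minimal sketch of that pattern with POSIX threads follows. `demo_transport_t`, `demo_client_t`, `demo_transport_send`, and `demo_send` are hypothetical stand-ins, and this sketch guards shutting_down with the mutex rather than an atomic.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
  int sends; /* counts calls; stands in for a real connection */
} demo_transport_t;

typedef struct {
  pthread_mutex_t send_mutex;
  demo_transport_t *transport; /* guarded by send_mutex */
  bool shutting_down;          /* guarded by send_mutex in this sketch */
} demo_client_t;

/* Stand-in for a potentially blocking network send. */
static int demo_transport_send(demo_transport_t *t) {
  t->sends++;
  return 0;
}

/* Returns 0 on success, -1 if the client is shutting down or has no transport. */
static int demo_send(demo_client_t *c) {
  pthread_mutex_lock(&c->send_mutex);
  if (c->shutting_down || c->transport == NULL) {
    pthread_mutex_unlock(&c->send_mutex);
    return -1;
  }
  demo_transport_t *t = c->transport; /* snapshot taken under the lock */
  pthread_mutex_unlock(&c->send_mutex);

  /* I/O runs without the mutex held. The snapshot stays valid only if the
   * owner keeps the transport alive until this thread has been joined. */
  return demo_transport_send(t);
}
```

The trade-off is that the pointer can outlive the check, so the teardown code has to stop and join the send thread before destroying the transport.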

◆ find_client_by_id()

client_info_t * find_client_by_id ( uint32_t  client_id)

Fast O(1) client lookup by ID using hash table.

This is the primary method for locating clients throughout the server. It uses a hash table for constant-time lookups regardless of client count, making it suitable for high-performance operations like rendering and stats.

PERFORMANCE CHARACTERISTICS:

  • Time Complexity: O(1) average case, O(n) worst case (hash collision)
  • Space Complexity: O(1)
  • Thread Safety: Lookup runs under a read lock on g_client_manager_rwlock (the uthash table itself is not thread-safe)

USAGE PATTERNS:

  • Called by render threads to find target clients for frame generation
  • Used by protocol handlers to locate clients for packet processing
  • Stats collection for per-client performance monitoring
Parameters
client_id  Unique identifier for the client (0 is invalid)
Returns
Pointer to client_info_t if found, NULL if not found or invalid ID
Note
Does not require external locking; the lookup takes a read lock on g_client_manager_rwlock internally
Returns a direct pointer to the client struct after the lock is released, so the caller should use the snapshot pattern
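
The snapshot pattern mentioned in the note can be sketched as follows: copy the fields you need out of the shared struct in one short critical section, then work only on the copy. This is an illustration under assumptions, not ascii-chat's actual code. `snap_client_t`, `snap_view_t`, `snap_take`, and `snap_cell_count` are hypothetical names, and a plain pthread mutex stands in for whatever lock protects the real fields.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Shared, mutable per-client state (hypothetical subset of fields). */
typedef struct {
  pthread_mutex_t state_mutex;
  uint32_t client_id;
  uint32_t width;
  uint32_t height;
  bool active;
} snap_client_t;

/* Private copy that is safe to read without any lock. */
typedef struct {
  uint32_t client_id;
  uint32_t width;
  uint32_t height;
  bool active;
} snap_view_t;

/* Copies all needed fields in one critical section so they are mutually consistent. */
static snap_view_t snap_take(snap_client_t *c) {
  snap_view_t v;
  pthread_mutex_lock(&c->state_mutex);
  v.client_id = c->client_id;
  v.width = c->width;
  v.height = c->height;
  v.active = c->active;
  pthread_mutex_unlock(&c->state_mutex);
  return v;
}

/* Works purely on the snapshot; later changes to the client do not affect it. */
static uint32_t snap_cell_count(const snap_view_t *v) {
  return v->active ? v->width * v->height : 0;
}
```

Because the long-running work reads only the copy, the lock is held for a few assignments rather than for the duration of rendering or sending.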

Definition at line 308 of file src/server/client.c.

308 {
309 if (client_id == 0) {
310 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid client ID");
311 return NULL;
312 }
313
314 // Protect uthash lookup with read lock to prevent concurrent access issues
315 rwlock_rdlock(&g_client_manager_rwlock);
316
317 client_info_t *result = NULL;
318 uint32_t search_id = client_id; // uthash needs an lvalue for the key
319 HASH_FIND_INT(g_client_manager.clients_by_id, &search_id, result);
320
321 rwlock_rdunlock(&g_client_manager_rwlock);
322
323 if (!result) {
324 log_warn("Client not found for ID %u", client_id);
325 }
326
327 return result;
328}

References client_manager_t::clients_by_id, g_client_manager, and g_client_manager_rwlock.

Referenced by broadcast_server_state_to_all_clients(), crypto_server_cleanup_client(), crypto_server_decrypt_packet(), crypto_server_encrypt_packet(), crypto_server_get_context(), crypto_server_is_ready(), and start_webrtc_client_threads().

◆ find_client_by_socket()

client_info_t * find_client_by_socket ( socket_t  socket)

Find client by socket descriptor using linear search.

This function provides socket-based client lookup, primarily used during connection establishment before client IDs are assigned. Less efficient than find_client_by_id() but necessary for socket-based operations.

PERFORMANCE CHARACTERISTICS:

  • Time Complexity: O(n) where n = number of active clients
  • Space Complexity: O(1)
  • Thread Safety: Internally acquires read lock on g_client_manager_rwlock

USAGE PATTERNS:

  • Connection establishment during add_client() processing
  • Socket error handling and cleanup operations
  • Debugging and diagnostic functions
Parameters
socket  Platform-abstracted socket descriptor to search for
Returns
Pointer to client_info_t if found, NULL if not found
Note
Only searches active clients (avoids returning stale entries)
Caller should use snapshot pattern when accessing returned client data
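
The O(n) scan described above, including the active-flag filter that skips stale slots, can be sketched in isolation like this. `slot_t` and `slot_find_by_socket` are hypothetical names, a plain int stands in for socket_t, and the read lock is omitted to keep the sketch short.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
  int socket;         /* stand-in for socket_t */
  atomic_bool active; /* false for free or stale slots */
} slot_t;

/* Returns the first active slot whose socket matches, or NULL if none does.
 * Inactive slots are skipped even when their socket value matches. */
static slot_t *slot_find_by_socket(slot_t *slots, size_t n, int socket) {
  for (size_t i = 0; i < n; i++) {
    if (slots[i].socket == socket && atomic_load(&slots[i].active)) {
      return &slots[i];
    }
  }
  return NULL;
}
```

The filter matters because a removed client can leave its old socket value behind in the array, and descriptor numbers are commonly reused by the operating system.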

Definition at line 353 of file src/server/client.c.

353 {
354 rwlock_rdlock(&g_client_manager_rwlock);
355
356 for (int i = 0; i < MAX_CLIENTS; i++) {
357 if (g_client_manager.clients[i].socket == socket && atomic_load(&g_client_manager.clients[i].active)) {
358 client_info_t *client = &g_client_manager.clients[i];
359 rwlock_rdunlock(&g_client_manager_rwlock);
360 return client;
361 }
362 }
363
364 rwlock_rdunlock(&g_client_manager_rwlock);
365 return NULL;
366}

References client_manager_t::clients, g_client_manager, and g_client_manager_rwlock.

◆ process_decrypted_packet()

void process_decrypted_packet ( client_info_t *  client,
packet_type_t  type,
void *  data,
size_t  len 
)

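Process a decrypted packet from a client: apply per-packet rate limiting, then dispatch to the handler registered for the packet type. Unknown packet types disconnect the client.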

Parameters
client  Client info structure
type  Packet type
data  Packet data
len  Packet length

Definition at line 3136 of file src/server/client.c.

3136 {
3137 if (type == 5000) { // CLIENT_CAPABILITIES
3138 log_debug("CLIENT: client_id=%u, data=%p, len=%zu", atomic_load(&client->client_id), data, len);
3139 }
3140
3141 // Rate limiting: Check and record packet-specific rate limits
3142 if (g_rate_limiter) {
3143 if (!check_and_record_packet_rate_limit(g_rate_limiter, client->client_ip, client->socket, type)) {
3144 // Rate limit exceeded - error response already sent by utility function
3145 return;
3146 }
3147 }
3148
3149 // O(1) dispatch via hash table lookup
3150 int idx = client_dispatch_hash_lookup(g_client_dispatch_hash, type);
3151 if (type == 5000 || type == 3001) {
3152 log_debug("DISPATCH_LOOKUP: type=%d, idx=%d (len=%zu)", type, idx, len);
3153 }
3154 if (idx < 0) {
3155 disconnect_client_for_bad_data(client, "Unknown packet type: %d (len=%zu)", type, len);
3156 return;
3157 }
3158
3159 if (type == 5000 || type == 3001) {
3160 log_debug("DISPATCH_HANDLER: type=%d, calling handler[%d]...", type, idx);
3161 }
3162 g_client_dispatch_handlers[idx](client, data, len);
3163 if (type == 5000 || type == 3001) {
3164 log_debug("DISPATCH_HANDLER: type=%d, handler returned", type);
3165 }
3166}

References check_and_record_packet_rate_limit(), disconnect_client_for_bad_data(), and g_rate_limiter.
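
The hash-table dispatch used above can be illustrated with a small open-addressing table. This is a sketch under stated assumptions: the entry layout, the use of key 0 as the empty marker, linear probing, and the names `disp_entry_t`, `disp_insert()`, and `disp_lookup()` are all hypothetical. Only the table size of 32 and the modulo hash mirror the `CLIENT_DISPATCH_HASH_SIZE` and `CLIENT_DISPATCH_HASH` macros listed for this file.

```c
#include <assert.h>
#include <stdint.h>

#define DISP_HASH_SIZE 32u
#define DISP_HASH(type) ((type) % DISP_HASH_SIZE)

/* Hypothetical entry layout: key 0 marks an empty slot. */
typedef struct {
  uint16_t key;        /* packet type, 0 = empty */
  uint8_t handler_idx; /* index into a handler array */
} disp_entry_t;

/* Maps type -> handler_idx, probing linearly past occupied slots.
 * Returns 0 on success, -1 if type is 0 or the table is full. */
static int disp_insert(disp_entry_t *table, uint16_t type, uint8_t handler_idx) {
  if (type == 0) {
    return -1;
  }
  uint32_t h = DISP_HASH(type);
  for (uint32_t i = 0; i < DISP_HASH_SIZE; i++) {
    uint32_t slot = (h + i) % DISP_HASH_SIZE;
    if (table[slot].key == 0 || table[slot].key == type) {
      table[slot].key = type;
      table[slot].handler_idx = handler_idx;
      return 0;
    }
  }
  return -1;
}

/* Returns the handler index for type, or -1 if the type is unknown. */
static int disp_lookup(const disp_entry_t *table, uint16_t type) {
  if (type == 0) {
    return -1;
  }
  uint32_t h = DISP_HASH(type);
  for (uint32_t i = 0; i < DISP_HASH_SIZE; i++) {
    uint32_t slot = (h + i) % DISP_HASH_SIZE;
    if (table[slot].key == 0) {
      return -1; /* hit an empty slot: type was never inserted */
    }
    if (table[slot].key == type) {
      return table[slot].handler_idx;
    }
  }
  return -1;
}
```

With few handlers relative to the table size, probe chains stay short, which is what makes the lookup effectively constant time. A negative result corresponds to the `idx < 0` branch in the listing above.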

◆ process_encrypted_packet()

int process_encrypted_packet ( client_info_t *  client,
packet_type_t *  type,
void **  data,
size_t *  len,
uint32_t *  sender_id 
)

Decrypt an encrypted packet from a client and parse the header of the decrypted payload.

Parameters
client  Client info structure
type  Pointer to packet type (updated with the decrypted type)
data  Pointer to packet data (updated to point at the decrypted payload, just past the packet header)
len  Pointer to packet length (updated with the decrypted payload length, excluding the header)
sender_id  Pointer to sender ID (updated with the decrypted sender ID)
Returns
0 on success, -1 on error (the packet buffer is freed and *data is set to NULL)

Definition at line 2516 of file src/server/client.c.

2517 {
2518 if (!crypto_server_is_ready(client->client_id)) {
2519 log_error("Received encrypted packet but crypto not ready for client %u", client->client_id);
2520 buffer_pool_free(NULL, *data, *len);
2521 *data = NULL;
2522 return -1;
2523 }
2524
2525 // Store original allocation size before it gets modified
2526 size_t original_alloc_size = *len;
2527 void *decrypted_data = buffer_pool_alloc(NULL, original_alloc_size);
2528 size_t decrypted_len;
2529 int decrypt_result = crypto_server_decrypt_packet(client->client_id, (const uint8_t *)*data, *len,
2530 (uint8_t *)decrypted_data, original_alloc_size, &decrypted_len);
2531
2532 if (decrypt_result != 0) {
2533 SET_ERRNO(ERROR_CRYPTO, "Failed to process encrypted packet from client %u (result=%d)", client->client_id,
2534 decrypt_result);
2535 buffer_pool_free(NULL, *data, original_alloc_size);
2536 buffer_pool_free(NULL, decrypted_data, original_alloc_size);
2537 *data = NULL;
2538 return -1;
2539 }
2540
2541 // Replace encrypted data with decrypted data
2542 // Use original allocation size for freeing the encrypted buffer
2543 buffer_pool_free(NULL, *data, original_alloc_size);
2544
2545 *data = decrypted_data;
2546 *len = decrypted_len;
2547
2548 // Now process the decrypted packet by parsing its header
2549 if (*len < sizeof(packet_header_t)) {
2550 SET_ERRNO(ERROR_CRYPTO, "Decrypted packet too small for header from client %u", client->client_id);
2551 buffer_pool_free(NULL, *data, *len);
2552 *data = NULL;
2553 return -1;
2554 }
2555
2556 packet_header_t *header = (packet_header_t *)*data;
2557 *type = (packet_type_t)NET_TO_HOST_U16(header->type);
2558 *sender_id = NET_TO_HOST_U32(header->client_id);
2559
2560 // Adjust data pointer to skip header
2561 *data = (uint8_t *)*data + sizeof(packet_header_t);
2562 *len -= sizeof(packet_header_t);
2563
2564 return 0;
2565}

References buffer_pool_alloc(), buffer_pool_free(), crypto_server_decrypt_packet(), and crypto_server_is_ready().
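
The header-parsing step at the end of the function (length check, byte-order conversion, then skipping the header) can be sketched in isolation. The 6-byte layout below (a big-endian 16-bit type followed by a big-endian 32-bit client ID) is an assumption made for illustration; the real `packet_header_t` layout is not shown in this section. `hdr_parsed_t`, `hdr_read_be16()`, `hdr_read_be32()`, and `hdr_parse()` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical wire header: type (u16) then client_id (u32), both big-endian. */
#define HDR_SIZE 6u

typedef struct {
  uint16_t type;
  uint32_t sender_id;
  const uint8_t *payload; /* points just past the header */
  size_t payload_len;     /* length excluding the header */
} hdr_parsed_t;

/* Byte-wise decoding avoids alignment and host-endianness assumptions. */
static uint16_t hdr_read_be16(const uint8_t *p) {
  return (uint16_t)(((uint16_t)p[0] << 8) | (uint16_t)p[1]);
}

static uint32_t hdr_read_be32(const uint8_t *p) {
  return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) | ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* Returns 0 on success, -1 if the buffer cannot hold a full header. */
static int hdr_parse(const uint8_t *buf, size_t len, hdr_parsed_t *out) {
  if (buf == NULL || out == NULL || len < HDR_SIZE) {
    return -1;
  }
  out->type = hdr_read_be16(buf);
  out->sender_id = hdr_read_be32(buf + 2);
  out->payload = buf + HDR_SIZE;
  out->payload_len = len - HDR_SIZE;
  return 0;
}
```

In this sketch the caller keeps the original `buf` pointer, so the base of the allocation is still available for freeing after the payload pointer has been advanced past the header.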

◆ remove_client()

int remove_client ( server_context_t *  server_ctx,
uint32_t  client_id 
)

Definition at line 1152 of file src/server/client.c.

1152 {
1153 if (!server_ctx) {
1154 SET_ERRNO(ERROR_INVALID_PARAM, "Cannot remove client %u: NULL server_ctx", client_id);
1155 return -1;
1156 }
1157
1158 // Phase 1: Mark client inactive and prepare for cleanup while holding write lock
1159 client_info_t *target_client = NULL;
1160 char display_name_copy[MAX_DISPLAY_NAME_LEN];
1161 socket_t client_socket = INVALID_SOCKET_VALUE; // Save socket for thread cleanup
1162
1163 log_debug("SOCKET_DEBUG: Attempting to remove client %d", client_id);
1164 rwlock_wrlock(&g_client_manager_rwlock);
1165
1166 for (int i = 0; i < MAX_CLIENTS; i++) {
1167 client_info_t *client = &g_client_manager.clients[i];
1168 uint32_t cid = atomic_load(&client->client_id);
1169 if (cid == client_id && cid != 0) {
1170 // Check if already being removed by another thread
1171 // This prevents double-free and use-after-free crashes during concurrent cleanup
1172 if (atomic_load(&client->shutting_down)) {
1173 rwlock_wrunlock(&g_client_manager_rwlock);
1174 log_debug("Client %u already being removed by another thread, skipping", client_id);
1175 return 0; // Return success - removal is in progress
1176 }
1177 // Mark as shutting down and inactive immediately to stop new operations
1178 log_debug("Setting active=false in remove_client (client_id=%d, socket=%d)", client_id, client->socket);
1179 log_info("Removing client %d (socket=%d) - marking inactive and clearing video flags", client_id, client->socket);
1180 atomic_store(&client->shutting_down, true);
1181 atomic_store(&client->active, false);
1182 atomic_store(&client->is_sending_video, false);
1183 atomic_store(&client->is_sending_audio, false);
1184 target_client = client;
1185
1186 // Store display name before clearing
1187 SAFE_STRNCPY(display_name_copy, client->display_name, MAX_DISPLAY_NAME_LEN - 1);
1188
1189 // Save socket for tcp_server_stop_client_threads() before closing
1190 mutex_lock(&client->client_state_mutex);
1191 client_socket = client->socket; // Save socket for thread cleanup
1192 if (client->socket != INVALID_SOCKET_VALUE) {
1193 log_debug("SOCKET_DEBUG: Client %d shutting down socket %d", client->client_id, client->socket);
1194 // Shutdown both send and receive operations to unblock any pending I/O
1195 socket_shutdown(client->socket, 2); // 2 = SHUT_RDWR on POSIX, SD_BOTH on Windows
1196 // Don't close yet - tcp_server needs socket as lookup key
1197 }
1198 mutex_unlock(&client->client_state_mutex);
1199
1200 // Shutdown packet queues to unblock send thread
1201 if (client->audio_queue) {
1202 packet_queue_stop(client->audio_queue);
1203 }
1204 // Video now uses double buffer, no queue to shutdown
1205
1206 break;
1207 }
1208 }
1209
1210 // If client not found, unlock and return
1211 if (!target_client) {
1212 rwlock_wrunlock(&g_client_manager_rwlock);
1213 log_warn("Cannot remove client %u: not found", client_id);
1214 return -1;
1215 }
1216
1217 // Unregister client from session_host (for discovery mode support)
1218 // NOTE: Client may not be registered if crypto handshake failed before session_host registration
1219 if (server_ctx->session_host) {
1220 asciichat_error_t session_result = session_host_remove_client(server_ctx->session_host, client_id);
1221 if (session_result != ASCIICHAT_OK) {
1222 // ERROR_NOT_FOUND (91) is expected if client failed crypto before being registered with session_host
1223 if (session_result == ERROR_NOT_FOUND) {
1224 log_debug("Client %u not found in session_host (likely failed crypto before registration)", client_id);
1225 } else {
1226 log_warn("Failed to unregister client %u from session_host: %s", client_id,
1227 asciichat_error_string(session_result));
1228 }
1229 } else {
1230 log_debug("Client %u unregistered from session_host", client_id);
1231 }
1232 }
1233
1234 // Release write lock before joining threads.
1235 // This prevents deadlock with render threads that need read locks.
1236 rwlock_wrunlock(&g_client_manager_rwlock);
1237
1238 // Phase 2: Stop all client threads
1239 // For TCP clients: use tcp_server thread pool management
1240 // For WebRTC clients: manually join threads (no socket-based thread pool)
1241 // Use is_tcp_client flag, not socket value - socket may already be INVALID_SOCKET_VALUE
1242 // even for TCP clients if it was closed earlier during cleanup.
1243 log_debug("Stopping all threads for client %u (socket %d, is_tcp=%d)", client_id, client_socket,
1244 target_client ? target_client->is_tcp_client : -1);
1245
1246 if (target_client && target_client->is_tcp_client) {
1247 // TCP client: use tcp_server thread pool
1248 // This joins threads in stop_id order: receive(1), render(2), send(3)
1249 // Use saved client_socket for lookup (tcp_server needs original socket as key)
1250 if (client_socket != INVALID_SOCKET_VALUE) {
1251 asciichat_error_t stop_result = tcp_server_stop_client_threads(server_ctx->tcp_server, client_socket);
1252 if (stop_result != ASCIICHAT_OK) {
1253 log_warn("Failed to stop threads for TCP client %u: error %d", client_id, stop_result);
1254 // Continue with cleanup even if thread stopping failed
1255 }
1256 } else {
1257 log_debug("TCP client %u socket already closed, threads should have already exited", client_id);
1258 }
1259 } else if (target_client) {
1260 // WebRTC client: manually join threads
1261 log_debug("Stopping WebRTC client %u threads (receive and send)", client_id);
1262
1263 // Join receive thread (but skip if called from the receive thread itself to avoid deadlock)
1264 thread_id_t current_thread_id = asciichat_thread_self();
1265 if (asciichat_thread_equal(current_thread_id, target_client->receive_thread_id)) {
1266 log_debug("remove_client() called from receive thread for client %u, skipping self-join", client_id);
1267 } else {
1268 void *recv_result = NULL;
1269 asciichat_error_t recv_join_result = asciichat_thread_join(&target_client->receive_thread, &recv_result);
1270 if (recv_join_result != ASCIICHAT_OK) {
1271 log_warn("Failed to join receive thread for WebRTC client %u: error %d", client_id, recv_join_result);
1272 } else {
1273 log_debug("Joined receive thread for WebRTC client %u", client_id);
1274 }
1275 }
1276
1277 // Join send thread
1278 void *send_result = NULL;
1279 asciichat_error_t send_join_result = asciichat_thread_join(&target_client->send_thread, &send_result);
1280 if (send_join_result != ASCIICHAT_OK) {
1281 log_warn("Failed to join send thread for WebRTC client %u: error %d", client_id, send_join_result);
1282 } else {
1283 log_debug("Joined send thread for WebRTC client %u", client_id);
1284 }
1285 // Note: Render threads still need to be stopped - they're created the same way for both TCP and WebRTC
1286 // For now, render threads are expected to exit when they check g_server_should_exit and client->active
1287 }
1288
1289 // Destroy ACIP transport before closing socket
1290 // For WebSocket clients: LWS_CALLBACK_CLOSED already closed and destroyed the transport
1291 // Trying to destroy it again causes heap-use-after-free since LWS callbacks might still fire
1292 // For TCP clients: transport is ours to clean up
1293 if (target_client && target_client->transport && target_client->is_tcp_client) {
1294 acip_transport_destroy(target_client->transport);
1295 target_client->transport = NULL;
1296 log_debug("Destroyed ACIP transport for TCP client %u", client_id);
1297 } else if (target_client && target_client->transport && !target_client->is_tcp_client) {
1298 // WebSocket client - just NULL it out, LWS_CALLBACK_CLOSED already destroyed it
1299 target_client->transport = NULL;
1300 log_debug("Skipped transport destruction for WebSocket client %u (LWS already destroyed)", client_id);
1301 }
1302
1303 // Now safe to close the socket (threads are stopped)
1304 if (client_socket != INVALID_SOCKET_VALUE) {
1305 log_debug("SOCKET_DEBUG: Closing socket %d for client %u after thread cleanup", client_socket, client_id);
1306 socket_close(client_socket);
1307 }
1308
1309 // Phase 3: Clean up resources with write lock
1310 rwlock_wrlock(&g_client_manager_rwlock);
1311
1312 // Re-validate target_client pointer after reacquiring lock.
1313 // Another thread might have invalidated the pointer while we had the lock released.
1314 if (target_client) {
1315 // Verify client_id still matches and client is still in shutting_down state
1316 uint32_t current_id = atomic_load(&target_client->client_id);
1317 bool still_shutting_down = atomic_load(&target_client->shutting_down);
1318 if (current_id != client_id || !still_shutting_down) {
1319 log_warn("Client %u pointer invalidated during thread cleanup (id=%u, shutting_down=%d)", client_id, current_id,
1320 still_shutting_down);
1321 rwlock_wrunlock(&g_client_manager_rwlock);
1322 return 0; // Another thread completed the cleanup
1323 }
1324 }
1325
1326 // Mark socket as closed in client structure
1327 if (target_client && target_client->socket != INVALID_SOCKET_VALUE) {
1328 mutex_lock(&target_client->client_state_mutex);
1329 target_client->socket = INVALID_SOCKET_VALUE;
1330 mutex_unlock(&target_client->client_state_mutex);
1331 log_debug("SOCKET_DEBUG: Client %u socket set to INVALID", client_id);
1332 }
1333
1334 // Use the dedicated cleanup function to ensure all resources are freed
1335 cleanup_client_all_buffers(target_client);
1336
1337 // Remove from audio mixer
1338 if (g_audio_mixer) {
1339 mixer_remove_source(g_audio_mixer, client_id);
1340#ifdef DEBUG_AUDIO
1341 log_debug("Removed client %u from audio mixer", client_id);
1342#endif
1343 }
1344
1345 // Remove from uthash table
1346 // Verify client is actually in the hash table before deleting.
1347 // Another thread might have already removed it.
1348 if (target_client) {
1349 client_info_t *hash_entry = NULL;
1350 HASH_FIND(hh, g_client_manager.clients_by_id, &client_id, sizeof(client_id), hash_entry);
1351 if (hash_entry == target_client) {
1352 HASH_DELETE(hh, g_client_manager.clients_by_id, target_client);
1353 log_debug("Removed client %u from uthash table", client_id);
1354 } else {
1355 log_warn("Client %u already removed from hash table by another thread (found=%p, expected=%p)", client_id,
1356 (void *)hash_entry, (void *)target_client);
1357 }
1358 } else {
1359 log_warn("Failed to remove client %u from hash table (client not found)", client_id);
1360 }
1361
1362 // Cleanup crypto context for this client
1363 if (target_client->crypto_initialized) {
1364 crypto_handshake_destroy(&target_client->crypto_handshake_ctx);
1365 target_client->crypto_initialized = false;
1366 log_debug("Crypto context cleaned up for client %u", client_id);
1367 }
1368
1369 // Verify all threads have actually exited before resetting client_id.
1370 // Threads that are still starting (at RtlUserThreadStart) haven't checked client_id yet.
1371 // We must ensure threads are fully joined before zeroing the client struct.
1372 // Use exponential backoff for thread termination verification
1373 int retry_count = 0;
1374 const int max_retries = 5;
1375 while (retry_count < max_retries && (asciichat_thread_is_initialized(&target_client->send_thread) ||
1376 asciichat_thread_is_initialized(&target_client->receive_thread) ||
1377 asciichat_thread_is_initialized(&target_client->video_render_thread) ||
1378 asciichat_thread_is_initialized(&target_client->audio_render_thread))) {
1379 // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms
1380 uint32_t delay_ms = 10 * (1 << retry_count);
1381 log_warn("Client %u: Some threads still appear initialized (attempt %d/%d), waiting %ums", client_id,
1382 retry_count + 1, max_retries, delay_ms);
1383 platform_sleep_us(delay_ms * 1000);
1384 retry_count++;
1385 }
1386
1387 if (retry_count == max_retries) {
1388 log_error("Client %u: Threads did not terminate after %d retries, proceeding with cleanup anyway", client_id,
1389 max_retries);
1390 }
1391
1392 // Only reset client_id to 0 AFTER confirming threads are joined
1393 // This prevents threads that are starting from accessing a zeroed client struct
1394 // Reset client_id to 0 before destroying mutexes to prevent race conditions.
1395 // This ensures worker threads can detect shutdown and exit before the mutex is destroyed.
1396 // If we destroy the mutex first, threads might try to access a destroyed mutex.
1397 atomic_store(&target_client->client_id, 0);
1398
1399 // Wait for threads to observe the client_id reset
1400 // Use sufficient delay for memory visibility across all CPU cores
1401 platform_sleep_us(5 * US_PER_MS_INT); // 5ms delay for memory barrier propagation
1402
1403 // Destroy mutexes
1404 // IMPORTANT: Always destroy these even if threads didn't join properly
1405 // to prevent issues when the slot is reused
1406 mutex_destroy(&target_client->client_state_mutex);
1407 mutex_destroy(&target_client->send_mutex);
1408
1409 // Clear client structure
1410 // NOTE: After memset, the mutex handles are zeroed but the OS resources
1411 // have been released by the destroy calls above
1412 memset(target_client, 0, sizeof(client_info_t));
1413
1414 // Recalculate client count using atomic reads
1415 int remaining_count = 0;
1416 for (int j = 0; j < MAX_CLIENTS; j++) {
1417 if (atomic_load(&g_client_manager.clients[j].client_id) != 0) {
1418 remaining_count++;
1419 }
1420 }
1421 g_client_manager.client_count = remaining_count;
1422
1423 log_debug("Client removed: client_id=%u (%s) removed, remaining clients: %d", client_id, display_name_copy,
1424 remaining_count);
1425
1426 rwlock_wrunlock(&g_client_manager_rwlock);
1427
1428 // Broadcast updated state
1429 broadcast_server_state_to_all_clients();
1430
1431 return 0;
1432}

References acip_transport_destroy(), asciichat_thread_equal(), asciichat_thread_join(), asciichat_thread_self(), broadcast_server_state_to_all_clients(), client_manager_t::client_count, client_manager_t::clients, client_manager_t::clients_by_id, crypto_handshake_destroy(), g_audio_mixer, g_client_manager, g_client_manager_rwlock, mixer_remove_source(), mutex_destroy(), packet_queue_stop(), platform_sleep_us(), server_context_t::session_host, session_host_remove_client(), server_context_t::tcp_server, and tcp_server_stop_client_threads().

Referenced by add_client(), add_webrtc_client(), client_receive_thread(), and server_main().
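
The thread-termination check in Phase 3 doubles its delay on each retry (10 ms, 20 ms, 40 ms, 80 ms, 160 ms). The arithmetic can be sketched as below; `backoff_delay_ms()` and `backoff_total_ms()` are hypothetical helpers written for illustration and are not part of client.c.

```c
#include <assert.h>
#include <stdint.h>

/* Delay before retry number `retry` (0-based): base_ms doubled on each attempt.
 * Mirrors the expression 10 * (1 << retry_count) in the listing. */
static uint32_t backoff_delay_ms(uint32_t base_ms, unsigned retry) {
  return base_ms * (1u << retry);
}

/* Worst-case total wait if every one of max_retries attempts has to sleep. */
static uint32_t backoff_total_ms(uint32_t base_ms, unsigned max_retries) {
  uint32_t total = 0;
  for (unsigned r = 0; r < max_retries; r++) {
    total += backoff_delay_ms(base_ms, r);
  }
  return total;
}
```

With a 10 ms base and 5 retries the worst case adds up to 310 ms. In the listing this loop runs after the write lock has been reacquired, so that time, plus the 5 ms sleep that follows the `client_id` reset, is spent holding `g_client_manager_rwlock`.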

◆ start_webrtc_client_threads()

int start_webrtc_client_threads ( server_context_t *  server_ctx,
uint32_t  client_id 
)

Start threads for a WebRTC client after crypto initialization.

This is called by the WebSocket handler after the crypto handshake has been initialized. Deferring thread startup until then ensures the receive thread does not try to process packets before the crypto context exists.

Parameters
server_ctx  Server context
client_id  Client ID to start threads for
Returns
0 on success, -1 on failure

Definition at line 2397 of file src/server/client.c.

2397 {
2398 if (!server_ctx) {
2399 SET_ERRNO(ERROR_INVALID_PARAM, "Server context is NULL");
2400 return -1;
2401 }
2402
2403 client_info_t *client = find_client_by_id(client_id);
2404 if (!client) {
2405 SET_ERRNO(ERROR_NOT_FOUND, "Client %u not found", client_id);
2406 return -1;
2407 }
2408
2409 log_debug("Starting threads for WebRTC client %u...", client_id);
2410 return start_client_threads(server_ctx, client, false);
2411}

References find_client_by_id().

◆ stop_client_threads()

void stop_client_threads ( client_info_t *  client)

Definition at line 2413 of file src/server/client.c.

2413 {
2414 if (!client) {
2415 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
2416 return;
2417 }
2418
2419 // Signal threads to stop
2420 log_debug("Setting active=false in stop_client_threads (client_id=%u)", atomic_load(&client->client_id));
2421 atomic_store(&client->active, false);
2422 atomic_store(&client->send_thread_running, false);
2423
2424 // Wait for threads to finish
2425 if (asciichat_thread_is_initialized(&client->send_thread)) {
2426 asciichat_thread_join(&client->send_thread, NULL);
2427 }
2428 if (asciichat_thread_is_initialized(&client->receive_thread)) {
2429 asciichat_thread_join(&client->receive_thread, NULL);
2430 }
2431 // For async dispatch: stop dispatch thread if running
2432 if (asciichat_thread_is_initialized(&client->dispatch_thread)) {
2433 atomic_store(&client->dispatch_thread_running, false);
2434 asciichat_thread_join(&client->dispatch_thread, NULL);
2435 }
2436}

References asciichat_thread_join().
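
The signal-then-join order used by stop_client_threads() can be sketched with plain POSIX threads. This is an illustration only: `worker_t`, `worker_start()`, and `worker_stop()` are hypothetical names, pthreads stand in for the project's `asciichat_thread_*` wrappers, and a `started` flag plays the role of `asciichat_thread_is_initialized()`.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
  atomic_bool running;    /* cleared to ask the worker to exit */
  atomic_uint iterations; /* progress counter, only for demonstration */
  pthread_t thread;
  bool started;           /* true while the thread still needs a join */
} worker_t;

/* Worker loop: re-checks the flag on every iteration so it exits promptly. */
static void *worker_main(void *arg) {
  worker_t *w = arg;
  while (atomic_load(&w->running)) {
    atomic_fetch_add(&w->iterations, 1);
    sched_yield();
  }
  return NULL;
}

/* Returns 0 on success, -1 if the thread could not be created. */
static int worker_start(worker_t *w) {
  atomic_store(&w->iterations, 0);
  atomic_store(&w->running, true);
  if (pthread_create(&w->thread, NULL, worker_main, w) != 0) {
    atomic_store(&w->running, false);
    return -1;
  }
  w->started = true;
  return 0;
}

/* Signal first, then join. Joining before signalling would block forever. */
static int worker_stop(worker_t *w) {
  if (!w->started) {
    return 0; /* nothing to join */
  }
  atomic_store(&w->running, false);
  int rc = pthread_join(w->thread, NULL);
  w->started = false;
  return rc == 0 ? 0 : -1;
}
```

The guard on `started` makes a second stop call a no-op, which matters because joining the same thread twice is undefined behavior in POSIX.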

Variable Documentation

◆ g_client_manager

client_manager_t g_client_manager

Global client manager singleton - central coordination point.

This is the primary data structure for managing all connected clients. It serves as the bridge between main.c's connection accept loop and the per-client threading architecture.

STRUCTURE COMPONENTS:

  • clients[]: Array backing storage for client_info_t structs
  • clients_by_id: uthash table giving O(1) average lookup of client_id -> client_info_t*
  • client_count: Current number of active clients
  • mutex: Legacy mutex (mostly replaced by rwlock)
  • next_client_id: Monotonic counter for unique client identification

THREAD SAFETY: Protected by g_client_manager_rwlock for concurrent access

Definition at line 255 of file src/server/client.c.

Referenced by add_client(), add_webrtc_client(), any_clients_sending_video(), broadcast_server_state_to_all_clients(), find_client_by_id(), find_client_by_socket(), remove_client(), send_server_state_to_client(), server_main(), stats_logger_thread(), and update_server_stats().
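
How a fixed slot array, a monotonic ID counter, and a recomputed client count fit together can be sketched as follows. This is a simplified model, not the real `client_manager_t`: the `mgr_*` names are hypothetical, the hash index is omitted, and no locking is shown (in the real code the caller would hold the write lock). It does follow two conventions visible in remove_client(): a `client_id` of 0 marks a free slot, and the count is recomputed by scanning the array.

```c
#include <assert.h>
#include <stdint.h>

#define MGR_MAX_CLIENTS 3

typedef struct {
  uint32_t client_id; /* 0 marks a free slot */
} mgr_slot_t;

typedef struct {
  mgr_slot_t clients[MGR_MAX_CLIENTS]; /* array backing storage */
  int client_count;                    /* number of occupied slots */
  uint32_t next_client_id;             /* monotonic counter, never reused */
} mgr_t;

/* Claims the first free slot and assigns a fresh ID.
 * Returns the new ID, or 0 if every slot is occupied. */
static uint32_t mgr_add(mgr_t *m) {
  for (int i = 0; i < MGR_MAX_CLIENTS; i++) {
    if (m->clients[i].client_id == 0) {
      m->clients[i].client_id = ++m->next_client_id;
      m->client_count++;
      return m->clients[i].client_id;
    }
  }
  return 0;
}

/* Frees the slot owned by id and recounts the occupied slots.
 * Returns 0 on success, -1 if the ID is not present. */
static int mgr_remove(mgr_t *m, uint32_t id) {
  if (id == 0) {
    return -1;
  }
  for (int i = 0; i < MGR_MAX_CLIENTS; i++) {
    if (m->clients[i].client_id == id) {
      m->clients[i].client_id = 0;
      int remaining = 0;
      for (int j = 0; j < MGR_MAX_CLIENTS; j++) {
        if (m->clients[j].client_id != 0) {
          remaining++;
        }
      }
      m->client_count = remaining;
      return 0;
    }
  }
  return -1;
}
```

Slots are reused but IDs are not, so a stale ID held by another thread cannot accidentally match a newer client that landed in the same slot.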

◆ g_client_manager_rwlock

rwlock_t g_client_manager_rwlock = {0}

Reader-writer lock protecting the global client manager.

This lock enables high-performance concurrent access patterns:

  • Multiple threads can read client data simultaneously (stats, rendering)
  • Only one thread can modify client data at a time (add/remove operations)
  • Readers do not block one another, which removes contention among read-heavy operations

USAGE PATTERN:

  • Read operations: rwlock_rdlock() for client lookups, stats gathering
  • Write operations: rwlock_wrlock() for add_client(), remove_client()
  • Always acquire THIS lock before per-client mutexes (lock ordering)

Definition at line 270 of file src/server/client.c.

270{0};

Referenced by add_client(), add_webrtc_client(), broadcast_server_state_to_all_clients(), find_client_by_id(), find_client_by_socket(), remove_client(), server_main(), stats_logger_thread(), and update_server_stats().