ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
client.h File Reference

Per-client state management and lifecycle orchestration. More...

Go to the source code of this file.

Data Structures

struct  client_manager_t
 Global client manager structure for server-side client coordination. More...
 

Typedefs

typedef struct server_context_t server_context_t
 

Functions

int add_client (server_context_t *server_ctx, socket_t socket, const char *client_ip, int port)
 
int add_webrtc_client (server_context_t *server_ctx, acip_transport_t *transport, const char *client_ip, bool start_threads)
 Register a WebRTC client with the server.
 
int start_webrtc_client_threads (server_context_t *server_ctx, uint32_t client_id)
 Start threads for a WebRTC client after crypto initialization.
 
int remove_client (server_context_t *server_ctx, uint32_t client_id)
 
client_info_t * find_client_by_id (uint32_t client_id)
 Fast O(1) client lookup by ID using hash table.
 
client_info_t * find_client_by_socket (socket_t socket)
 Find client by socket descriptor using linear search.
 
void cleanup_client_media_buffers (client_info_t *client)
 
void cleanup_client_packet_queues (client_info_t *client)
 
void * client_receive_thread (void *arg)
 
void stop_client_threads (client_info_t *client)
 
int process_encrypted_packet (client_info_t *client, packet_type_t *type, void **data, size_t *len, uint32_t *sender_id)
 
void process_decrypted_packet (client_info_t *client, packet_type_t type, void *data, size_t len)
 
void initialize_client_info (client_info_t *client)
 

Variables

client_manager_t g_client_manager
 Global client manager singleton - central coordination point.
 
rwlock_t g_client_manager_rwlock
 Reader-writer lock protecting the global client manager.
 

Detailed Description

Per-client state management and lifecycle orchestration.

This header provides server-specific client management functions. The client_info_t structure and network logging macros are defined in lib/network/client.h.

Definition in file client.h.

Typedef Documentation

◆ server_context_t

Definition at line 16 of file client.h.

Function Documentation

◆ add_client()

int add_client ( server_context_t server_ctx,
socket_t  socket,
const char *  client_ip,
int  port 
)

Definition at line 534 of file src/server/client.c.

534 {
535 // Find empty slot WITHOUT holding the global lock
536 // We'll re-verify under lock after allocations complete
537 int slot = -1;
538 int existing_count = 0;
539 for (int i = 0; i < MAX_CLIENTS; i++) {
540 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
541 slot = i; // Take first available slot
542 }
543 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
544 existing_count++;
545 }
546 }
547
548 // Quick pre-check before expensive allocations
549 if (existing_count >= GET_OPTION(max_clients) || slot == -1) {
550 const char *reject_msg = "SERVER_FULL: Maximum client limit reached\n";
551 ssize_t send_result = socket_send(socket, reject_msg, strlen(reject_msg), 0);
552 if (send_result < 0) {
553 log_warn("Failed to send rejection message to client: %s", SAFE_STRERROR(errno));
554 }
555 return -1;
556 }
557
558 // DO EXPENSIVE ALLOCATIONS OUTSIDE THE LOCK
559 // This prevents blocking frame processing during client initialization
560 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
561
562 video_frame_buffer_t *incoming_video_buffer = video_frame_buffer_create(new_client_id);
563 if (!incoming_video_buffer) {
564 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for client %u", new_client_id);
565 log_error("Failed to create video buffer for client %u", new_client_id);
566 return -1;
567 }
568
569 audio_ring_buffer_t *incoming_audio_buffer = audio_ring_buffer_create_for_capture();
570 if (!incoming_audio_buffer) {
571 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for client %u", new_client_id);
572 log_error("Failed to create audio buffer for client %u", new_client_id);
573 video_frame_buffer_destroy(incoming_video_buffer);
574 return -1;
575 }
576
577 packet_queue_t *audio_queue = packet_queue_create_with_pools(500, 1000, false);
578 if (!audio_queue) {
579 LOG_ERRNO_IF_SET("Failed to create audio queue for client");
580 audio_ring_buffer_destroy(incoming_audio_buffer);
581 video_frame_buffer_destroy(incoming_video_buffer);
582 return -1;
583 }
584
585 video_frame_buffer_t *outgoing_video_buffer = video_frame_buffer_create(new_client_id);
586 if (!outgoing_video_buffer) {
587 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for client");
588 packet_queue_destroy(audio_queue);
589 audio_ring_buffer_destroy(incoming_audio_buffer);
590 video_frame_buffer_destroy(incoming_video_buffer);
591 return -1;
592 }
593
594 void *send_buffer = SAFE_MALLOC_ALIGNED(MAX_FRAME_BUFFER_SIZE, 64, void *);
595 if (!send_buffer) {
596 log_error("Failed to allocate send buffer for client %u", new_client_id);
597 video_frame_buffer_destroy(outgoing_video_buffer);
598 packet_queue_destroy(audio_queue);
599 audio_ring_buffer_destroy(incoming_audio_buffer);
600 video_frame_buffer_destroy(incoming_video_buffer);
601 return -1;
602 }
603
604 // NOW acquire the lock only for the critical section: slot assignment + registration
605 rwlock_wrlock(&g_client_manager_rwlock);
606
607 // Re-check slot availability under lock (another thread might have taken it)
608 if (atomic_load(&g_client_manager.clients[slot].client_id) != 0) {
609 rwlock_wrunlock(&g_client_manager_rwlock);
610 SAFE_FREE(send_buffer);
611 video_frame_buffer_destroy(outgoing_video_buffer);
612 packet_queue_destroy(audio_queue);
613 audio_ring_buffer_destroy(incoming_audio_buffer);
614 video_frame_buffer_destroy(incoming_video_buffer);
615
616 const char *reject_msg = "SERVER_FULL: Slot reassigned, try again\n";
617 socket_send(socket, reject_msg, strlen(reject_msg), 0);
618 return -1;
619 }
620
621 // Now we have exclusive access to the slot - do the actual registration
622 client_info_t *client = &g_client_manager.clients[slot];
623 memset(client, 0, sizeof(client_info_t));
624
625 client->socket = socket;
626 client->is_tcp_client = true;
627 atomic_store(&client->client_id, new_client_id);
628 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
629 client->port = port;
630 atomic_store(&client->active, true);
631 client->server_ctx = server_ctx; // Store server context for cleanup
632 atomic_store(&client->shutting_down, false);
633 atomic_store(&client->last_rendered_grid_sources, 0);
634 atomic_store(&client->last_sent_grid_sources, 0);
635 client->connected_at = time(NULL);
636
637 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
638 client->crypto_initialized = false;
639
640 client->pending_packet_type = 0;
641 client->pending_packet_payload = NULL;
642 client->pending_packet_length = 0;
643
644 // Assign pre-allocated buffers
645 client->incoming_video_buffer = incoming_video_buffer;
646 client->incoming_audio_buffer = incoming_audio_buffer;
647 client->audio_queue = audio_queue;
648 client->outgoing_video_buffer = outgoing_video_buffer;
649 client->send_buffer = send_buffer;
650 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE;
651
652 safe_snprintf(client->display_name, sizeof(client->display_name), "Client%u", new_client_id);
653 log_info("Added new client ID=%u from %s:%d (socket=%d, slot=%d)", new_client_id, client_ip, port, socket, slot);
654 log_debug("Client slot assigned: client_id=%u assigned to slot %d, socket=%d", new_client_id, slot, socket);
655
656 // Register socket with tcp_server
657 asciichat_error_t reg_result = tcp_server_add_client(server_ctx->tcp_server, socket, client);
658 if (reg_result != ASCIICHAT_OK) {
659 SET_ERRNO(ERROR_INTERNAL, "Failed to register client socket with tcp_server");
660 log_error("Failed to register client %u socket with tcp_server", new_client_id);
661 // Don't unlock here - error_cleanup will do it
662 goto error_cleanup;
663 }
664
666 log_debug("Client count updated: now %d clients (added client_id=%u to slot %d)", g_client_manager.client_count,
667 new_client_id, slot);
668
669 uint32_t cid = atomic_load(&client->client_id);
670 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
671 log_debug("Added client %u to uthash table", cid);
672
673 rwlock_wrunlock(&g_client_manager_rwlock);
674
675 // Configure socket OUTSIDE lock
676 configure_client_socket(socket, new_client_id);
677
678 // Initialize mutexes OUTSIDE lock
679 if (mutex_init(&client->client_state_mutex) != 0) {
680 log_error("Failed to initialize client state mutex for client %u", new_client_id);
681 remove_client(server_ctx, new_client_id);
682 return -1;
683 }
684
685 if (mutex_init(&client->send_mutex) != 0) {
686 log_error("Failed to initialize send mutex for client %u", new_client_id);
687 remove_client(server_ctx, new_client_id);
688 return -1;
689 }
690
691 // Register with audio mixer OUTSIDE lock
692 if (g_audio_mixer && client->incoming_audio_buffer) {
693 if (mixer_add_source(g_audio_mixer, new_client_id, client->incoming_audio_buffer) < 0) {
694 log_warn("Failed to add client %u to audio mixer", new_client_id);
695 } else {
696#ifdef DEBUG_AUDIO
697 log_debug("Added client %u to audio mixer", new_client_id);
698#endif
699 }
700 }
701
702 // Perform crypto handshake before starting threads.
703 // This ensures the handshake uses the socket directly without interference from receive thread.
704 if (server_crypto_init() == 0) {
705 // Set timeout for crypto handshake to prevent indefinite blocking
706 // This prevents clients from connecting but never completing the handshake
707 const uint64_t HANDSHAKE_TIMEOUT_NS = 30ULL * NS_PER_SEC_INT;
708 asciichat_error_t timeout_result = set_socket_timeout(socket, HANDSHAKE_TIMEOUT_NS);
709 if (timeout_result != ASCIICHAT_OK) {
710 log_warn("Failed to set handshake timeout for client %u: %s", atomic_load(&client->client_id),
711 asciichat_error_string(timeout_result));
712 // Continue anyway - timeout is a safety feature, not critical
713 }
714
715 int crypto_result = server_crypto_handshake(client);
716 if (crypto_result != 0) {
717 log_error("Crypto handshake failed for client %u: %s", atomic_load(&client->client_id), network_error_string());
718 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
719 log_error("Failed to remove client after crypto handshake failure");
720 }
721 return -1;
722 }
723
724 // Clear socket timeout after handshake completes successfully
725 // This allows normal operation without timeouts on data transfer
726 asciichat_error_t clear_timeout_result = set_socket_timeout(socket, 0);
727 if (clear_timeout_result != ASCIICHAT_OK) {
728 log_warn("Failed to clear handshake timeout for client %u: %s", atomic_load(&client->client_id),
729 asciichat_error_string(clear_timeout_result));
730 // Continue anyway - we can still communicate even with timeout set
731 }
732
733 log_debug("Crypto handshake completed successfully for client %u", atomic_load(&client->client_id));
734
735 // Create ACIP transport for protocol-agnostic packet sending
736 // The transport wraps the socket with encryption context from the handshake
737 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
738 client->transport = acip_tcp_transport_create(socket, (crypto_context_t *)crypto_ctx);
739 if (!client->transport) {
740 log_error("Failed to create ACIP transport for client %u", atomic_load(&client->client_id));
741 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
742 log_error("Failed to remove client after transport creation failure");
743 }
744 return -1;
745 }
746 log_debug("Created ACIP transport for client %u with crypto context", atomic_load(&client->client_id));
747
748 // After handshake completes, the client immediately sends PACKET_TYPE_CLIENT_CAPABILITIES
749 // We must read and process this packet BEFORE starting the receive thread to avoid a race condition
750 // where the packet arrives but no thread is listening for it.
751 //
752 // SPECIAL CASE: If client used --no-encrypt, we already received this packet during the handshake
753 // attempt and stored it in pending_packet_*. Use that instead of receiving a new one.
754 packet_envelope_t envelope;
755 bool used_pending_packet = false;
756
757 if (client->pending_packet_payload) {
758 // Client used --no-encrypt mode - use the packet we already received
759 log_info("Client %u using --no-encrypt mode - processing pending packet type %u", atomic_load(&client->client_id),
760 client->pending_packet_type);
761 envelope.type = client->pending_packet_type;
762 envelope.data = client->pending_packet_payload;
763 envelope.len = client->pending_packet_length;
764 envelope.allocated_buffer = client->pending_packet_payload; // Will be freed below
765 envelope.allocated_size = client->pending_packet_length;
766 used_pending_packet = true;
767
768 // Clear pending packet fields
769 client->pending_packet_type = 0;
770 client->pending_packet_payload = NULL;
771 client->pending_packet_length = 0;
772 } else {
773 // Normal encrypted mode - receive capabilities packet
774 log_debug("Waiting for initial capabilities packet from client %u", atomic_load(&client->client_id));
775
776 // Protect crypto context access with client state mutex
777 mutex_lock(&client->client_state_mutex);
778 const crypto_context_t *crypto_ctx = crypto_server_get_context(atomic_load(&client->client_id));
779
780 // Use per-client crypto state to determine enforcement
781 // At this point, handshake is complete, so crypto_initialized=true and handshake is ready
782 bool enforce_encryption = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
783 crypto_handshake_is_ready(&client->crypto_handshake_ctx);
784
785 packet_recv_result_t result = receive_packet_secure(socket, (void *)crypto_ctx, enforce_encryption, &envelope);
786 mutex_unlock(&client->client_state_mutex);
787
788 if (result != PACKET_RECV_SUCCESS) {
789 log_error("Failed to receive initial capabilities packet from client %u: result=%d",
790 atomic_load(&client->client_id), result);
791 if (envelope.allocated_buffer) {
792 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
793 }
794 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
795 log_error("Failed to remove client after crypto handshake failure");
796 }
797 return -1;
798 }
799 }
800
801 if (envelope.type != PACKET_TYPE_CLIENT_CAPABILITIES) {
802 log_error("Expected PACKET_TYPE_CLIENT_CAPABILITIES but got packet type %d from client %u", envelope.type,
803 atomic_load(&client->client_id));
804 if (envelope.allocated_buffer) {
805 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
806 }
807 if (remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
808 log_error("Failed to remove client after crypto handshake failure");
809 }
810 return -1;
811 }
812
813 // Process the capabilities packet directly
814 log_debug("Processing initial capabilities packet from client %u (from %s)", atomic_load(&client->client_id),
815 used_pending_packet ? "pending packet" : "network");
816 handle_client_capabilities_packet(client, envelope.data, envelope.len);
817
818 // Free the packet data
819 if (envelope.allocated_buffer) {
820 buffer_pool_free(NULL, envelope.allocated_buffer, envelope.allocated_size);
821 }
822 log_debug("Successfully received and processed initial capabilities for client %u",
823 atomic_load(&client->client_id));
824 }
825
826 // Start all client threads in the correct order (unified path for TCP and WebRTC)
827 // This creates: receive thread -> render threads -> send thread
828 // The render threads MUST be created before send thread to avoid the race condition
829 // where send thread reads empty frames before render thread generates the first real frame
830 uint32_t client_id_snapshot = atomic_load(&client->client_id);
831 if (start_client_threads(server_ctx, client, true) != 0) {
832 log_error("Failed to start threads for TCP client %u", client_id_snapshot);
833 // Client is already in hash table - use remove_client for proper cleanup
834 remove_client(server_ctx, client_id_snapshot);
835 return -1;
836 }
837 log_debug("Successfully created render threads for client %u", client_id_snapshot);
838
839 // Register client with session_host (for discovery mode support)
840 if (server_ctx->session_host) {
841 uint32_t session_client_id = session_host_add_client(server_ctx->session_host, socket, client_ip, port);
842 if (session_client_id == 0) {
843 log_warn("Failed to register client %u with session_host", client_id_snapshot);
844 } else {
845 log_debug("Client %u registered with session_host as %u", client_id_snapshot, session_client_id);
846 }
847 }
848
849 // Broadcast server state to ALL clients AFTER the new client is fully set up
850 // This notifies all clients (including the new one) about the updated grid
852
853 return (int)client_id_snapshot;
854
855error_cleanup:
856 // Clean up all partially allocated resources
857 // NOTE: This label is reached when allocation or initialization fails BEFORE
858 // the client is added to the hash table. Don't call remove_client() here.
859 cleanup_client_all_buffers(client);
860 rwlock_wrunlock(&g_client_manager_rwlock);
861 return -1;
862}
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
bool crypto_handshake_is_ready(const crypto_handshake_context_t *ctx)
uint32_t session_host_add_client(session_host_t *host, socket_t socket, const char *ip, int port)
Definition host.c:928
void audio_ring_buffer_destroy(audio_ring_buffer_t *rb)
audio_ring_buffer_t * audio_ring_buffer_create_for_capture(void)
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
int mixer_add_source(mixer_t *mixer, uint32_t client_id, audio_ring_buffer_t *buffer)
Definition mixer.c:364
asciichat_error_t set_socket_timeout(socket_t sockfd, uint64_t timeout_ns)
Set socket timeout.
const char * network_error_string()
Get human-readable error string for network errors.
packet_recv_result_t receive_packet_secure(socket_t sockfd, void *crypto_ctx, bool enforce_encryption, packet_envelope_t *envelope)
Receive a packet with decryption and decompression support.
Definition packet.c:566
packet_queue_t * packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool)
void packet_queue_destroy(packet_queue_t *queue)
mixer_t *volatile g_audio_mixer
Global audio mixer instance for multi-client audio processing.
void handle_client_capabilities_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_CAPABILITIES packet - configure client-specific rendering.
rwlock_t g_client_manager_rwlock
Reader-writer lock protecting the global client manager.
int remove_client(server_context_t *server_ctx, uint32_t client_id)
client_manager_t g_client_manager
Global client manager singleton - central coordination point.
void broadcast_server_state_to_all_clients(void)
Notify all clients of state changes.
const crypto_context_t * crypto_server_get_context(uint32_t client_id)
int server_crypto_init(void)
int server_crypto_handshake(client_info_t *client)
_Atomic uint32_t next_client_id
Monotonic counter for unique client IDs (atomic for thread-safety)
Definition client.h:73
client_info_t * clients_by_id
uthash head pointer for O(1) client_id -> client_info_t* lookups
Definition client.h:67
client_info_t clients[MAX_CLIENTS]
Array of client_info_t structures (backing storage)
Definition client.h:65
int client_count
Current number of active clients.
Definition client.h:69
tcp_server_t * tcp_server
TCP server managing connections.
session_host_t * session_host
Session host for discovery mode support.
int safe_snprintf(char *buffer, size_t buffer_size, const char *format,...)
Safe formatted string printing to buffer.
Definition system.c:456
acip_transport_t * acip_tcp_transport_create(socket_t sockfd, crypto_context_t *crypto_ctx)
int mutex_init(mutex_t *mutex)
Definition threading.c:16
void video_frame_buffer_destroy(video_frame_buffer_t *vfb)
Definition video_frame.c:84
video_frame_buffer_t * video_frame_buffer_create(uint32_t client_id)
Definition video_frame.c:16

References acip_tcp_transport_create(), audio_ring_buffer_create_for_capture(), audio_ring_buffer_destroy(), broadcast_server_state_to_all_clients(), buffer_pool_free(), client_manager_t::client_count, client_manager_t::clients, client_manager_t::clients_by_id, crypto_handshake_is_ready(), crypto_server_get_context(), g_audio_mixer, g_client_manager, g_client_manager_rwlock, handle_client_capabilities_packet(), mixer_add_source(), mutex_init(), network_error_string(), client_manager_t::next_client_id, packet_queue_create_with_pools(), packet_queue_destroy(), receive_packet_secure(), remove_client(), safe_snprintf(), server_crypto_handshake(), server_crypto_init(), server_context_t::session_host, session_host_add_client(), set_socket_timeout(), server_context_t::tcp_server, tcp_server_add_client(), video_frame_buffer_create(), and video_frame_buffer_destroy().

◆ add_webrtc_client()

int add_webrtc_client ( server_context_t server_ctx,
acip_transport_t *  transport,
const char *  client_ip,
bool  start_threads 
)

Register a WebRTC client with the server.

Registers a client that connected via WebRTC data channel instead of TCP socket. This function reuses most of add_client() logic but skips:

  • Crypto handshake (already done via ACDS signaling)
  • Socket-specific configuration
  • TCP thread pool registration

DIFFERENCES FROM add_client():

  • Takes an already-created acip_transport_t* instead of socket
  • No crypto handshake (WebRTC signaling handled authentication)
  • No socket configuration (WebRTC handles buffering)
  • Uses generic thread spawning instead of tcp_server thread pool
Parameters
server_ctxServer context
transportWebRTC transport (already created and connected)
client_ipClient IP address for logging (may be empty for P2P)
Returns
Client ID on success, -1 on failure
Note
The transport must be fully initialized and ready to send/receive
Client capabilities are still expected as first packet

Definition at line 887 of file src/server/client.c.

888 {
889 if (!server_ctx || !transport || !client_ip) {
890 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters to add_webrtc_client");
891 return -1;
892 }
893
894 rwlock_wrlock(&g_client_manager_rwlock);
895
896 // Find empty slot - this is the authoritative check
897 int slot = -1;
898 int existing_count = 0;
899 for (int i = 0; i < MAX_CLIENTS; i++) {
900 if (slot == -1 && atomic_load(&g_client_manager.clients[i].client_id) == 0) {
901 slot = i; // Take first available slot
902 }
903 // Count only active clients
904 if (atomic_load(&g_client_manager.clients[i].client_id) != 0 && atomic_load(&g_client_manager.clients[i].active)) {
905 existing_count++;
906 }
907 }
908
909 // Check if we've hit the configured max-clients limit (not the array size)
910 if (existing_count >= GET_OPTION(max_clients)) {
911 rwlock_wrunlock(&g_client_manager_rwlock);
912 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "Maximum client limit reached (%d/%d active clients)", existing_count,
913 GET_OPTION(max_clients));
914 log_error("Maximum client limit reached (%d/%d active clients)", existing_count, GET_OPTION(max_clients));
915 return -1;
916 }
917
918 if (slot == -1) {
919 rwlock_wrunlock(&g_client_manager_rwlock);
920 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED, "No available client slots (all %d array slots are in use)", MAX_CLIENTS);
921 log_error("No available client slots (all %d array slots are in use)", MAX_CLIENTS);
922 return -1;
923 }
924
925 // Update client_count to match actual count before adding new client
926 g_client_manager.client_count = existing_count;
927
928 // Initialize client
929 client_info_t *client = &g_client_manager.clients[slot];
930
931 // Free any existing buffers from previous client in this slot
932 if (client->incoming_video_buffer) {
933 video_frame_buffer_destroy(client->incoming_video_buffer);
934 client->incoming_video_buffer = NULL;
935 }
936 if (client->outgoing_video_buffer) {
937 video_frame_buffer_destroy(client->outgoing_video_buffer);
938 client->outgoing_video_buffer = NULL;
939 }
940 if (client->incoming_audio_buffer) {
941 audio_ring_buffer_destroy(client->incoming_audio_buffer);
942 client->incoming_audio_buffer = NULL;
943 }
944 if (client->send_buffer) {
945 SAFE_FREE(client->send_buffer);
946 client->send_buffer = NULL;
947 }
948
949 memset(client, 0, sizeof(client_info_t));
950
951 // Set up WebRTC-specific fields
952 client->socket = INVALID_SOCKET_VALUE; // WebRTC has no traditional socket
953 client->is_tcp_client = false; // WebRTC client - threads managed directly
954 client->transport = transport; // Use provided transport
955 uint32_t new_client_id = atomic_fetch_add(&g_client_manager.next_client_id, 1) + 1;
956 atomic_store(&client->client_id, new_client_id);
957 SAFE_STRNCPY(client->client_ip, client_ip, sizeof(client->client_ip) - 1);
958 client->port = 0; // WebRTC doesn't use port numbers
959 atomic_store(&client->active, true);
960 client->server_ctx = server_ctx; // Store server context for receive thread cleanup
961 log_info("Added new WebRTC client ID=%u from %s (transport=%p, slot=%d)", new_client_id, client_ip, transport, slot);
962 atomic_store(&client->shutting_down, false);
963 atomic_store(&client->last_rendered_grid_sources, 0); // Render thread updates this
964 atomic_store(&client->last_sent_grid_sources, 0); // Send thread updates this
965 log_debug("WebRTC client slot assigned: client_id=%u assigned to slot %d", atomic_load(&client->client_id), slot);
966 client->connected_at = time(NULL);
967
968 // Initialize crypto context for this client
969 memset(&client->crypto_handshake_ctx, 0, sizeof(client->crypto_handshake_ctx));
970 log_debug("[ADD_WEBRTC_CLIENT] Setting crypto_initialized=false (will be set to true after handshake completes)");
971 // Do NOT set crypto_initialized = true here! The handshake must complete first.
972 // For WebSocket clients: websocket_client_handler will perform the handshake
973 // For WebRTC clients: ACDS signaling may have already done crypto (future enhancement)
974 client->crypto_initialized = false;
975
976 // Initialize pending packet storage (unused for WebRTC, but keep for consistency)
977 client->pending_packet_type = 0;
978 client->pending_packet_payload = NULL;
979 client->pending_packet_length = 0;
980
981 safe_snprintf(client->display_name, sizeof(client->display_name), "WebRTC%u", atomic_load(&client->client_id));
982
983 // Create individual video buffer for this client using modern double-buffering
984 client->incoming_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
985 if (!client->incoming_video_buffer) {
986 SET_ERRNO(ERROR_MEMORY, "Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
987 log_error("Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
988 goto error_cleanup_webrtc;
989 }
990
991 // Create individual audio buffer for this client
992 client->incoming_audio_buffer = audio_ring_buffer_create_for_capture();
993 if (!client->incoming_audio_buffer) {
994 SET_ERRNO(ERROR_MEMORY, "Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
995 log_error("Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
996 goto error_cleanup_webrtc;
997 }
998
999 // Create packet queues for outgoing data
1000 client->audio_queue = packet_queue_create_with_pools(500, 1000, false);
1001 if (!client->audio_queue) {
1002 LOG_ERRNO_IF_SET("Failed to create audio queue for WebRTC client");
1003 goto error_cleanup_webrtc;
1004 }
1005
1006 // Create packet queue for async dispatch: received packets waiting to be processed
1007 // Capacity of 100 packets allows buffering of ~10-50 frames (depending on fragmentation)
1008 client->received_packet_queue = packet_queue_create_with_pools(100, 500, false);
1009 if (!client->received_packet_queue) {
1010 LOG_ERRNO_IF_SET("Failed to create received packet queue for client");
1011 goto error_cleanup_webrtc;
1012 }
1013
1014 // Create outgoing video buffer for ASCII frames (double buffered, no dropping)
1015 client->outgoing_video_buffer = video_frame_buffer_create(atomic_load(&client->client_id));
1016 if (!client->outgoing_video_buffer) {
1017 LOG_ERRNO_IF_SET("Failed to create outgoing video buffer for WebRTC client");
1018 goto error_cleanup_webrtc;
1019 }
1020
1021 // Pre-allocate send buffer to avoid malloc/free in send thread (prevents deadlocks)
1022 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE; // 2MB should handle largest frames
1023 client->send_buffer = SAFE_MALLOC_ALIGNED(client->send_buffer_size, 64, void *);
1024 if (!client->send_buffer) {
1025 log_error("Failed to allocate send buffer for WebRTC client %u", atomic_load(&client->client_id));
1026 goto error_cleanup_webrtc;
1027 }
1028
1029 g_client_manager.client_count = existing_count + 1; // We just added a client
1030 log_debug("Client count updated: now %d clients (added WebRTC client_id=%u to slot %d)",
1031 g_client_manager.client_count, atomic_load(&client->client_id), slot);
1032
1033 // Add client to uthash table for O(1) lookup
1034 uint32_t cid = atomic_load(&client->client_id);
1035 HASH_ADD_INT(g_client_manager.clients_by_id, client_id, client);
1036 log_debug("Added WebRTC client %u to uthash table", cid);
1037
1038 // Register this client's audio buffer with the mixer
1039 if (g_audio_mixer && client->incoming_audio_buffer) {
1040 if (mixer_add_source(g_audio_mixer, atomic_load(&client->client_id), client->incoming_audio_buffer) < 0) {
1041 log_warn("Failed to add WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1042 } else {
1043#ifdef DEBUG_AUDIO
1044 log_debug("Added WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1045#endif
1046 }
1047 }
1048
1049 // Initialize mutexes BEFORE creating any threads to prevent race conditions
1050 if (mutex_init(&client->client_state_mutex) != 0) {
1051 log_error("Failed to initialize client state mutex for WebRTC client %u", atomic_load(&client->client_id));
1052 // Client is already in hash table - use remove_client for proper cleanup
1053 rwlock_wrunlock(&g_client_manager_rwlock);
1054 remove_client(server_ctx, atomic_load(&client->client_id));
1055 return -1;
1056 }
1057
1058 // Initialize send mutex to protect concurrent socket writes
1059 if (mutex_init(&client->send_mutex) != 0) {
1060 log_error("Failed to initialize send mutex for WebRTC client %u", atomic_load(&client->client_id));
1061 // Client is already in hash table - use remove_client for proper cleanup
1062 rwlock_wrunlock(&g_client_manager_rwlock);
1063 remove_client(server_ctx, atomic_load(&client->client_id));
1064 return -1;
1065 }
1066
1067 rwlock_wrunlock(&g_client_manager_rwlock);
1068
1069 // For WebRTC clients, the capabilities packet will be received by the receive thread
1070 // when it starts. Unlike TCP clients where we handle it synchronously in add_client(),
1071 // WebRTC uses the transport abstraction which handles packet reception automatically.
1072 log_debug("WebRTC client %u initialized - receive thread will process capabilities", atomic_load(&client->client_id));
1073
1074 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1075
1076 // Conditionally start threads based on caller preference
1077 // WebSocket handler passes start_threads=false to defer thread startup until after crypto init
1078 // This ensures receive thread doesn't try to process packets before crypto context is ready
1079 if (start_threads) {
1080 log_debug("[ADD_WEBRTC_CLIENT] Starting client threads (receive, render) for client %u...", client_id_snapshot);
1081 if (start_client_threads(server_ctx, client, false) != 0) {
1082 log_error("Failed to start threads for WebRTC client %u", client_id_snapshot);
1083 return -1;
1084 }
1085 log_debug("Created receive thread for WebRTC client %u", client_id_snapshot);
1086 log_debug("[ADD_WEBRTC_CLIENT] Receive thread started - thread will now be processing packets", client_id_snapshot);
1087 } else {
1088 log_debug("[ADD_WEBRTC_CLIENT] Deferring thread startup for client %u (caller will start after crypto init)",
1089 client_id_snapshot);
1090 }
1091
1092 // Send initial server state to the new client
1093 if (send_server_state_to_client(client) != 0) {
1094 log_warn("Failed to send initial server state to WebRTC client %u", client_id_snapshot);
1095 } else {
1096#ifdef DEBUG_NETWORK
1097 log_info("Sent initial server state to WebRTC client %u", client_id_snapshot);
1098#endif
1099 }
1100
1101 // Send initial server state via ACIP transport
1102 rwlock_rdlock(&g_client_manager_rwlock);
1103 uint32_t connected_count = g_client_manager.client_count;
1104 rwlock_rdunlock(&g_client_manager_rwlock);
1105
1106 server_state_packet_t state;
1107 state.connected_client_count = connected_count;
1108 state.active_client_count = 0; // Will be updated by broadcast thread
1109 memset(state.reserved, 0, sizeof(state.reserved));
1110
1111 // Convert to network byte order
1112 server_state_packet_t net_state;
1113 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
1114 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
1115 memset(net_state.reserved, 0, sizeof(net_state.reserved));
1116
1117 asciichat_error_t packet_send_result = acip_send_server_state(client->transport, &net_state);
1118 if (packet_send_result != ASCIICHAT_OK) {
1119 log_warn("Failed to send initial server state to WebRTC client %u: %s", client_id_snapshot,
1120 asciichat_error_string(packet_send_result));
1121 } else {
1122 log_debug("Sent initial server state to WebRTC client %u: %u connected clients", client_id_snapshot,
1123 state.connected_client_count);
1124 }
1125
1126 // Register client with session_host (for discovery mode support)
1127 // WebRTC clients use INVALID_SOCKET_VALUE since they don't have a TCP socket
1128 if (server_ctx->session_host) {
1129 uint32_t session_client_id = session_host_add_client(server_ctx->session_host, INVALID_SOCKET_VALUE, client_ip, 0);
1130 if (session_client_id == 0) {
1131 log_warn("Failed to register WebRTC client %u with session_host", client_id_snapshot);
1132 } else {
1133 log_debug("WebRTC client %u registered with session_host as %u", client_id_snapshot, session_client_id);
1134 }
1135 }
1136
1137 // Broadcast server state to ALL clients AFTER the new client is fully set up
1138 // This notifies all clients (including the new one) about the updated grid
1140
1141 return (int)client_id_snapshot;
1142
1143error_cleanup_webrtc:
1144 // Clean up all partially allocated resources for WebRTC client
1145 // NOTE: This label is reached when allocation or initialization fails BEFORE
1146 // the client is added to the hash table. Don't call remove_client() here.
1147 cleanup_client_all_buffers(client);
1148 rwlock_wrunlock(&g_client_manager_rwlock);
1149 return -1;
1150}
asciichat_error_t acip_send_server_state(acip_transport_t *transport, const server_state_packet_t *state)
int send_server_state_to_client(client_info_t *client)
Send current server state to a specific client.

References acip_send_server_state(), audio_ring_buffer_create_for_capture(), audio_ring_buffer_destroy(), broadcast_server_state_to_all_clients(), client_manager_t::client_count, client_manager_t::clients, client_manager_t::clients_by_id, g_audio_mixer, g_client_manager, g_client_manager_rwlock, mixer_add_source(), mutex_init(), client_manager_t::next_client_id, packet_queue_create_with_pools(), remove_client(), safe_snprintf(), send_server_state_to_client(), server_context_t::session_host, session_host_add_client(), video_frame_buffer_create(), and video_frame_buffer_destroy().

◆ cleanup_client_media_buffers()

void cleanup_client_media_buffers ( client_info_t *  client)

Definition at line 2438 of file src/server/client.c.

2438 {
2439 if (!client) {
2440 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
2441 return;
2442 }
2443
2444 if (client->incoming_video_buffer) {
2445 video_frame_buffer_destroy(client->incoming_video_buffer);
2446 client->incoming_video_buffer = NULL;
2447 }
2448
2449 // Clean up outgoing video buffer (for ASCII frames)
2450 if (client->outgoing_video_buffer) {
2451 video_frame_buffer_destroy(client->outgoing_video_buffer);
2452 client->outgoing_video_buffer = NULL;
2453 }
2454
2455 // Clean up pre-allocated send buffer
2456 if (client->send_buffer) {
2457 SAFE_FREE(client->send_buffer);
2458 client->send_buffer = NULL;
2459 client->send_buffer_size = 0;
2460 }
2461
2462 if (client->incoming_audio_buffer) {
2463 audio_ring_buffer_destroy(client->incoming_audio_buffer);
2464 client->incoming_audio_buffer = NULL;
2465 }
2466
2467 // Clean up Opus decoder
2468 if (client->opus_decoder) {
2469 opus_codec_destroy((opus_codec_t *)client->opus_decoder);
2470 client->opus_decoder = NULL;
2471 }
2472}
void opus_codec_destroy(opus_codec_t *codec)
Definition opus_codec.c:215

References audio_ring_buffer_destroy(), opus_codec_destroy(), and video_frame_buffer_destroy().

◆ cleanup_client_packet_queues()

void cleanup_client_packet_queues ( client_info_t *  client)

Definition at line 2474 of file src/server/client.c.

2474 {
2475 if (!client)
2476 return;
2477
2478 if (client->audio_queue) {
2479 packet_queue_destroy(client->audio_queue);
2480 client->audio_queue = NULL;
2481 }
2482
2483 // Async dispatch: clean up received packet queue
2484 if (client->received_packet_queue) {
2485 packet_queue_destroy(client->received_packet_queue);
2486 client->received_packet_queue = NULL;
2487 }
2488
2489 // Video now uses double buffer, cleaned up in cleanup_client_media_buffers
2490}

References packet_queue_destroy().

◆ client_receive_thread()

void * client_receive_thread ( void *  arg)

Definition at line 1582 of file src/server/client.c.

1582 {
1583 // Log thread startup
1584 log_debug("RECV_THREAD: Thread function entered, arg=%p", arg);
1585
1586 client_info_t *client = (client_info_t *)arg;
1587
1588 // Validate client pointer immediately before any access.
1589 // This prevents crashes if remove_client() has zeroed the client struct
1590 // while the thread was still starting at RtlUserThreadStart.
1591 if (!client) {
1592 log_error("Invalid client info in receive thread (NULL pointer)");
1593 return NULL;
1594 }
1595
1596 // Save this thread's ID so remove_client() can detect self-joins
1597 client->receive_thread_id = asciichat_thread_self();
1598
1599 log_debug("RECV_THREAD_DEBUG: Thread started, client=%p, client_id=%u, is_tcp=%d", (void *)client,
1600 atomic_load(&client->client_id), client->is_tcp_client);
1601
1602 if (atomic_load(&client->protocol_disconnect_requested)) {
1603 log_debug("Receive thread for client %u exiting before start (protocol disconnect requested)",
1604 atomic_load(&client->client_id));
1605 return NULL;
1606 }
1607
1608 // Check if client_id is 0 (client struct has been zeroed by remove_client)
1609 // This must be checked BEFORE accessing any client fields
1610 if (atomic_load(&client->client_id) == 0) {
1611 log_debug("Receive thread: client_id is 0, client struct may have been zeroed, exiting");
1612 return NULL;
1613 }
1614
1615 // Additional validation: check socket is valid
1616 // For TCP clients, validate socket. WebRTC clients use DataChannel (no socket)
1617 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1618 log_error("Invalid client socket in receive thread");
1619 return NULL;
1620 }
1621
1622 // Enable thread cancellation for clean shutdown
1623 // Thread cancellation not available in platform abstraction
1624 // Threads should exit when g_server_should_exit is set
1625
1626 log_debug("Started receive thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1627
1628 // Main receive loop - processes packets from transport
1629 // For TCP clients: receives from socket
1630 // For WebRTC clients: receives from transport ringbuffer (via ACDS signaling)
1631
1632 log_info("RECV_THREAD_LOOP_START: client_id=%u, is_tcp=%d, transport=%p, active=%d", atomic_load(&client->client_id),
1633 client->is_tcp_client, (void *)client->transport, atomic_load(&client->active));
1634
1635 while (!atomic_load(&g_server_should_exit) && atomic_load(&client->active)) {
1636 // For TCP clients, check socket validity
1637 // For WebRTC clients, continue even if no socket (transport handles everything)
1638 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1639 log_debug("TCP client %u has invalid socket, exiting receive thread", atomic_load(&client->client_id));
1640 break;
1641 }
1642
1643 // Check client_id is still valid before accessing transport.
1644 // This prevents accessing freed memory if remove_client() has zeroed the client struct.
1645 if (atomic_load(&client->client_id) == 0) {
1646 log_debug("Client client_id reset, exiting receive thread");
1647 break;
1648 }
1649
1650 // Receive packet (without dispatching) - decouple from dispatch for async processing
1651 // For WebRTC clients with async dispatch, we queue packets instead of processing immediately
1652 // This prevents backpressure on the network socket
1653
1654 if (client->is_tcp_client) {
1655 // TCP clients: use original synchronous dispatch
1656 asciichat_error_t acip_result =
1657 acip_server_receive_and_dispatch(client->transport, client, &g_acip_server_callbacks);
1658
1659 // Check if shutdown was requested during the network call
1660 if (atomic_load(&g_server_should_exit)) {
1661 log_debug("RECV_EXIT: Server shutdown requested, breaking loop");
1662 break;
1663 }
1664
1665 // Handle receive errors
1666 if (acip_result != ASCIICHAT_OK) {
1667 asciichat_error_context_t err_ctx;
1668 if (HAS_ERRNO(&err_ctx)) {
1669 log_error("🔴 ACIP error for client %u: code=%u msg=%s", client->client_id, err_ctx.code,
1670 err_ctx.context_message);
1671 if (err_ctx.code == ERROR_NETWORK) {
1672 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1673 break;
1674 } else if (err_ctx.code == ERROR_CRYPTO) {
1675 log_error_client(
1676 client, "SECURITY VIOLATION: Unencrypted packet when encryption required - terminating connection");
1677 atomic_store(&g_server_should_exit, true);
1678 break;
1679 }
1680 }
1681 log_warn("ACIP error for TCP client %u: %s (disconnecting)", client->client_id,
1682 asciichat_error_string(acip_result));
1683 break;
1684 }
1685 } else {
1686 // WebRTC/WebSocket clients: async dispatch - receive packet and queue for async processing
1687 void *packet_data = NULL;
1688 void *allocated_buffer = NULL;
1689 size_t packet_len = 0;
1690
1691 asciichat_error_t recv_result =
1692 client->transport->methods->recv(client->transport, &packet_data, &packet_len, &allocated_buffer);
1693
1694 if (recv_result != ASCIICHAT_OK) {
1695 asciichat_error_context_t err_ctx;
1696 if (HAS_ERRNO(&err_ctx)) {
1697 // Check for reassembly timeout (fragments arriving slowly)
1698 // This is NOT a connection failure - safe to retry
1699 if ((err_ctx.code == ERROR_NETWORK) && err_ctx.context_message &&
1700 strstr(err_ctx.context_message, "reassembly timeout")) {
1701 // Fragments are arriving slowly - this is normal, retry without disconnecting
1702 log_dev_every(100000, "Client %u: fragment reassembly timeout, retrying in 10ms", client->client_id);
1703 platform_sleep_ms(10); // Sleep 10ms to allow fragments to arrive
1704 continue; // Retry without disconnecting
1705 }
1706
1707 if (err_ctx.code == ERROR_NETWORK) {
1708 log_debug("Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1709 break;
1710 }
1711 }
1712 log_warn("Receive failed for WebRTC client %u: %s (disconnecting)", client->client_id,
1713 asciichat_error_string(recv_result));
1714 break;
1715 }
1716
1717 // Queue the received packet for async dispatch
1718 // This prevents the receive thread from blocking on dispatch
1719 log_info("RECV_THREAD: Queuing %zu byte packet for async dispatch (client %u)", packet_len, client->client_id);
1720
1721 // Extract packet type from the header to preserve it when queueing
1722 packet_type_t pkt_type = PACKET_TYPE_PROTOCOL_VERSION;
1723 if (packet_len >= sizeof(packet_header_t)) {
1724 const packet_header_t *pkt_header = (const packet_header_t *)allocated_buffer;
1725 pkt_type = (packet_type_t)NET_TO_HOST_U16(pkt_header->type);
1726 }
1727
1728 // Build a complete packet to queue (header + payload)
1729 // The entire buffer (allocated_buffer) contains the full packet
1730 // copy_data=false because we want to transfer ownership to the queue
1731 int enqueue_result = packet_queue_enqueue(client->received_packet_queue, pkt_type, allocated_buffer, packet_len,
1732 atomic_load(&client->client_id), false);
1733
1734 if (enqueue_result < 0) {
1735 log_warn("Failed to queue received packet for client %u (queue full?) - packet dropped", client->client_id);
1736 if (allocated_buffer) {
1737 buffer_pool_free(NULL, allocated_buffer, packet_len);
1738 }
1739 }
1740 }
1741 }
1742
1743 // Mark client as inactive and stop all threads
1744 // Must stop render threads when client disconnects.
1745 // OPTIMIZED: Use atomic operations for thread control flags (lock-free)
1746 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1747 log_debug("Setting active=false in receive_thread_fn (client_id=%u, exiting receive loop)", client_id_snapshot);
1748 atomic_store(&client->active, false);
1749 atomic_store(&client->send_thread_running, false);
1750 atomic_store(&client->video_render_thread_running, false);
1751 atomic_store(&client->audio_render_thread_running, false);
1752
1753 // Call remove_client() to trigger cleanup
1754 // Safe to call from receive thread now: remove_client() detects self-join via thread IDs
1755 // and skips the receive thread join when called from the receive thread itself
1756 log_debug("Receive thread for client %u calling remove_client() for cleanup", client_id_snapshot);
1757 server_context_t *server_ctx = (server_context_t *)client->server_ctx;
1758 if (server_ctx) {
1759 if (remove_client(server_ctx, client_id_snapshot) != 0) {
1760 log_warn("Failed to remove client %u from receive thread cleanup", client_id_snapshot);
1761 }
1762 } else {
1763 log_error("Receive thread for client %u: server_ctx is NULL, cannot call remove_client()", client_id_snapshot);
1764 }
1765
1766 log_debug("Receive thread for client %u terminated", client_id_snapshot);
1767
1768 // Clean up thread-local error context before exit
1770
1771 return NULL;
1772}
void asciichat_errno_destroy(void)
asciichat_error_t acip_server_receive_and_dispatch(acip_transport_t *transport, void *client_ctx, const acip_server_callbacks_t *callbacks)
int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
void platform_sleep_ms(unsigned int ms)
atomic_bool g_server_should_exit
Global atomic shutdown flag shared across all threads.
Server context - encapsulates all server state.
asciichat_thread_t asciichat_thread_self(void)
Definition threading.c:54

References acip_server_receive_and_dispatch(), asciichat_errno_destroy(), asciichat_thread_self(), buffer_pool_free(), g_server_should_exit, packet_queue_enqueue(), platform_sleep_ms(), and remove_client().

◆ find_client_by_id()

client_info_t * find_client_by_id ( uint32_t  client_id)

Fast O(1) client lookup by ID using hash table.

This is the primary method for locating clients throughout the server. It uses a hash table for constant-time lookups regardless of client count, making it suitable for high-performance operations like rendering and stats.

PERFORMANCE CHARACTERISTICS:

  • Time Complexity: O(1) average case, O(n) worst case (hash collision)
  • Space Complexity: O(1)
  • Thread Safety: Hash table is internally thread-safe for lookups

USAGE PATTERNS:

  • Called by render threads to find target clients for frame generation
  • Used by protocol handlers to locate clients for packet processing
  • Stats collection for per-client performance monitoring
Parameters
client_idUnique identifier for the client (0 is invalid)
Returns
Pointer to client_info_t if found, NULL if not found or invalid ID
Note
Does not require external locking - hash table provides thread safety
Returns direct pointer to client struct - caller should use snapshot pattern

Definition at line 308 of file src/server/client.c.

308 {
309 if (client_id == 0) {
310 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid client ID");
311 return NULL;
312 }
313
314 // Protect uthash lookup with read lock to prevent concurrent access issues
315 rwlock_rdlock(&g_client_manager_rwlock);
316
317 client_info_t *result = NULL;
318 uint32_t search_id = client_id; // uthash needs an lvalue for the key
319 HASH_FIND_INT(g_client_manager.clients_by_id, &search_id, result);
320
321 rwlock_rdunlock(&g_client_manager_rwlock);
322
323 if (!result) {
324 log_warn("Client not found for ID %u", client_id);
325 }
326
327 return result;
328}

References client_manager_t::clients_by_id, g_client_manager, and g_client_manager_rwlock.

Referenced by broadcast_server_state_to_all_clients(), crypto_server_cleanup_client(), crypto_server_decrypt_packet(), crypto_server_encrypt_packet(), crypto_server_get_context(), crypto_server_is_ready(), and start_webrtc_client_threads().

◆ find_client_by_socket()

client_info_t * find_client_by_socket ( socket_t  socket)

Find client by socket descriptor using linear search.

This function provides socket-based client lookup, primarily used during connection establishment before client IDs are assigned. Less efficient than find_client_by_id() but necessary for socket-based operations.

PERFORMANCE CHARACTERISTICS:

  • Time Complexity: O(n) where n = number of active clients
  • Space Complexity: O(1)
  • Thread Safety: Internally acquires read lock on g_client_manager_rwlock

USAGE PATTERNS:

  • Connection establishment during add_client() processing
  • Socket error handling and cleanup operations
  • Debugging and diagnostic functions
Parameters
socketPlatform-abstracted socket descriptor to search for
Returns
Pointer to client_info_t if found, NULL if not found
Note
Only searches active clients (avoids returning stale entries)
Caller should use snapshot pattern when accessing returned client data

Definition at line 353 of file src/server/client.c.

353 {
354 rwlock_rdlock(&g_client_manager_rwlock);
355
356 for (int i = 0; i < MAX_CLIENTS; i++) {
357 if (g_client_manager.clients[i].socket == socket && atomic_load(&g_client_manager.clients[i].active)) {
358 client_info_t *client = &g_client_manager.clients[i];
359 rwlock_rdunlock(&g_client_manager_rwlock);
360 return client;
361 }
362 }
363
364 rwlock_rdunlock(&g_client_manager_rwlock);
365 return NULL;
366}

References client_manager_t::clients, g_client_manager, and g_client_manager_rwlock.

◆ initialize_client_info()

void initialize_client_info ( client_info_t *  client)

◆ process_decrypted_packet()

void process_decrypted_packet ( client_info_t *  client,
packet_type_t  type,
void *  data,
size_t  len 
)

Process a decrypted packet from a client

Parameters
clientClient info structure
typePacket type
dataPacket data
lenPacket length

Definition at line 3136 of file src/server/client.c.

3136 {
3137 if (type == 5000) { // CLIENT_CAPABILITIES
3138 log_debug("CLIENT: client_id=%u, data=%p, len=%zu", atomic_load(&client->client_id), data, len);
3139 }
3140
3141 // Rate limiting: Check and record packet-specific rate limits
3142 if (g_rate_limiter) {
3143 if (!check_and_record_packet_rate_limit(g_rate_limiter, client->client_ip, client->socket, type)) {
3144 // Rate limit exceeded - error response already sent by utility function
3145 return;
3146 }
3147 }
3148
3149 // O(1) dispatch via hash table lookup
3150 int idx = client_dispatch_hash_lookup(g_client_dispatch_hash, type);
3151 if (type == 5000 || type == 3001) {
3152 log_error("DISPATCH_LOOKUP: type=%d, idx=%d (len=%zu)", type, idx, len);
3153 }
3154 if (idx < 0) {
3155 disconnect_client_for_bad_data(client, "Unknown packet type: %d (len=%zu)", type, len);
3156 return;
3157 }
3158
3159 if (type == 5000 || type == 3001) {
3160 log_error("DISPATCH_HANDLER: type=%d, calling handler[%d]...", type, idx);
3161 }
3162 g_client_dispatch_handlers[idx](client, data, len);
3163 if (type == 5000 || type == 3001) {
3164 log_error("DISPATCH_HANDLER: type=%d, handler returned", type);
3165 }
3166}
bool check_and_record_packet_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, socket_t client_socket, packet_type_t packet_type)
Definition errors.c:54
rate_limiter_t * g_rate_limiter
Global rate limiter for connection attempts and packet processing.
void disconnect_client_for_bad_data(client_info_t *client, const char *format,...)

References check_and_record_packet_rate_limit(), disconnect_client_for_bad_data(), and g_rate_limiter.

◆ process_encrypted_packet()

int process_encrypted_packet ( client_info_t *  client,
packet_type_t *  type,
void **  data,
size_t *  len,
uint32_t *  sender_id 
)

Process an encrypted packet from a client

Parameters
clientClient info structure
typePointer to packet type (will be updated with decrypted type)
dataPointer to packet data (will be updated with decrypted data)
lenPointer to packet length (will be updated with decrypted length)
sender_idPointer to sender ID (will be updated with decrypted sender ID)
Returns
0 on success, -1 on error

Definition at line 2516 of file src/server/client.c.

2517 {
2518 if (!crypto_server_is_ready(client->client_id)) {
2519 log_error("Received encrypted packet but crypto not ready for client %u", client->client_id);
2520 buffer_pool_free(NULL, *data, *len);
2521 *data = NULL;
2522 return -1;
2523 }
2524
2525 // Store original allocation size before it gets modified
2526 size_t original_alloc_size = *len;
2527 void *decrypted_data = buffer_pool_alloc(NULL, original_alloc_size);
2528 size_t decrypted_len;
2529 int decrypt_result = crypto_server_decrypt_packet(client->client_id, (const uint8_t *)*data, *len,
2530 (uint8_t *)decrypted_data, original_alloc_size, &decrypted_len);
2531
2532 if (decrypt_result != 0) {
2533 SET_ERRNO(ERROR_CRYPTO, "Failed to process encrypted packet from client %u (result=%d)", client->client_id,
2534 decrypt_result);
2535 buffer_pool_free(NULL, *data, original_alloc_size);
2536 buffer_pool_free(NULL, decrypted_data, original_alloc_size);
2537 *data = NULL;
2538 return -1;
2539 }
2540
2541 // Replace encrypted data with decrypted data
2542 // Use original allocation size for freeing the encrypted buffer
2543 buffer_pool_free(NULL, *data, original_alloc_size);
2544
2545 *data = decrypted_data;
2546 *len = decrypted_len;
2547
2548 // Now process the decrypted packet by parsing its header
2549 if (*len < sizeof(packet_header_t)) {
2550 SET_ERRNO(ERROR_CRYPTO, "Decrypted packet too small for header from client %u", client->client_id);
2551 buffer_pool_free(NULL, *data, *len);
2552 *data = NULL;
2553 return -1;
2554 }
2555
2556 packet_header_t *header = (packet_header_t *)*data;
2557 *type = (packet_type_t)NET_TO_HOST_U16(header->type);
2558 *sender_id = NET_TO_HOST_U32(header->client_id);
2559
2560 // Adjust data pointer to skip header
2561 *data = (uint8_t *)*data + sizeof(packet_header_t);
2562 *len -= sizeof(packet_header_t);
2563
2564 return 0;
2565}
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
int crypto_server_decrypt_packet(uint32_t client_id, const uint8_t *ciphertext, size_t ciphertext_len, uint8_t *plaintext, size_t plaintext_size, size_t *plaintext_len)
bool crypto_server_is_ready(uint32_t client_id)

References buffer_pool_alloc(), buffer_pool_free(), crypto_server_decrypt_packet(), and crypto_server_is_ready().

◆ remove_client()

int remove_client ( server_context_t server_ctx,
uint32_t  client_id 
)

Definition at line 1152 of file src/server/client.c.

1152 {
1153 if (!server_ctx) {
1154 SET_ERRNO(ERROR_INVALID_PARAM, "Cannot remove client %u: NULL server_ctx", client_id);
1155 return -1;
1156 }
1157
1158 // Phase 1: Mark client inactive and prepare for cleanup while holding write lock
1159 client_info_t *target_client = NULL;
1160 char display_name_copy[MAX_DISPLAY_NAME_LEN];
1161 socket_t client_socket = INVALID_SOCKET_VALUE; // Save socket for thread cleanup
1162
1163 log_debug("SOCKET_DEBUG: Attempting to remove client %d", client_id);
1164 rwlock_wrlock(&g_client_manager_rwlock);
1165
1166 for (int i = 0; i < MAX_CLIENTS; i++) {
1167 client_info_t *client = &g_client_manager.clients[i];
1168 uint32_t cid = atomic_load(&client->client_id);
1169 if (cid == client_id && cid != 0) {
1170 // Check if already being removed by another thread
1171 // This prevents double-free and use-after-free crashes during concurrent cleanup
1172 if (atomic_load(&client->shutting_down)) {
1173 rwlock_wrunlock(&g_client_manager_rwlock);
1174 log_debug("Client %u already being removed by another thread, skipping", client_id);
1175 return 0; // Return success - removal is in progress
1176 }
1177 // Mark as shutting down and inactive immediately to stop new operations
1178 log_debug("Setting active=false in remove_client (client_id=%d, socket=%d)", client_id, client->socket);
1179 log_info("Removing client %d (socket=%d) - marking inactive and clearing video flags", client_id, client->socket);
1180 atomic_store(&client->shutting_down, true);
1181 atomic_store(&client->active, false);
1182 atomic_store(&client->is_sending_video, false);
1183 atomic_store(&client->is_sending_audio, false);
1184 target_client = client;
1185
1186 // Store display name before clearing
1187 SAFE_STRNCPY(display_name_copy, client->display_name, MAX_DISPLAY_NAME_LEN - 1);
1188
1189 // Save socket for tcp_server_stop_client_threads() before closing
1190 mutex_lock(&client->client_state_mutex);
1191 client_socket = client->socket; // Save socket for thread cleanup
1192 if (client->socket != INVALID_SOCKET_VALUE) {
1193 log_debug("SOCKET_DEBUG: Client %d shutting down socket %d", client->client_id, client->socket);
1194 // Shutdown both send and receive operations to unblock any pending I/O
1195 socket_shutdown(client->socket, 2); // 2 = SHUT_RDWR on POSIX, SD_BOTH on Windows
1196 // Don't close yet - tcp_server needs socket as lookup key
1197 }
1198 mutex_unlock(&client->client_state_mutex);
1199
1200 // Shutdown packet queues to unblock send thread
1201 if (client->audio_queue) {
1202 packet_queue_stop(client->audio_queue);
1203 }
1204 // Video now uses double buffer, no queue to shutdown
1205
1206 break;
1207 }
1208 }
1209
1210 // If client not found, unlock and return
1211 if (!target_client) {
1212 rwlock_wrunlock(&g_client_manager_rwlock);
1213 log_warn("Cannot remove client %u: not found", client_id);
1214 return -1;
1215 }
1216
1217 // Unregister client from session_host (for discovery mode support)
1218 // NOTE: Client may not be registered if crypto handshake failed before session_host registration
1219 if (server_ctx->session_host) {
1220 asciichat_error_t session_result = session_host_remove_client(server_ctx->session_host, client_id);
1221 if (session_result != ASCIICHAT_OK) {
1222 // ERROR_NOT_FOUND (91) is expected if client failed crypto before being registered with session_host
1223 if (session_result == ERROR_NOT_FOUND) {
1224 log_debug("Client %u not found in session_host (likely failed crypto before registration)", client_id);
1225 } else {
1226 log_warn("Failed to unregister client %u from session_host: %s", client_id,
1227 asciichat_error_string(session_result));
1228 }
1229 } else {
1230 log_debug("Client %u unregistered from session_host", client_id);
1231 }
1232 }
1233
1234 // Release write lock before joining threads.
1235 // This prevents deadlock with render threads that need read locks.
1236 rwlock_wrunlock(&g_client_manager_rwlock);
1237
1238 // Phase 2: Stop all client threads
1239 // For TCP clients: use tcp_server thread pool management
1240 // For WebRTC clients: manually join threads (no socket-based thread pool)
1241 // Use is_tcp_client flag, not socket value - socket may already be INVALID_SOCKET_VALUE
1242 // even for TCP clients if it was closed earlier during cleanup.
1243 log_debug("Stopping all threads for client %u (socket %d, is_tcp=%d)", client_id, client_socket,
1244 target_client ? target_client->is_tcp_client : -1);
1245
1246 if (target_client && target_client->is_tcp_client) {
1247 // TCP client: use tcp_server thread pool
1248 // This joins threads in stop_id order: receive(1), render(2), send(3)
1249 // Use saved client_socket for lookup (tcp_server needs original socket as key)
1250 if (client_socket != INVALID_SOCKET_VALUE) {
1251 asciichat_error_t stop_result = tcp_server_stop_client_threads(server_ctx->tcp_server, client_socket);
1252 if (stop_result != ASCIICHAT_OK) {
1253 log_warn("Failed to stop threads for TCP client %u: error %d", client_id, stop_result);
1254 // Continue with cleanup even if thread stopping failed
1255 }
1256 } else {
1257 log_debug("TCP client %u socket already closed, threads should have already exited", client_id);
1258 }
1259 } else if (target_client) {
1260 // WebRTC client: manually join threads
1261 log_debug("Stopping WebRTC client %u threads (receive and send)", client_id);
1262
1263 // Join receive thread (but skip if called from the receive thread itself to avoid deadlock)
1264 thread_id_t current_thread_id = asciichat_thread_self();
1265 if (asciichat_thread_equal(current_thread_id, target_client->receive_thread_id)) {
1266 log_debug("remove_client() called from receive thread for client %u, skipping self-join", client_id);
1267 } else {
1268 void *recv_result = NULL;
1269 asciichat_error_t recv_join_result = asciichat_thread_join(&target_client->receive_thread, &recv_result);
1270 if (recv_join_result != ASCIICHAT_OK) {
1271 log_warn("Failed to join receive thread for WebRTC client %u: error %d", client_id, recv_join_result);
1272 } else {
1273 log_debug("Joined receive thread for WebRTC client %u", client_id);
1274 }
1275 }
1276
1277 // Join send thread
1278 void *send_result = NULL;
1279 asciichat_error_t send_join_result = asciichat_thread_join(&target_client->send_thread, &send_result);
1280 if (send_join_result != ASCIICHAT_OK) {
1281 log_warn("Failed to join send thread for WebRTC client %u: error %d", client_id, send_join_result);
1282 } else {
1283 log_debug("Joined send thread for WebRTC client %u", client_id);
1284 }
1285 // Note: Render threads still need to be stopped - they're created the same way for both TCP and WebRTC
1286 // For now, render threads are expected to exit when they check g_server_should_exit and client->active
1287 }
1288
1289 // Destroy ACIP transport before closing socket
1290 // For WebSocket clients: LWS_CALLBACK_CLOSED already closed and destroyed the transport
1291 // Trying to destroy it again causes heap-use-after-free since LWS callbacks might still fire
1292 // For TCP clients: transport is ours to clean up
1293 if (target_client && target_client->transport && target_client->is_tcp_client) {
1294 acip_transport_destroy(target_client->transport);
1295 target_client->transport = NULL;
1296 log_debug("Destroyed ACIP transport for TCP client %u", client_id);
1297 } else if (target_client && target_client->transport && !target_client->is_tcp_client) {
1298 // WebSocket client - just NULL it out, LWS_CALLBACK_CLOSED already destroyed it
1299 target_client->transport = NULL;
1300 log_debug("Skipped transport destruction for WebSocket client %u (LWS already destroyed)", client_id);
1301 }
1302
1303 // Now safe to close the socket (threads are stopped)
1304 if (client_socket != INVALID_SOCKET_VALUE) {
1305 log_debug("SOCKET_DEBUG: Closing socket %d for client %u after thread cleanup", client_socket, client_id);
1306 socket_close(client_socket);
1307 }
1308
1309 // Phase 3: Clean up resources with write lock
1310 rwlock_wrlock(&g_client_manager_rwlock);
1311
1312 // Re-validate target_client pointer after reacquiring lock.
1313 // Another thread might have invalidated the pointer while we had the lock released.
1314 if (target_client) {
1315 // Verify client_id still matches and client is still in shutting_down state
1316 uint32_t current_id = atomic_load(&target_client->client_id);
1317 bool still_shutting_down = atomic_load(&target_client->shutting_down);
1318 if (current_id != client_id || !still_shutting_down) {
1319 log_warn("Client %u pointer invalidated during thread cleanup (id=%u, shutting_down=%d)", client_id, current_id,
1320 still_shutting_down);
1321 rwlock_wrunlock(&g_client_manager_rwlock);
1322 return 0; // Another thread completed the cleanup
1323 }
1324 }
1325
1326 // Mark socket as closed in client structure
1327 if (target_client && target_client->socket != INVALID_SOCKET_VALUE) {
1328 mutex_lock(&target_client->client_state_mutex);
1329 target_client->socket = INVALID_SOCKET_VALUE;
1330 mutex_unlock(&target_client->client_state_mutex);
1331 log_debug("SOCKET_DEBUG: Client %u socket set to INVALID", client_id);
1332 }
1333
1334 // Use the dedicated cleanup function to ensure all resources are freed
1335 cleanup_client_all_buffers(target_client);
1336
1337 // Remove from audio mixer
1338 if (g_audio_mixer) {
1340#ifdef DEBUG_AUDIO
1341 log_debug("Removed client %u from audio mixer", client_id);
1342#endif
1343 }
1344
1345 // Remove from uthash table
1346 // Verify client is actually in the hash table before deleting.
1347 // Another thread might have already removed it.
1348 if (target_client) {
1349 client_info_t *hash_entry = NULL;
1350 HASH_FIND(hh, g_client_manager.clients_by_id, &client_id, sizeof(client_id), hash_entry);
1351 if (hash_entry == target_client) {
1352 HASH_DELETE(hh, g_client_manager.clients_by_id, target_client);
1353 log_debug("Removed client %u from uthash table", client_id);
1354 } else {
1355 log_warn("Client %u already removed from hash table by another thread (found=%p, expected=%p)", client_id,
1356 (void *)hash_entry, (void *)target_client);
1357 }
1358 } else {
1359 log_warn("Failed to remove client %u from hash table (client not found)", client_id);
1360 }
1361
1362 // Cleanup crypto context for this client
1363 if (target_client->crypto_initialized) {
1364 crypto_handshake_destroy(&target_client->crypto_handshake_ctx);
1365 target_client->crypto_initialized = false;
1366 log_debug("Crypto context cleaned up for client %u", client_id);
1367 }
1368
1369 // Verify all threads have actually exited before resetting client_id.
1370 // Threads that are still starting (at RtlUserThreadStart) haven't checked client_id yet.
1371 // We must ensure threads are fully joined before zeroing the client struct.
1372 // Use exponential backoff for thread termination verification
1373 int retry_count = 0;
1374 const int max_retries = 5;
1375 while (retry_count < max_retries && (asciichat_thread_is_initialized(&target_client->send_thread) ||
1376 asciichat_thread_is_initialized(&target_client->receive_thread) ||
1377 asciichat_thread_is_initialized(&target_client->video_render_thread) ||
1378 asciichat_thread_is_initialized(&target_client->audio_render_thread))) {
1379 // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms
1380 uint32_t delay_ms = 10 * (1 << retry_count);
1381 log_warn("Client %u: Some threads still appear initialized (attempt %d/%d), waiting %ums", client_id,
1382 retry_count + 1, max_retries, delay_ms);
1383 platform_sleep_us(delay_ms * 1000);
1384 retry_count++;
1385 }
1386
1387 if (retry_count == max_retries) {
1388 log_error("Client %u: Threads did not terminate after %d retries, proceeding with cleanup anyway", client_id,
1389 max_retries);
1390 }
1391
1392 // Only reset client_id to 0 AFTER confirming threads are joined
1393 // This prevents threads that are starting from accessing a zeroed client struct
1394 // Reset client_id to 0 before destroying mutexes to prevent race conditions.
1395 // This ensures worker threads can detect shutdown and exit before the mutex is destroyed.
1396 // If we destroy the mutex first, threads might try to access a destroyed mutex.
1397 atomic_store(&target_client->client_id, 0);
1398
1399 // Wait for threads to observe the client_id reset
1400 // Use sufficient delay for memory visibility across all CPU cores
1401 platform_sleep_us(5 * US_PER_MS_INT); // 5ms delay for memory barrier propagation
1402
1403 // Destroy mutexes
1404 // IMPORTANT: Always destroy these even if threads didn't join properly
1405 // to prevent issues when the slot is reused
1406 mutex_destroy(&target_client->client_state_mutex);
1407 mutex_destroy(&target_client->send_mutex);
1408
1409 // Clear client structure
1410 // NOTE: After memset, the mutex handles are zeroed but the OS resources
1411 // have been released by the destroy calls above
1412 memset(target_client, 0, sizeof(client_info_t));
1413
1414 // Recalculate client count using atomic reads
1415 int remaining_count = 0;
1416 for (int j = 0; j < MAX_CLIENTS; j++) {
1417 if (atomic_load(&g_client_manager.clients[j].client_id) != 0) {
1418 remaining_count++;
1419 }
1420 }
1421 g_client_manager.client_count = remaining_count;
1422
1423 log_debug("Client removed: client_id=%u (%s) removed, remaining clients: %d", client_id, display_name_copy,
1424 remaining_count);
1425
1426 rwlock_wrunlock(&g_client_manager_rwlock);
1427
1428 // Broadcast updated state
1430
1431 return 0;
1432}
void crypto_handshake_destroy(crypto_handshake_context_t *ctx)
asciichat_error_t session_host_remove_client(session_host_t *host, uint32_t client_id)
Definition host.c:1160
int socket_t
asciichat_error_t tcp_server_stop_client_threads(tcp_server_t *server, socket_t client_socket)
void mixer_remove_source(mixer_t *mixer, uint32_t client_id)
Definition mixer.c:401
void packet_queue_stop(packet_queue_t *queue)
void platform_sleep_us(unsigned int us)
void acip_transport_destroy(acip_transport_t *transport)
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Definition threading.c:46
int asciichat_thread_equal(asciichat_thread_t t1, asciichat_thread_t t2)
Definition threading.c:58
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21

References acip_transport_destroy(), asciichat_thread_equal(), asciichat_thread_join(), asciichat_thread_self(), broadcast_server_state_to_all_clients(), client_manager_t::client_count, client_manager_t::clients, client_manager_t::clients_by_id, crypto_handshake_destroy(), g_audio_mixer, g_client_manager, g_client_manager_rwlock, mixer_remove_source(), mutex_destroy(), packet_queue_stop(), platform_sleep_us(), server_context_t::session_host, session_host_remove_client(), server_context_t::tcp_server, and tcp_server_stop_client_threads().

Referenced by add_client(), add_webrtc_client(), client_receive_thread(), and server_main().

◆ start_webrtc_client_threads()

int start_webrtc_client_threads ( server_context_t server_ctx,
uint32_t  client_id 
)

Start threads for a WebRTC client after crypto initialization.

This is called by WebSocket handler after crypto handshake is initialized. It ensures receive thread doesn't try to process packets before crypto context exists.

Parameters
server_ctxServer context
client_idClient ID to start threads for
Returns
0 on success, -1 on failure

Definition at line 2397 of file src/server/client.c.

2397 {
2398 if (!server_ctx) {
2399 SET_ERRNO(ERROR_INVALID_PARAM, "Server context is NULL");
2400 return -1;
2401 }
2402
2403 client_info_t *client = find_client_by_id(client_id);
2404 if (!client) {
2405 SET_ERRNO(ERROR_NOT_FOUND, "Client %u not found", client_id);
2406 return -1;
2407 }
2408
2409 log_debug("Starting threads for WebRTC client %u...", client_id);
2410 return start_client_threads(server_ctx, client, false);
2411}
client_info_t * find_client_by_id(uint32_t client_id)
Fast O(1) client lookup by ID using hash table.

References find_client_by_id().

◆ stop_client_threads()

void stop_client_threads ( client_info_t *  client)

Definition at line 2413 of file src/server/client.c.

2413 {
2414 if (!client) {
2415 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
2416 return;
2417 }
2418
2419 // Signal threads to stop
2420 log_debug("Setting active=false in stop_client_threads (client_id=%u)", atomic_load(&client->client_id));
2421 atomic_store(&client->active, false);
2422 atomic_store(&client->send_thread_running, false);
2423
2424 // Wait for threads to finish
2425 if (asciichat_thread_is_initialized(&client->send_thread)) {
2426 asciichat_thread_join(&client->send_thread, NULL);
2427 }
2428 if (asciichat_thread_is_initialized(&client->receive_thread)) {
2429 asciichat_thread_join(&client->receive_thread, NULL);
2430 }
2431 // For async dispatch: stop dispatch thread if running
2432 if (asciichat_thread_is_initialized(&client->dispatch_thread)) {
2433 atomic_store(&client->dispatch_thread_running, false);
2434 asciichat_thread_join(&client->dispatch_thread, NULL);
2435 }
2436}

References asciichat_thread_join().

Variable Documentation

◆ g_client_manager

client_manager_t g_client_manager
extern

Global client manager singleton - central coordination point.

This is the primary data structure for managing all connected clients. It serves as the bridge between main.c's connection accept loop and the per-client threading architecture.

STRUCTURE COMPONENTS:

  • clients[]: Array backing storage for client_info_t structs
  • client_hashtable: O(1) lookup table for client_id -> client_info_t*
  • client_count: Current number of active clients
  • mutex: Legacy mutex (mostly replaced by rwlock)
  • next_client_id: Monotonic counter for unique client identification

THREAD SAFETY: Protected by g_client_manager_rwlock for concurrent access

Definition at line 255 of file src/server/client.c.

◆ g_client_manager_rwlock

rwlock_t g_client_manager_rwlock
extern

Reader-writer lock protecting the global client manager.

This lock enables high-performance concurrent access patterns:

  • Multiple threads can read client data simultaneously (stats, rendering)
  • Only one thread can modify client data at a time (add/remove operations)
  • Eliminates contention between read-heavy operations

USAGE PATTERN:

  • Read operations: rwlock_rdlock() for client lookups, stats gathering
  • Write operations: rwlock_wrlock() for add_client(), remove_client()
  • Always acquire THIS lock before per-client mutexes (lock ordering)

Definition at line 270 of file src/server/client.c.

270{0};