103#include <stdatomic.h>
109#include <netinet/tcp.h>
118#include <ascii-chat/crypto/handshake/common.h>
119#include <ascii-chat/crypto/handshake/server.h>
120#include <ascii-chat/crypto/crypto.h>
121#include <ascii-chat/common.h>
122#include <ascii-chat/util/endian.h>
123#include <ascii-chat/asciichat_errno.h>
124#include <ascii-chat/options/options.h>
125#include <ascii-chat/options/rcu.h>
126#include <ascii-chat/buffer_pool.h>
127#include <ascii-chat/network/network.h>
128#include <ascii-chat/network/packet.h>
129#include <ascii-chat/network/packet_queue.h>
130#include <ascii-chat/network/errors.h>
131#include <ascii-chat/network/acip/handlers.h>
132#include <ascii-chat/network/acip/transport.h>
133#include <ascii-chat/network/acip/send.h>
134#include <ascii-chat/network/acip/server.h>
135#include <ascii-chat/audio/audio.h>
136#include <ascii-chat/audio/mixer.h>
137#include <ascii-chat/audio/opus_codec.h>
138#include <ascii-chat/video/video_frame.h>
139#include <ascii-chat/uthash/uthash.h>
140#include <ascii-chat/util/endian.h>
141#include <ascii-chat/util/format.h>
142#include <ascii-chat/util/time.h>
143#include <ascii-chat/platform/abstraction.h>
144#include <ascii-chat/platform/string.h>
145#include <ascii-chat/platform/socket.h>
146#include <ascii-chat/network/crc32.h>
147#include <ascii-chat/network/logging.h>
150#define DEBUG_NETWORK 1
151#define DEBUG_THREADS 1
152#define DEBUG_MEMORY 1
160#define CLIENT_DISPATCH_HASH_SIZE 32
161#define CLIENT_DISPATCH_HANDLER_COUNT 12
171#define CLIENT_DISPATCH_HASH(type) ((type) % CLIENT_DISPATCH_HASH_SIZE)
177 if (table[slot].key == 0)
179 if (table[slot].key == type)
204 [0] = {PACKET_TYPE_AUDIO_BATCH, 2},
205 [1] = {PACKET_TYPE_PROTOCOL_VERSION, 0},
206 [2] = {PACKET_TYPE_AUDIO_OPUS_BATCH, 3},
207 [8] = {PACKET_TYPE_CLIENT_CAPABILITIES, 8},
208 [9] = {PACKET_TYPE_PING, 9},
209 [10] = {PACKET_TYPE_PONG, 10},
210 [11] = {PACKET_TYPE_CLIENT_JOIN, 4},
211 [12] = {PACKET_TYPE_CLIENT_LEAVE, 5},
212 [13] = {PACKET_TYPE_STREAM_START, 6},
213 [14] = {PACKET_TYPE_STREAM_STOP, 7},
214 [20] = {PACKET_TYPE_REMOTE_LOG, 11},
215 [25] = {PACKET_TYPE_IMAGE_FRAME, 1},
220static inline void cleanup_client_all_buffers(client_info_t *client);
222static void handle_client_error_packet(client_info_t *client,
const void *data,
size_t len) {
223 asciichat_error_t reported_error = ASCIICHAT_OK;
224 char message[MAX_ERROR_MESSAGE_LENGTH + 1] = {0};
226 asciichat_error_t parse_result =
228 uint32_t client_id = client ? atomic_load(&client->client_id) : 0;
230 if (parse_result != ASCIICHAT_OK) {
231 log_warn(
"Failed to parse error packet from client %u: %s", client_id, asciichat_error_string(parse_result));
235 log_error(
"Client %u reported error %d (%s): %s", client_id, reported_error, asciichat_error_string(reported_error),
277static int start_client_threads(
server_context_t *server_ctx, client_info_t *client,
309 if (client_id == 0) {
310 SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid client ID");
317 client_info_t *result = NULL;
318 uint32_t search_id = client_id;
324 log_warn(
"Client not found for ID %u", client_id);
356 for (
int i = 0; i < MAX_CLIENTS; i++) {
379static void configure_client_socket(
socket_t socket, uint32_t client_id) {
382 if (keepalive_result != ASCIICHAT_OK) {
383 log_warn(
"Failed to set socket keepalive for client %u: %s", client_id, asciichat_error_string(keepalive_result));
387 const int SOCKET_SEND_BUFFER_SIZE = 1024 * 1024;
388 const int SOCKET_RECV_BUFFER_SIZE = 1024 * 1024;
390 if (socket_setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &SOCKET_SEND_BUFFER_SIZE,
sizeof(SOCKET_SEND_BUFFER_SIZE)) < 0) {
391 log_warn(
"Failed to set send buffer size for client %u: %s", client_id,
network_error_string());
394 if (socket_setsockopt(socket, SOL_SOCKET, SO_RCVBUF, &SOCKET_RECV_BUFFER_SIZE,
sizeof(SOCKET_RECV_BUFFER_SIZE)) < 0) {
395 log_warn(
"Failed to set receive buffer size for client %u: %s", client_id,
network_error_string());
399 const int TCP_NODELAY_VALUE = 1;
400 if (socket_setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &TCP_NODELAY_VALUE,
sizeof(TCP_NODELAY_VALUE)) < 0) {
422static int start_client_threads(
server_context_t *server_ctx, client_info_t *client,
bool is_tcp) {
423 if (!server_ctx || !client) {
424 SET_ERRNO(ERROR_INVALID_PARAM,
"server_ctx or client is NULL");
428 uint32_t client_id = atomic_load(&client->client_id);
429 log_info(
"★ START_CLIENT_THREADS: client_id=%u is_tcp=%d (about to create %s threads)", client_id, is_tcp,
430 is_tcp ?
"TCP" :
"WebRTC/WebSocket");
431 char thread_name[64];
432 asciichat_error_t result;
436 safe_snprintf(thread_name,
sizeof(thread_name),
"receive_%u", client_id);
440 safe_snprintf(thread_name,
sizeof(thread_name),
"webrtc_recv_%u", client_id);
441 log_debug(
"THREAD_CREATE: WebRTC client %u", client_id);
443 (
void *)&client->receive_thread);
444 log_debug(
" Pre-create: receive_thread value=%p", (
void *)(uintptr_t)client->receive_thread);
446 log_debug(
" Post-create: result=%d, receive_thread value=%p", result, (
void *)(uintptr_t)client->receive_thread);
449 if (result != ASCIICHAT_OK) {
450 log_error(
"Failed to create receive thread for %s client %u: %s", is_tcp ?
"TCP" :
"WebRTC", client_id,
451 asciichat_error_string(result));
455 log_debug(
"Created receive thread for %s client %u", is_tcp ?
"TCP" :
"WebRTC", client_id);
460 safe_snprintf(thread_name,
sizeof(thread_name),
"dispatch_%u", client_id);
461 atomic_store(&client->dispatch_thread_running,
true);
463 if (result != ASCIICHAT_OK) {
464 log_error(
"Failed to create dispatch thread for client %u: %s", client_id, asciichat_error_string(result));
468 log_debug(
"Created async dispatch thread for client %u", client_id);
473 log_debug(
"Creating render threads for client %u", client_id);
475 log_error(
"Failed to create render threads for client %u", client_id);
479 log_debug(
"Successfully created render threads for client %u", client_id);
483 safe_snprintf(thread_name,
sizeof(thread_name),
"send_%u", client_id);
487 safe_snprintf(thread_name,
sizeof(thread_name),
"webrtc_send_%u", client_id);
491 if (result != ASCIICHAT_OK) {
492 log_error(
"Failed to create send thread for %s client %u: %s", is_tcp ?
"TCP" :
"WebRTC", client_id,
493 asciichat_error_string(result));
497 log_debug(
"Created send thread for %s client %u", is_tcp ?
"TCP" :
"WebRTC", client_id);
501 log_warn(
"Failed to send initial server state to client %u", client_id);
509 server_state_packet_t state;
510 state.connected_client_count = connected_count;
511 state.active_client_count = 0;
512 memset(state.reserved, 0,
sizeof(state.reserved));
515 server_state_packet_t net_state;
516 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
517 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
518 memset(net_state.reserved, 0,
sizeof(net_state.reserved));
522 if (packet_result != ASCIICHAT_OK) {
523 log_warn(
"Failed to send initial server state to client %u: %s", client_id, asciichat_error_string(packet_result));
525 log_debug(
"Sent initial server state to client %u: %u connected clients", client_id, state.connected_client_count);
538 int existing_count = 0;
539 for (
int i = 0; i < MAX_CLIENTS; i++) {
549 if (existing_count >= GET_OPTION(max_clients) || slot == -1) {
550 const char *reject_msg =
"SERVER_FULL: Maximum client limit reached\n";
551 ssize_t send_result = socket_send(socket, reject_msg, strlen(reject_msg), 0);
552 if (send_result < 0) {
553 log_warn(
"Failed to send rejection message to client: %s", SAFE_STRERROR(errno));
563 if (!incoming_video_buffer) {
564 SET_ERRNO(ERROR_MEMORY,
"Failed to create video buffer for client %u", new_client_id);
565 log_error(
"Failed to create video buffer for client %u", new_client_id);
570 if (!incoming_audio_buffer) {
571 SET_ERRNO(ERROR_MEMORY,
"Failed to create audio buffer for client %u", new_client_id);
572 log_error(
"Failed to create audio buffer for client %u", new_client_id);
579 LOG_ERRNO_IF_SET(
"Failed to create audio queue for client");
586 if (!outgoing_video_buffer) {
587 LOG_ERRNO_IF_SET(
"Failed to create outgoing video buffer for client");
594 void *send_buffer = SAFE_MALLOC_ALIGNED(MAX_FRAME_BUFFER_SIZE, 64,
void *);
596 log_error(
"Failed to allocate send buffer for client %u", new_client_id);
610 SAFE_FREE(send_buffer);
616 const char *reject_msg =
"SERVER_FULL: Slot reassigned, try again\n";
617 socket_send(socket, reject_msg, strlen(reject_msg), 0);
623 memset(client, 0,
sizeof(client_info_t));
625 client->socket = socket;
626 client->is_tcp_client =
true;
627 atomic_store(&client->client_id, new_client_id);
628 SAFE_STRNCPY(client->client_ip, client_ip,
sizeof(client->client_ip) - 1);
630 atomic_store(&client->active,
true);
631 client->server_ctx = server_ctx;
632 atomic_store(&client->shutting_down,
false);
633 atomic_store(&client->last_rendered_grid_sources, 0);
634 atomic_store(&client->last_sent_grid_sources, 0);
635 client->connected_at = time(NULL);
637 memset(&client->crypto_handshake_ctx, 0,
sizeof(client->crypto_handshake_ctx));
638 client->crypto_initialized =
false;
640 client->pending_packet_type = 0;
641 client->pending_packet_payload = NULL;
642 client->pending_packet_length = 0;
645 client->incoming_video_buffer = incoming_video_buffer;
646 client->incoming_audio_buffer = incoming_audio_buffer;
647 client->audio_queue = audio_queue;
648 client->outgoing_video_buffer = outgoing_video_buffer;
649 client->send_buffer = send_buffer;
650 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE;
652 safe_snprintf(client->display_name,
sizeof(client->display_name),
"Client%u", new_client_id);
653 log_info(
"Added new client ID=%u from %s:%d (socket=%d, slot=%d)", new_client_id, client_ip, port, socket, slot);
654 log_debug(
"Client slot assigned: client_id=%u assigned to slot %d, socket=%d", new_client_id, slot, socket);
658 if (reg_result != ASCIICHAT_OK) {
659 SET_ERRNO(ERROR_INTERNAL,
"Failed to register client socket with tcp_server");
660 log_error(
"Failed to register client %u socket with tcp_server", new_client_id);
667 new_client_id, slot);
669 uint32_t cid = atomic_load(&client->client_id);
671 log_debug(
"Added client %u to uthash table", cid);
676 configure_client_socket(socket, new_client_id);
679 if (
mutex_init(&client->client_state_mutex) != 0) {
680 log_error(
"Failed to initialize client state mutex for client %u", new_client_id);
686 log_error(
"Failed to initialize send mutex for client %u", new_client_id);
694 log_warn(
"Failed to add client %u to audio mixer", new_client_id);
697 log_debug(
"Added client %u to audio mixer", new_client_id);
707 const uint64_t HANDSHAKE_TIMEOUT_NS = 30ULL * NS_PER_SEC_INT;
709 if (timeout_result != ASCIICHAT_OK) {
710 log_warn(
"Failed to set handshake timeout for client %u: %s", atomic_load(&client->client_id),
711 asciichat_error_string(timeout_result));
716 if (crypto_result != 0) {
717 log_error(
"Crypto handshake failed for client %u: %s", atomic_load(&client->client_id),
network_error_string());
718 if (
remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
719 log_error(
"Failed to remove client after crypto handshake failure");
727 if (clear_timeout_result != ASCIICHAT_OK) {
728 log_warn(
"Failed to clear handshake timeout for client %u: %s", atomic_load(&client->client_id),
729 asciichat_error_string(clear_timeout_result));
733 log_debug(
"Crypto handshake completed successfully for client %u", atomic_load(&client->client_id));
739 if (!client->transport) {
740 log_error(
"Failed to create ACIP transport for client %u", atomic_load(&client->client_id));
741 if (
remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
742 log_error(
"Failed to remove client after transport creation failure");
746 log_debug(
"Created ACIP transport for client %u with crypto context", atomic_load(&client->client_id));
754 packet_envelope_t envelope;
755 bool used_pending_packet =
false;
757 if (client->pending_packet_payload) {
759 log_info(
"Client %u using --no-encrypt mode - processing pending packet type %u", atomic_load(&client->client_id),
760 client->pending_packet_type);
761 envelope.type = client->pending_packet_type;
762 envelope.data = client->pending_packet_payload;
763 envelope.len = client->pending_packet_length;
764 envelope.allocated_buffer = client->pending_packet_payload;
765 envelope.allocated_size = client->pending_packet_length;
766 used_pending_packet =
true;
769 client->pending_packet_type = 0;
770 client->pending_packet_payload = NULL;
771 client->pending_packet_length = 0;
774 log_debug(
"Waiting for initial capabilities packet from client %u", atomic_load(&client->client_id));
777 mutex_lock(&client->client_state_mutex);
782 bool enforce_encryption = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
785 packet_recv_result_t result =
receive_packet_secure(socket, (
void *)crypto_ctx, enforce_encryption, &envelope);
786 mutex_unlock(&client->client_state_mutex);
788 if (result != PACKET_RECV_SUCCESS) {
789 log_error(
"Failed to receive initial capabilities packet from client %u: result=%d",
790 atomic_load(&client->client_id), result);
791 if (envelope.allocated_buffer) {
794 if (
remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
795 log_error(
"Failed to remove client after crypto handshake failure");
801 if (envelope.type != PACKET_TYPE_CLIENT_CAPABILITIES) {
802 log_error(
"Expected PACKET_TYPE_CLIENT_CAPABILITIES but got packet type %d from client %u", envelope.type,
803 atomic_load(&client->client_id));
804 if (envelope.allocated_buffer) {
807 if (
remove_client(server_ctx, atomic_load(&client->client_id)) != 0) {
808 log_error(
"Failed to remove client after crypto handshake failure");
814 log_debug(
"Processing initial capabilities packet from client %u (from %s)", atomic_load(&client->client_id),
815 used_pending_packet ?
"pending packet" :
"network");
819 if (envelope.allocated_buffer) {
822 log_debug(
"Successfully received and processed initial capabilities for client %u",
823 atomic_load(&client->client_id));
830 uint32_t client_id_snapshot = atomic_load(&client->client_id);
831 if (start_client_threads(server_ctx, client,
true) != 0) {
832 log_error(
"Failed to start threads for TCP client %u", client_id_snapshot);
837 log_debug(
"Successfully created render threads for client %u", client_id_snapshot);
842 if (session_client_id == 0) {
843 log_warn(
"Failed to register client %u with session_host", client_id_snapshot);
845 log_debug(
"Client %u registered with session_host as %u", client_id_snapshot, session_client_id);
853 return (
int)client_id_snapshot;
859 cleanup_client_all_buffers(client);
888 bool start_threads) {
889 if (!server_ctx || !transport || !client_ip) {
890 SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters to add_webrtc_client");
898 int existing_count = 0;
899 for (
int i = 0; i < MAX_CLIENTS; i++) {
910 if (existing_count >= GET_OPTION(max_clients)) {
912 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED,
"Maximum client limit reached (%d/%d active clients)", existing_count,
913 GET_OPTION(max_clients));
914 log_error(
"Maximum client limit reached (%d/%d active clients)", existing_count, GET_OPTION(max_clients));
920 SET_ERRNO(ERROR_RESOURCE_EXHAUSTED,
"No available client slots (all %d array slots are in use)", MAX_CLIENTS);
921 log_error(
"No available client slots (all %d array slots are in use)", MAX_CLIENTS);
932 if (client->incoming_video_buffer) {
934 client->incoming_video_buffer = NULL;
936 if (client->outgoing_video_buffer) {
938 client->outgoing_video_buffer = NULL;
940 if (client->incoming_audio_buffer) {
942 client->incoming_audio_buffer = NULL;
944 if (client->send_buffer) {
945 SAFE_FREE(client->send_buffer);
946 client->send_buffer = NULL;
949 memset(client, 0,
sizeof(client_info_t));
952 client->socket = INVALID_SOCKET_VALUE;
953 client->is_tcp_client =
false;
954 client->transport = transport;
956 atomic_store(&client->client_id, new_client_id);
957 SAFE_STRNCPY(client->client_ip, client_ip,
sizeof(client->client_ip) - 1);
959 atomic_store(&client->active,
true);
960 client->server_ctx = server_ctx;
961 log_info(
"Added new WebRTC client ID=%u from %s (transport=%p, slot=%d)", new_client_id, client_ip, transport, slot);
962 atomic_store(&client->shutting_down,
false);
963 atomic_store(&client->last_rendered_grid_sources, 0);
964 atomic_store(&client->last_sent_grid_sources, 0);
965 log_debug(
"WebRTC client slot assigned: client_id=%u assigned to slot %d", atomic_load(&client->client_id), slot);
966 client->connected_at = time(NULL);
969 memset(&client->crypto_handshake_ctx, 0,
sizeof(client->crypto_handshake_ctx));
970 log_debug(
"[ADD_WEBRTC_CLIENT] Setting crypto_initialized=false (will be set to true after handshake completes)");
974 client->crypto_initialized =
false;
977 client->pending_packet_type = 0;
978 client->pending_packet_payload = NULL;
979 client->pending_packet_length = 0;
981 safe_snprintf(client->display_name,
sizeof(client->display_name),
"WebRTC%u", atomic_load(&client->client_id));
985 if (!client->incoming_video_buffer) {
986 SET_ERRNO(ERROR_MEMORY,
"Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
987 log_error(
"Failed to create video buffer for WebRTC client %u", atomic_load(&client->client_id));
988 goto error_cleanup_webrtc;
993 if (!client->incoming_audio_buffer) {
994 SET_ERRNO(ERROR_MEMORY,
"Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
995 log_error(
"Failed to create audio buffer for WebRTC client %u", atomic_load(&client->client_id));
996 goto error_cleanup_webrtc;
1001 if (!client->audio_queue) {
1002 LOG_ERRNO_IF_SET(
"Failed to create audio queue for WebRTC client");
1003 goto error_cleanup_webrtc;
1009 if (!client->received_packet_queue) {
1010 LOG_ERRNO_IF_SET(
"Failed to create received packet queue for client");
1011 goto error_cleanup_webrtc;
1016 if (!client->outgoing_video_buffer) {
1017 LOG_ERRNO_IF_SET(
"Failed to create outgoing video buffer for WebRTC client");
1018 goto error_cleanup_webrtc;
1022 client->send_buffer_size = MAX_FRAME_BUFFER_SIZE;
1023 client->send_buffer = SAFE_MALLOC_ALIGNED(client->send_buffer_size, 64,
void *);
1024 if (!client->send_buffer) {
1025 log_error(
"Failed to allocate send buffer for WebRTC client %u", atomic_load(&client->client_id));
1026 goto error_cleanup_webrtc;
1030 log_debug(
"Client count updated: now %d clients (added WebRTC client_id=%u to slot %d)",
1034 uint32_t cid = atomic_load(&client->client_id);
1036 log_debug(
"Added WebRTC client %u to uthash table", cid);
1041 log_warn(
"Failed to add WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1044 log_debug(
"Added WebRTC client %u to audio mixer", atomic_load(&client->client_id));
1050 if (
mutex_init(&client->client_state_mutex) != 0) {
1051 log_error(
"Failed to initialize client state mutex for WebRTC client %u", atomic_load(&client->client_id));
1060 log_error(
"Failed to initialize send mutex for WebRTC client %u", atomic_load(&client->client_id));
1072 log_debug(
"WebRTC client %u initialized - receive thread will process capabilities", atomic_load(&client->client_id));
1074 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1079 if (start_threads) {
1080 log_debug(
"[ADD_WEBRTC_CLIENT] Starting client threads (receive, render) for client %u...", client_id_snapshot);
1081 if (start_client_threads(server_ctx, client,
false) != 0) {
1082 log_error(
"Failed to start threads for WebRTC client %u", client_id_snapshot);
1085 log_debug(
"Created receive thread for WebRTC client %u", client_id_snapshot);
1086 log_debug(
"[ADD_WEBRTC_CLIENT] Receive thread started - thread will now be processing packets", client_id_snapshot);
1088 log_debug(
"[ADD_WEBRTC_CLIENT] Deferring thread startup for client %u (caller will start after crypto init)",
1089 client_id_snapshot);
1094 log_warn(
"Failed to send initial server state to WebRTC client %u", client_id_snapshot);
1097 log_info(
"Sent initial server state to WebRTC client %u", client_id_snapshot);
1106 server_state_packet_t state;
1107 state.connected_client_count = connected_count;
1108 state.active_client_count = 0;
1109 memset(state.reserved, 0,
sizeof(state.reserved));
1112 server_state_packet_t net_state;
1113 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
1114 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
1115 memset(net_state.reserved, 0,
sizeof(net_state.reserved));
1118 if (packet_send_result != ASCIICHAT_OK) {
1119 log_warn(
"Failed to send initial server state to WebRTC client %u: %s", client_id_snapshot,
1120 asciichat_error_string(packet_send_result));
1122 log_debug(
"Sent initial server state to WebRTC client %u: %u connected clients", client_id_snapshot,
1123 state.connected_client_count);
1130 if (session_client_id == 0) {
1131 log_warn(
"Failed to register WebRTC client %u with session_host", client_id_snapshot);
1133 log_debug(
"WebRTC client %u registered with session_host as %u", client_id_snapshot, session_client_id);
1141 return (
int)client_id_snapshot;
1143error_cleanup_webrtc:
1147 cleanup_client_all_buffers(client);
1154 SET_ERRNO(ERROR_INVALID_PARAM,
"Cannot remove client %u: NULL server_ctx", client_id);
1159 client_info_t *target_client = NULL;
1160 char display_name_copy[MAX_DISPLAY_NAME_LEN];
1161 socket_t client_socket = INVALID_SOCKET_VALUE;
1163 log_debug(
"SOCKET_DEBUG: Attempting to remove client %d", client_id);
1166 for (
int i = 0; i < MAX_CLIENTS; i++) {
1168 uint32_t cid = atomic_load(&client->client_id);
1169 if (cid == client_id && cid != 0) {
1172 if (atomic_load(&client->shutting_down)) {
1174 log_debug(
"Client %u already being removed by another thread, skipping", client_id);
1178 log_debug(
"Setting active=false in remove_client (client_id=%d, socket=%d)", client_id, client->socket);
1179 log_info(
"Removing client %d (socket=%d) - marking inactive and clearing video flags", client_id, client->socket);
1180 atomic_store(&client->shutting_down,
true);
1181 atomic_store(&client->active,
false);
1182 atomic_store(&client->is_sending_video,
false);
1183 atomic_store(&client->is_sending_audio,
false);
1184 target_client = client;
1187 SAFE_STRNCPY(display_name_copy, client->display_name, MAX_DISPLAY_NAME_LEN - 1);
1190 mutex_lock(&client->client_state_mutex);
1191 client_socket = client->socket;
1192 if (client->socket != INVALID_SOCKET_VALUE) {
1193 log_debug(
"SOCKET_DEBUG: Client %d shutting down socket %d", client->client_id, client->socket);
1195 socket_shutdown(client->socket, 2);
1198 mutex_unlock(&client->client_state_mutex);
1201 if (client->audio_queue) {
1211 if (!target_client) {
1213 log_warn(
"Cannot remove client %u: not found", client_id);
1221 if (session_result != ASCIICHAT_OK) {
1223 if (session_result == ERROR_NOT_FOUND) {
1224 log_debug(
"Client %u not found in session_host (likely failed crypto before registration)", client_id);
1226 log_warn(
"Failed to unregister client %u from session_host: %s", client_id,
1227 asciichat_error_string(session_result));
1230 log_debug(
"Client %u unregistered from session_host", client_id);
1243 log_debug(
"Stopping all threads for client %u (socket %d, is_tcp=%d)", client_id, client_socket,
1244 target_client ? target_client->is_tcp_client : -1);
1246 if (target_client && target_client->is_tcp_client) {
1250 if (client_socket != INVALID_SOCKET_VALUE) {
1252 if (stop_result != ASCIICHAT_OK) {
1253 log_warn(
"Failed to stop threads for TCP client %u: error %d", client_id, stop_result);
1257 log_debug(
"TCP client %u socket already closed, threads should have already exited", client_id);
1259 }
else if (target_client) {
1261 log_debug(
"Stopping WebRTC client %u threads (receive and send)", client_id);
1266 log_debug(
"remove_client() called from receive thread for client %u, skipping self-join", client_id);
1268 void *recv_result = NULL;
1269 asciichat_error_t recv_join_result =
asciichat_thread_join(&target_client->receive_thread, &recv_result);
1270 if (recv_join_result != ASCIICHAT_OK) {
1271 log_warn(
"Failed to join receive thread for WebRTC client %u: error %d", client_id, recv_join_result);
1273 log_debug(
"Joined receive thread for WebRTC client %u", client_id);
1278 void *send_result = NULL;
1279 asciichat_error_t send_join_result =
asciichat_thread_join(&target_client->send_thread, &send_result);
1280 if (send_join_result != ASCIICHAT_OK) {
1281 log_warn(
"Failed to join send thread for WebRTC client %u: error %d", client_id, send_join_result);
1283 log_debug(
"Joined send thread for WebRTC client %u", client_id);
1293 if (target_client && target_client->transport && target_client->is_tcp_client) {
1295 target_client->transport = NULL;
1296 log_debug(
"Destroyed ACIP transport for TCP client %u", client_id);
1297 }
else if (target_client && target_client->transport && !target_client->is_tcp_client) {
1299 target_client->transport = NULL;
1300 log_debug(
"Skipped transport destruction for WebSocket client %u (LWS already destroyed)", client_id);
1304 if (client_socket != INVALID_SOCKET_VALUE) {
1305 log_debug(
"SOCKET_DEBUG: Closing socket %d for client %u after thread cleanup", client_socket, client_id);
1306 socket_close(client_socket);
1314 if (target_client) {
1316 uint32_t current_id = atomic_load(&target_client->client_id);
1317 bool still_shutting_down = atomic_load(&target_client->shutting_down);
1318 if (current_id != client_id || !still_shutting_down) {
1319 log_warn(
"Client %u pointer invalidated during thread cleanup (id=%u, shutting_down=%d)", client_id, current_id,
1320 still_shutting_down);
1327 if (target_client && target_client->socket != INVALID_SOCKET_VALUE) {
1328 mutex_lock(&target_client->client_state_mutex);
1329 target_client->socket = INVALID_SOCKET_VALUE;
1330 mutex_unlock(&target_client->client_state_mutex);
1331 log_debug(
"SOCKET_DEBUG: Client %u socket set to INVALID", client_id);
1335 cleanup_client_all_buffers(target_client);
1341 log_debug(
"Removed client %u from audio mixer", client_id);
1348 if (target_client) {
1349 client_info_t *hash_entry = NULL;
1351 if (hash_entry == target_client) {
1353 log_debug(
"Removed client %u from uthash table", client_id);
1355 log_warn(
"Client %u already removed from hash table by another thread (found=%p, expected=%p)", client_id,
1356 (
void *)hash_entry, (
void *)target_client);
1359 log_warn(
"Failed to remove client %u from hash table (client not found)", client_id);
1363 if (target_client->crypto_initialized) {
1365 target_client->crypto_initialized =
false;
1366 log_debug(
"Crypto context cleaned up for client %u", client_id);
1373 int retry_count = 0;
1374 const int max_retries = 5;
1375 while (retry_count < max_retries && (asciichat_thread_is_initialized(&target_client->send_thread) ||
1376 asciichat_thread_is_initialized(&target_client->receive_thread) ||
1377 asciichat_thread_is_initialized(&target_client->video_render_thread) ||
1378 asciichat_thread_is_initialized(&target_client->audio_render_thread))) {
1380 uint32_t delay_ms = 10 * (1 << retry_count);
1381 log_warn(
"Client %u: Some threads still appear initialized (attempt %d/%d), waiting %ums", client_id,
1382 retry_count + 1, max_retries, delay_ms);
1387 if (retry_count == max_retries) {
1388 log_error(
"Client %u: Threads did not terminate after %d retries, proceeding with cleanup anyway", client_id,
1397 atomic_store(&target_client->client_id, 0);
1412 memset(target_client, 0,
sizeof(client_info_t));
1415 int remaining_count = 0;
1416 for (
int j = 0; j < MAX_CLIENTS; j++) {
1423 log_debug(
"Client removed: client_id=%u (%s) removed, remaining clients: %d", client_id, display_name_copy,
1440static const acip_server_callbacks_t g_acip_server_callbacks;
1450 client_info_t *client = (client_info_t *)arg;
1453 log_error(
"Invalid client info in dispatch thread (NULL pointer)");
1457 uint32_t client_id = atomic_load(&client->client_id);
1458 log_info(
"DISPATCH_THREAD: Started for client %u", client_id);
1460 uint64_t dispatch_loop_count = 0;
1464 dispatch_loop_count++;
1473 log_dev_every(5 * US_PER_MS_INT,
"🔄 DISPATCH_LOOP[%llu]: Queue empty after %.1fμs, sleeping 10ms",
1474 (
unsigned long long)dispatch_loop_count, (dequeue_end - dequeue_start) / 1000.0);
1475 usleep(10 * US_PER_MS_INT);
1476 last_dequeue_attempt = dequeue_end;
1481 log_info(
"🚀 DISPATCH_LOOP[%llu]: 📦 DEQUEUED %zu byte packet (dequeue took %.1fμs) for client %u",
1482 (
unsigned long long)dispatch_loop_count, queued_pkt->data_len,
1483 (dequeue_end - dequeue_start) / 1000.0, client_id);
1487 const packet_header_t *header = (
const packet_header_t *)queued_pkt->data;
1488 size_t total_len = queued_pkt->data_len;
1489 uint8_t *payload = (uint8_t *)header +
sizeof(packet_header_t);
1490 size_t payload_len = 0;
1492 log_info(
"DISPATCH_THREAD: Processing %zu byte packet for client %u", total_len, client_id);
1494 if (total_len >=
sizeof(packet_header_t)) {
1495 packet_type_t packet_type = (packet_type_t)NET_TO_HOST_U16(header->type);
1496 payload_len = NET_TO_HOST_U32(header->length);
1498 log_info(
"🎯 DISPATCH_THREAD: Packet type=%d, payload_len=%u (will dispatch now)", packet_type, payload_len);
1502 if (packet_type == PACKET_TYPE_ENCRYPTED && client->transport && client->transport->crypto_ctx) {
1503 log_info(
"DISPATCH_THREAD: Decrypting PACKET_TYPE_ENCRYPTED for client %u", client_id);
1505 uint8_t *ciphertext = payload;
1506 size_t ciphertext_len = payload_len;
1509 size_t plaintext_size = ciphertext_len + 1024;
1510 uint8_t *plaintext = SAFE_MALLOC(plaintext_size, uint8_t *);
1512 log_error(
"DISPATCH_THREAD: Failed to allocate plaintext buffer for decryption");
1517 size_t plaintext_len;
1518 crypto_result_t crypto_result =
crypto_decrypt(client->transport->crypto_ctx, ciphertext, ciphertext_len,
1519 plaintext, plaintext_size, &plaintext_len);
1521 if (crypto_result != CRYPTO_OK) {
1523 SAFE_FREE(plaintext);
1528 if (plaintext_len <
sizeof(packet_header_t)) {
1529 log_error(
"DISPATCH_THREAD: Decrypted packet too small: %zu < %zu", plaintext_len,
sizeof(packet_header_t));
1530 SAFE_FREE(plaintext);
1536 const packet_header_t *inner_header = (
const packet_header_t *)plaintext;
1537 packet_type = (packet_type_t)NET_TO_HOST_U16(inner_header->type);
1538 payload_len = NET_TO_HOST_U32(inner_header->length);
1539 payload = plaintext +
sizeof(packet_header_t);
1541 log_info(
"DISPATCH_THREAD: Decrypted inner packet type=%d, payload_len=%u", packet_type, payload_len);
1544 if (client->transport) {
1546 payload_len, client, &g_acip_server_callbacks);
1548 if (dispatch_result != ASCIICHAT_OK) {
1549 log_error(
"DISPATCH_THREAD: Handler failed for decrypted packet type=%d: %s", packet_type,
1550 asciichat_error_string(dispatch_result));
1553 log_error(
"DISPATCH_THREAD: Cannot dispatch decrypted packet - transport is NULL for client %u", client_id);
1557 SAFE_FREE(plaintext);
1560 if (client->transport) {
1562 payload_len, client, &g_acip_server_callbacks);
1564 if (dispatch_result != ASCIICHAT_OK) {
1565 log_error(
"DISPATCH_THREAD: Handler failed for packet type=%d: %s", packet_type,
1566 asciichat_error_string(dispatch_result));
1569 log_error(
"DISPATCH_THREAD: Cannot dispatch packet - transport is NULL for client %u", client_id);
1578 log_info(
"DISPATCH_THREAD: Exiting for client %u", client_id);
1584 log_debug(
"RECV_THREAD: Thread function entered, arg=%p", arg);
1586 client_info_t *client = (client_info_t *)arg;
1592 log_error(
"Invalid client info in receive thread (NULL pointer)");
1599 log_debug(
"RECV_THREAD_DEBUG: Thread started, client=%p, client_id=%u, is_tcp=%d", (
void *)client,
1600 atomic_load(&client->client_id), client->is_tcp_client);
1602 if (atomic_load(&client->protocol_disconnect_requested)) {
1603 log_debug(
"Receive thread for client %u exiting before start (protocol disconnect requested)",
1604 atomic_load(&client->client_id));
1610 if (atomic_load(&client->client_id) == 0) {
1611 log_debug(
"Receive thread: client_id is 0, client struct may have been zeroed, exiting");
1617 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1618 log_error(
"Invalid client socket in receive thread");
1626 log_debug(
"Started receive thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1632 log_info(
"RECV_THREAD_LOOP_START: client_id=%u, is_tcp=%d, transport=%p, active=%d", atomic_load(&client->client_id),
1633 client->is_tcp_client, (
void *)client->transport, atomic_load(&client->active));
1638 if (client->is_tcp_client && client->socket == INVALID_SOCKET_VALUE) {
1639 log_debug(
"TCP client %u has invalid socket, exiting receive thread", atomic_load(&client->client_id));
1645 if (atomic_load(&client->client_id) == 0) {
1646 log_debug(
"Client client_id reset, exiting receive thread");
1654 if (client->is_tcp_client) {
1656 asciichat_error_t acip_result =
1661 log_debug(
"RECV_EXIT: Server shutdown requested, breaking loop");
1666 if (acip_result != ASCIICHAT_OK) {
1667 asciichat_error_context_t err_ctx;
1668 if (HAS_ERRNO(&err_ctx)) {
1669 log_error(
"🔴 ACIP error for client %u: code=%u msg=%s", client->client_id, err_ctx.code,
1670 err_ctx.context_message);
1671 if (err_ctx.code == ERROR_NETWORK) {
1672 log_debug(
"Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1674 }
else if (err_ctx.code == ERROR_CRYPTO) {
1676 client,
"SECURITY VIOLATION: Unencrypted packet when encryption required - terminating connection");
1681 log_warn(
"ACIP error for TCP client %u: %s (disconnecting)", client->client_id,
1682 asciichat_error_string(acip_result));
1687 void *packet_data = NULL;
1688 void *allocated_buffer = NULL;
1689 size_t packet_len = 0;
1691 asciichat_error_t recv_result =
1692 client->transport->methods->recv(client->transport, &packet_data, &packet_len, &allocated_buffer);
1694 if (recv_result != ASCIICHAT_OK) {
1695 asciichat_error_context_t err_ctx;
1696 if (HAS_ERRNO(&err_ctx)) {
1699 if ((err_ctx.code == ERROR_NETWORK) && err_ctx.context_message &&
1700 strstr(err_ctx.context_message,
"reassembly timeout")) {
1702 log_dev_every(100000,
"Client %u: fragment reassembly timeout, retrying in 10ms", client->client_id);
1707 if (err_ctx.code == ERROR_NETWORK) {
1708 log_debug(
"Client %u disconnected (network error): %s", client->client_id, err_ctx.context_message);
1712 log_warn(
"Receive failed for WebRTC client %u: %s (disconnecting)", client->client_id,
1713 asciichat_error_string(recv_result));
1719 log_info(
"RECV_THREAD: Queuing %zu byte packet for async dispatch (client %u)", packet_len, client->client_id);
1722 packet_type_t pkt_type = PACKET_TYPE_PROTOCOL_VERSION;
1723 if (packet_len >=
sizeof(packet_header_t)) {
1724 const packet_header_t *pkt_header = (
const packet_header_t *)allocated_buffer;
1725 pkt_type = (packet_type_t)NET_TO_HOST_U16(pkt_header->type);
1731 int enqueue_result =
packet_queue_enqueue(client->received_packet_queue, pkt_type, allocated_buffer, packet_len,
1732 atomic_load(&client->client_id),
false);
1734 if (enqueue_result < 0) {
1735 log_warn(
"Failed to queue received packet for client %u (queue full?) - packet dropped", client->client_id);
1736 if (allocated_buffer) {
1746 uint32_t client_id_snapshot = atomic_load(&client->client_id);
1747 log_debug(
"Setting active=false in receive_thread_fn (client_id=%u, exiting receive loop)", client_id_snapshot);
1748 atomic_store(&client->active,
false);
1749 atomic_store(&client->send_thread_running,
false);
1750 atomic_store(&client->video_render_thread_running,
false);
1751 atomic_store(&client->audio_render_thread_running,
false);
1756 log_debug(
"Receive thread for client %u calling remove_client() for cleanup", client_id_snapshot);
1760 log_warn(
"Failed to remove client %u from receive thread cleanup", client_id_snapshot);
1763 log_error(
"Receive thread for client %u: server_ctx is NULL, cannot call remove_client()", client_id_snapshot);
1766 log_debug(
"Receive thread for client %u terminated", client_id_snapshot);
1776 client_info_t *client = (client_info_t *)arg;
1782 log_error(
"Invalid client info in send thread (NULL pointer)");
1788 if (atomic_load(&client->client_id) == 0) {
1789 log_debug_every(100 * US_PER_MS_INT,
"Send thread: client_id is 0, client struct may have been zeroed, exiting");
1796 mutex_lock(&client->send_mutex);
1797 bool has_socket = (client->socket != INVALID_SOCKET_VALUE);
1798 bool has_transport = (client->transport != NULL);
1799 mutex_unlock(&client->send_mutex);
1801 log_info(
"SEND_THREAD_VALIDATION: client_id=%u socket_valid=%d transport_valid=%d transport_ptr=%p",
1802 atomic_load(&client->client_id), has_socket, has_transport, (
void *)client->transport);
1804 if (!has_socket && !has_transport) {
1805 log_error(
"Invalid client connection in send thread (no socket or transport)");
1809 log_info(
"Started send thread for client %u (%s)", atomic_load(&client->client_id), client->display_name);
1812 atomic_store(&client->send_thread_running,
true);
1814 log_info(
"SEND_THREAD_LOOP_START: client_id=%u active=%d shutting_down=%d running=%d",
1815 atomic_load(&client->client_id), atomic_load(&client->active), atomic_load(&client->shutting_down),
1816 atomic_load(&client->send_thread_running));
1819 uint64_t last_video_send_time = 0;
1820 const uint64_t video_send_interval_us = 16666;
1824#define MAX_AUDIO_BATCH 8
1825 int loop_iteration_count = 0;
1826 while (!atomic_load(&
g_server_should_exit) && !atomic_load(&client->shutting_down) && atomic_load(&client->active) &&
1827 atomic_load(&client->send_thread_running)) {
1828 loop_iteration_count++;
1829 bool sent_something =
false;
1831 log_info_every(5000 * US_PER_MS_INT,
"[SEND_LOOP_%d] START: client=%u", loop_iteration_count,
1832 atomic_load(&client->client_id));
1837 int audio_packet_count = 0;
1839 if (client->audio_queue) {
1843 if (audio_packets[i]) {
1844 audio_packet_count++;
1849 if (audio_packet_count > 0) {
1850 log_dev_every(4500 * US_PER_MS_INT,
"SEND_AUDIO: client=%u dequeued=%d packets",
1851 atomic_load(&client->client_id), audio_packet_count);
1854 log_warn(
"Send thread: audio_queue is NULL for client %u", atomic_load(&client->client_id));
1858 if (audio_packet_count > 0) {
1860 mutex_lock(&client->client_state_mutex);
1861 bool crypto_ready = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1863 mutex_unlock(&client->client_state_mutex);
1866 asciichat_error_t result = ASCIICHAT_OK;
1868 if (audio_packet_count == 1) {
1870 packet_type_t pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1873 mutex_lock(&client->send_mutex);
1874 if (atomic_load(&client->shutting_down) || !client->transport) {
1875 mutex_unlock(&client->send_mutex);
1876 log_warn(
"BREAK_AUDIO_SINGLE: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
1877 atomic_load(&client->shutting_down), (
void *)client->transport);
1880 acip_transport_t *transport = client->transport;
1881 mutex_unlock(&client->send_mutex);
1885 atomic_load(&client->client_id));
1886 if (result != ASCIICHAT_OK) {
1887 log_error(
"AUDIO SEND FAIL: client=%u, len=%zu, result=%d", client->client_id, audio_packets[0]->data_len,
1892 packet_type_t first_pkt_type = (packet_type_t)NET_TO_HOST_U16(audio_packets[0]->header.type);
1895 mutex_lock(&client->send_mutex);
1896 if (atomic_load(&client->shutting_down) || !client->transport) {
1897 mutex_unlock(&client->send_mutex);
1898 log_warn(
"BREAK_AUDIO_BATCH: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
1899 atomic_load(&client->shutting_down), (
void *)client->transport);
1900 result = ERROR_NETWORK;
1902 acip_transport_t *transport = client->transport;
1903 mutex_unlock(&client->send_mutex);
1905 if (first_pkt_type == PACKET_TYPE_AUDIO_OPUS_BATCH) {
1907 size_t total_opus_size = 0;
1908 for (
int i = 0; i < audio_packet_count; i++) {
1909 total_opus_size += audio_packets[i]->data_len;
1912 uint8_t *batched_opus = SAFE_MALLOC(total_opus_size, uint8_t *);
1913 uint16_t *frame_sizes = SAFE_MALLOC((
size_t)audio_packet_count *
sizeof(uint16_t), uint16_t *);
1915 if (batched_opus && frame_sizes) {
1917 for (
int i = 0; i < audio_packet_count; i++) {
1918 frame_sizes[i] = (uint16_t)audio_packets[i]->data_len;
1919 memcpy(batched_opus + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1920 offset += audio_packets[i]->data_len;
1923 (uint32_t)audio_packet_count, AUDIO_SAMPLE_RATE, 20);
1924 if (result != ASCIICHAT_OK) {
1925 log_error(
"AUDIO SEND FAIL (opus batch): client=%u, frames=%d, total_size=%zu, result=%d",
1926 atomic_load(&client->client_id), audio_packet_count, total_opus_size, result);
1929 log_error(
"Failed to allocate buffer for Opus batch");
1930 result = ERROR_MEMORY;
1932 SAFE_FREE(batched_opus);
1933 SAFE_FREE(frame_sizes);
1936 size_t total_samples = 0;
1937 for (
int i = 0; i < audio_packet_count; i++) {
1938 total_samples += audio_packets[i]->data_len /
sizeof(float);
1941 float *batched_audio = SAFE_MALLOC(total_samples *
sizeof(
float),
float *);
1942 if (batched_audio) {
1944 for (
int i = 0; i < audio_packet_count; i++) {
1945 size_t packet_samples = audio_packets[i]->data_len /
sizeof(float);
1946 memcpy(batched_audio + offset, audio_packets[i]->data, audio_packets[i]->data_len);
1947 offset += packet_samples;
1949 result =
acip_send_audio_batch(transport, batched_audio, (uint32_t)total_samples, AUDIO_SAMPLE_RATE);
1950 if (result != ASCIICHAT_OK) {
1951 log_error(
"AUDIO SEND FAIL (raw batch): client=%u, packets=%d, samples=%zu, result=%d",
1952 atomic_load(&client->client_id), audio_packet_count, total_samples, result);
1955 log_error(
"Failed to allocate buffer for audio batch");
1956 result = ERROR_MEMORY;
1958 SAFE_FREE(batched_audio);
1964 for (
int i = 0; i < audio_packet_count; i++) {
1968 if (result != ASCIICHAT_OK) {
1970 log_error(
"Failed to send audio to client %u: %s", client->client_id, asciichat_error_string(result));
1972 log_warn(
"SKIP_AUDIO_ERROR: client_id=%u result=%d (continuing to send video)", atomic_load(&client->client_id),
1977 sent_something =
true;
1979 log_info_every(5000 * US_PER_MS_INT,
"[SEND_LOOP_%d] AUDIO_SENT: took %.2fms", loop_iteration_count,
1980 (audio_done_ns - loop_start_ns) / 1e6);
1983 if (audio_packet_count > 0) {
1988 log_info_every(5000 * US_PER_MS_INT,
"[SEND_LOOP_%d] NO_AUDIO: sleeping 1ms", loop_iteration_count);
1992 mutex_lock(&client->client_state_mutex);
1993 bool should_rekey = !GET_OPTION(no_encrypt) && client->crypto_initialized &&
1996 mutex_unlock(&client->client_state_mutex);
1999 log_debug(
"Rekey threshold reached for client %u, initiating session rekey", client->client_id);
2000 mutex_lock(&client->client_state_mutex);
2002 mutex_lock(&client->send_mutex);
2003 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
2004 mutex_unlock(&client->send_mutex);
2005 mutex_unlock(&client->client_state_mutex);
2006 log_warn(
"BREAK_REKEY: client_id=%u shutting_down=%d socket=%d", atomic_load(&client->client_id),
2007 atomic_load(&client->shutting_down), (
int)client->socket);
2010 socket_t rekey_socket = client->socket;
2011 mutex_unlock(&client->send_mutex);
2015 mutex_unlock(&client->client_state_mutex);
2017 if (result != ASCIICHAT_OK) {
2018 log_error(
"Failed to send REKEY_REQUEST to client %u: %d", client->client_id, result);
2020 log_debug(
"Sent REKEY_REQUEST to client %u", client->client_id);
2022 log_info_client(client,
"Session rekey initiated - rotating encryption keys");
2030 if (!client->outgoing_video_buffer) {
2033 log_warn(
"⚠️ Send thread exiting: outgoing_video_buffer is NULL for client %u (client shutting down?)",
2042 log_info_every(5000 * US_PER_MS_INT,
"[SEND_LOOP_%d] VIDEO_GET_FRAME: took %.3fms, frame=%p", loop_iteration_count,
2043 (frame_get_ns - video_check_ns) / 1e6, (
void *)frame);
2044 log_dev_every(4500 * US_PER_MS_INT,
"Send thread: video_frame_get_latest returned %p for client %u", (
void *)frame,
2049 log_warn(
"⚠️ Send thread exiting: video_frame_get_latest returned NULL for client %u (buffer destroyed?)",
2057 uint64_t current_time_us = time_ns_to_us(current_time_ns);
2058 uint64_t time_since_last_send_us = current_time_us - last_video_send_time;
2059 log_dev_every(4500 * US_PER_MS_INT,
2060 "Send thread timing check: time_since_last=%llu us, interval=%llu us, should_send=%d",
2061 (
unsigned long long)time_since_last_send_us, (
unsigned long long)video_send_interval_us,
2062 (time_since_last_send_us >= video_send_interval_us));
2064 if (current_time_us - last_video_send_time >= video_send_interval_us) {
2065 log_info_every(5000 * US_PER_MS_INT,
"✓ SEND_TIME_READY: client_id=%u time_since=%llu interval=%llu",
2066 atomic_load(&client->client_id), (
unsigned long long)time_since_last_send_us,
2067 (
unsigned long long)video_send_interval_us);
2072 int rendered_sources = atomic_load(&client->last_rendered_grid_sources);
2073 int sent_sources = atomic_load(&client->last_sent_grid_sources);
2075 if (rendered_sources != sent_sources && rendered_sources > 0) {
2078 mutex_lock(&client->send_mutex);
2079 if (atomic_load(&client->shutting_down) || !client->transport) {
2080 mutex_unlock(&client->send_mutex);
2081 log_warn(
"BREAK_CLEAR_CONSOLE: client_id=%u shutting_down=%d transport=%p", atomic_load(&client->client_id),
2082 atomic_load(&client->shutting_down), (
void *)client->transport);
2085 acip_transport_t *clear_transport = client->transport;
2086 mutex_unlock(&client->send_mutex);
2090 log_debug_every(LOG_RATE_FAST,
"Client %u: Sent CLEAR_CONSOLE (grid changed %d → %d sources)",
2091 client->client_id, sent_sources, rendered_sources);
2092 atomic_store(&client->last_sent_grid_sources, rendered_sources);
2093 sent_something =
true;
2096 log_dev_every(4500 * US_PER_MS_INT,
"Send thread: frame validation - frame=%p, frame->data=%p, frame->size=%zu",
2097 (
void *)frame, (
void *)frame->data, frame->size);
2100 log_dev(
"✗ SKIP_NO_DATA: client_id=%u frame=%p data=%p",
2101 atomic_load(&client->client_id), (
void *)frame, (
void *)frame->data);
2104 log_dev(
"✓ FRAME_DATA_OK: client_id=%u data=%p", atomic_load(&client->client_id),
2105 (
void *)frame->data);
2107 if (frame->data && frame->size == 0) {
2108 log_dev(
"✗ SKIP_ZERO_SIZE: client_id=%u size=%zu", atomic_load(&client->client_id),
2113 log_dev(
"✓ FRAME_SIZE_OK: client_id=%u size=%zu", atomic_load(&client->client_id),
2117 const char *frame_data = (
const char *)frame->data;
2118 size_t frame_size = frame->size;
2119 uint32_t width = atomic_load(&client->width);
2120 uint32_t height = atomic_load(&client->height);
2127 mutex_lock(&client->client_state_mutex);
2128 bool crypto_ready = GET_OPTION(no_encrypt) ||
2130 mutex_unlock(&client->client_state_mutex);
2132 if (!crypto_ready) {
2133 log_dev(
"⚠️ SKIP_SEND_CRYPTO: client_id=%u crypto_initialized=%d no_encrypt=%d",
2134 atomic_load(&client->client_id), client->crypto_initialized, GET_OPTION(no_encrypt));
2137 log_dev(
"✓ CRYPTO_READY: client_id=%u about to send frame",
2138 atomic_load(&client->client_id));
2142 log_dev_every(4500 * US_PER_MS_INT,
2143 "Send thread: About to send frame to client %u (width=%u, height=%u, size=%zu, data=%p)",
2144 client->client_id, width, height, frame_size, (
void *)frame_data);
2147 if (frame_data && frame_size > 0) {
2148 log_info_every(5000 * US_PER_MS_INT,
2149 "FRAME_DATA_HEX: client=%u first_bytes=[%02x %02x %02x %02x %02x %02x %02x %02x %02x %02x %02x "
2150 "%02x %02x %02x %02x %02x]",
2151 atomic_load(&client->client_id), ((uint8_t *)frame_data)[0], ((uint8_t *)frame_data)[1],
2152 ((uint8_t *)frame_data)[2], ((uint8_t *)frame_data)[3], ((uint8_t *)frame_data)[4],
2153 ((uint8_t *)frame_data)[5], ((uint8_t *)frame_data)[6], ((uint8_t *)frame_data)[7],
2154 ((uint8_t *)frame_data)[8], ((uint8_t *)frame_data)[9], ((uint8_t *)frame_data)[10],
2155 ((uint8_t *)frame_data)[11], ((uint8_t *)frame_data)[12], ((uint8_t *)frame_data)[13],
2156 ((uint8_t *)frame_data)[14], ((uint8_t *)frame_data)[15]);
2159 mutex_lock(&client->send_mutex);
2160 if (atomic_load(&client->shutting_down) || !client->transport) {
2161 mutex_unlock(&client->send_mutex);
2162 log_warn(
"BREAK_FRAME_SEND: client_id=%u shutting_down=%d transport=%p loop_iter=%d",
2163 atomic_load(&client->client_id), atomic_load(&client->shutting_down), (
void *)client->transport,
2164 loop_iteration_count);
2167 acip_transport_t *frame_transport = client->transport;
2168 mutex_unlock(&client->send_mutex);
2171 log_dev_every(4500 * US_PER_MS_INT,
"SEND_ASCII_FRAME: client_id=%u size=%zu width=%u height=%u",
2172 atomic_load(&client->client_id), frame_size, width, height);
2174 log_dev(
"[SEND_LOOP_%d] FRAME_SEND_START: size=%zu", loop_iteration_count,
2176 asciichat_error_t send_result =
acip_send_ascii_frame(frame_transport, frame_data, frame_size, width, height,
2177 atomic_load(&client->client_id));
2179 uint64_t send_ms = (send_end_ns - send_start_ns) / 1e6;
2180 log_dev(
"[SEND_LOOP_%d] FRAME_SEND_END: took %.2fms, result=%d",
2181 loop_iteration_count, send_ms, send_result);
2184 if (send_result != ASCIICHAT_OK) {
2186 SET_ERRNO(ERROR_NETWORK,
"Failed to send video frame to client %u: %s", client->client_id,
2187 asciichat_error_string(send_result));
2189 log_error(
"SEND_FRAME_FAILED: client_id=%u result=%d message=%s loop_iter=%d", atomic_load(&client->client_id),
2190 send_result, asciichat_error_string(send_result), loop_iteration_count);
2191 log_warn(
"BREAK_SEND_ERROR: client_id=%u (frame send failed)", atomic_load(&client->client_id));
2195 log_dev_every(4500 * US_PER_MS_INT,
"SEND_FRAME_SUCCESS: client_id=%u size=%zu", atomic_load(&client->client_id),
2199 unsigned long frame_count = atomic_fetch_add(&client->frames_sent_count, 1) + 1;
2200 log_info(
"🎬 FRAME_SENT: client_id=%u frame_num=%lu size=%zu",
2201 atomic_load(&client->client_id), frame_count, frame_size);
2203 sent_something =
true;
2204 last_video_send_time = current_time_us;
2207 uint64_t frame_time_us = time_ns_to_us(
time_elapsed_ns(frame_start_ns, frame_end_ns));
2208 if (frame_time_us > 15 * US_PER_MS_INT) {
2209 uint64_t step1_us = time_ns_to_us(
time_elapsed_ns(frame_start_ns, step1_ns));
2210 uint64_t step2_us = time_ns_to_us(
time_elapsed_ns(step1_ns, step2_ns));
2211 uint64_t step3_us = time_ns_to_us(
time_elapsed_ns(step2_ns, step3_ns));
2212 uint64_t step4_us = time_ns_to_us(
time_elapsed_ns(step3_ns, step4_ns));
2213 uint64_t step5_us = time_ns_to_us(
time_elapsed_ns(step4_ns, step5_ns));
2216 "SEND_THREAD: Frame send took %.2fms for client %u | Snapshot: %.2fms | Memcpy: %.2fms | CRC32: %.2fms | "
2217 "Header: %.2fms | send_packet_secure: %.2fms",
2218 frame_time_us / 1000.0, client->client_id, step1_us / 1000.0, step2_us / 1000.0, step3_us / 1000.0,
2219 step4_us / 1000.0, step5_us / 1000.0);
2224 if (!sent_something) {
2225 log_info_every(5000 * US_PER_MS_INT,
"[SEND_LOOP_%d] IDLE_SLEEP: nothing sent", loop_iteration_count);
2229 uint64_t loop_ms = (loop_end_ns - loop_start_ns) / 1e6;
2231 log_warn(
"[SEND_LOOP_%d] SLOW_ITERATION: took %.2fms (client=%u)", loop_iteration_count, loop_ms,
2232 atomic_load(&client->client_id));
2237 log_warn(
"Send thread exit conditions - client_id=%u g_server_should_exit=%d shutting_down=%d active=%d "
2238 "send_thread_running=%d",
2239 atomic_load(&client->client_id), atomic_load(&
g_server_should_exit), atomic_load(&client->shutting_down),
2240 atomic_load(&client->active), atomic_load(&client->send_thread_running));
2243 atomic_store(&client->send_thread_running,
false);
2244 log_debug(
"Send thread for client %u terminated", client->client_id);
2263 const crypto_context_t *crypto_ctx;
2264 } client_snapshot_t;
2266 client_snapshot_t client_snapshots[MAX_CLIENTS];
2267 int snapshot_count = 0;
2268 int active_video_count = 0;
2274 if (lock_time_ns > 1 * NS_PER_MS_INT) {
2275 char duration_str[32];
2277 log_warn(
"broadcast_server_state: rwlock_rdlock took %s", duration_str);
2282 for (
int i = 0; i < MAX_CLIENTS; i++) {
2285 active_video_count++;
2291 log_debug(
"Skipping server_state broadcast to client %u: crypto handshake not complete",
2297 const crypto_context_t *crypto_ctx =
2299 if (!GET_OPTION(no_encrypt) && !crypto_ctx) {
2301 log_debug(
"Skipping server_state broadcast to client %u: no crypto context",
2308 client_snapshots[snapshot_count].crypto_ctx = crypto_ctx;
2314 server_state_packet_t state;
2316 state.active_client_count = active_video_count;
2317 memset(state.reserved, 0,
sizeof(state.reserved));
2320 server_state_packet_t net_state;
2321 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
2322 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
2323 memset(net_state.reserved, 0,
sizeof(net_state.reserved));
2328 uint64_t lock_held_ns =
time_elapsed_ns(lock_start_ns, lock_held_final_ns);
2333 for (
int i = 0; i < snapshot_count; i++) {
2334 log_debug_every(5000 * US_PER_MS_INT,
2335 "BROADCAST_DEBUG: Sending SERVER_STATE to client %u (socket %d) with crypto_ctx=%p",
2336 client_snapshots[i].client_id, client_snapshots[i].socket, (
void *)client_snapshots[i].crypto_ctx);
2343 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
2344 log_warn(
"Client %u ID mismatch during broadcast (found %u), skipping send", client_snapshots[i].client_id,
2345 atomic_load(&target->client_id));
2349 mutex_lock(&target->send_mutex);
2352 if (atomic_load(&target->client_id) != client_snapshots[i].client_id) {
2353 mutex_unlock(&target->send_mutex);
2354 log_warn(
"Client %u was removed during broadcast send (now %u), skipping", client_snapshots[i].client_id,
2355 atomic_load(&target->client_id));
2361 mutex_unlock(&target->send_mutex);
2363 if (result != ASCIICHAT_OK) {
2364 log_error(
"Failed to send server state to client %u: %s", client_snapshots[i].client_id,
2365 asciichat_error_string(result));
2367 log_debug_every(5000 * US_PER_MS_INT,
"Sent server state to client %u: %u connected, %u active",
2368 client_snapshots[i].client_id, state.connected_client_count, state.active_client_count);
2371 log_warn(
"Client %u removed before broadcast send could complete", client_snapshots[i].client_id);
2375 if (lock_held_ns > 1 * NS_PER_MS_INT) {
2376 char duration_str[32];
2378 log_warn(
"broadcast_server_state: rwlock held for %s (includes network I/O)", duration_str);
2399 SET_ERRNO(ERROR_INVALID_PARAM,
"Server context is NULL");
2405 SET_ERRNO(ERROR_NOT_FOUND,
"Client %u not found", client_id);
2409 log_debug(
"Starting threads for WebRTC client %u...", client_id);
2410 return start_client_threads(server_ctx, client,
false);
2415 SET_ERRNO(ERROR_INVALID_PARAM,
"Client is NULL");
2420 log_debug(
"Setting active=false in stop_client_threads (client_id=%u)", atomic_load(&client->client_id));
2421 atomic_store(&client->active,
false);
2422 atomic_store(&client->send_thread_running,
false);
2425 if (asciichat_thread_is_initialized(&client->send_thread)) {
2428 if (asciichat_thread_is_initialized(&client->receive_thread)) {
2432 if (asciichat_thread_is_initialized(&client->dispatch_thread)) {
2433 atomic_store(&client->dispatch_thread_running,
false);
2440 SET_ERRNO(ERROR_INVALID_PARAM,
"Client is NULL");
2444 if (client->incoming_video_buffer) {
2446 client->incoming_video_buffer = NULL;
2450 if (client->outgoing_video_buffer) {
2452 client->outgoing_video_buffer = NULL;
2456 if (client->send_buffer) {
2457 SAFE_FREE(client->send_buffer);
2458 client->send_buffer = NULL;
2459 client->send_buffer_size = 0;
2462 if (client->incoming_audio_buffer) {
2464 client->incoming_audio_buffer = NULL;
2468 if (client->opus_decoder) {
2470 client->opus_decoder = NULL;
2478 if (client->audio_queue) {
2480 client->audio_queue = NULL;
2484 if (client->received_packet_queue) {
2486 client->received_packet_queue = NULL;
2501static inline void cleanup_client_all_buffers(client_info_t *client) {
2517 uint32_t *sender_id) {
2519 log_error(
"Received encrypted packet but crypto not ready for client %u", client->client_id);
2526 size_t original_alloc_size = *len;
2528 size_t decrypted_len;
2530 (uint8_t *)decrypted_data, original_alloc_size, &decrypted_len);
2532 if (decrypt_result != 0) {
2533 SET_ERRNO(ERROR_CRYPTO,
"Failed to process encrypted packet from client %u (result=%d)", client->client_id,
2545 *data = decrypted_data;
2546 *len = decrypted_len;
2549 if (*len <
sizeof(packet_header_t)) {
2550 SET_ERRNO(ERROR_CRYPTO,
"Decrypted packet too small for header from client %u", client->client_id);
2556 packet_header_t *header = (packet_header_t *)*data;
2557 *type = (packet_type_t)NET_TO_HOST_U16(header->type);
2558 *sender_id = NET_TO_HOST_U32(header->client_id);
2561 *data = (uint8_t *)*data +
sizeof(packet_header_t);
2562 *len -=
sizeof(packet_header_t);
2572static void acip_server_on_protocol_version(
const protocol_version_packet_t *version,
void *client_ctx,
void *app_ctx);
2573static void acip_server_on_image_frame(
const image_frame_packet_t *header,
const void *pixel_data,
size_t data_len,
2574 void *client_ctx,
void *app_ctx);
2575static void acip_server_on_audio(
const void *audio_data,
size_t audio_len,
void *client_ctx,
void *app_ctx);
2576static void acip_server_on_audio_batch(
const audio_batch_packet_t *header,
const float *samples,
size_t num_samples,
2577 void *client_ctx,
void *app_ctx);
2578static void acip_server_on_audio_opus(
const void *opus_data,
size_t opus_len,
void *client_ctx,
void *app_ctx);
2579static void acip_server_on_audio_opus_batch(
const void *batch_data,
size_t batch_len,
void *client_ctx,
void *app_ctx);
2580static void acip_server_on_client_join(
const void *join_data,
size_t data_len,
void *client_ctx,
void *app_ctx);
2581static void acip_server_on_client_leave(
void *client_ctx,
void *app_ctx);
2582static void acip_server_on_stream_start(uint32_t stream_types,
void *client_ctx,
void *app_ctx);
2583static void acip_server_on_stream_stop(uint32_t stream_types,
void *client_ctx,
void *app_ctx);
2584static void acip_server_on_capabilities(
const void *cap_data,
size_t data_len,
void *client_ctx,
void *app_ctx);
2585static void acip_server_on_ping(
void *client_ctx,
void *app_ctx);
2586static void acip_server_on_pong(
void *client_ctx,
void *app_ctx);
2587static void acip_server_on_error(
const error_packet_t *header,
const char *message,
void *client_ctx,
void *app_ctx);
2588static void acip_server_on_remote_log(
const remote_log_packet_t *header,
const char *message,
void *client_ctx,
2590static void acip_server_on_crypto_rekey_request(
const void *payload,
size_t payload_len,
void *client_ctx,
2592static void acip_server_on_crypto_rekey_response(
const void *payload,
size_t payload_len,
void *client_ctx,
2594static void acip_server_on_crypto_rekey_complete(
const void *payload,
size_t payload_len,
void *client_ctx,
2596static void acip_server_on_crypto_key_exchange_resp(packet_type_t type,
const void *payload,
size_t payload_len,
2597 void *client_ctx,
void *app_ctx);
2598static void acip_server_on_crypto_auth_response(packet_type_t type,
const void *payload,
size_t payload_len,
2599 void *client_ctx,
void *app_ctx);
2600static void acip_server_on_crypto_no_encryption(packet_type_t type,
const void *payload,
size_t payload_len,
2601 void *client_ctx,
void *app_ctx);
2609static const acip_server_callbacks_t g_acip_server_callbacks = {
2610 .on_protocol_version = acip_server_on_protocol_version,
2611 .on_image_frame = acip_server_on_image_frame,
2612 .on_audio = acip_server_on_audio,
2613 .on_audio_batch = acip_server_on_audio_batch,
2614 .on_audio_opus = acip_server_on_audio_opus,
2615 .on_audio_opus_batch = acip_server_on_audio_opus_batch,
2616 .on_client_join = acip_server_on_client_join,
2617 .on_client_leave = acip_server_on_client_leave,
2618 .on_stream_start = acip_server_on_stream_start,
2619 .on_stream_stop = acip_server_on_stream_stop,
2620 .on_capabilities = acip_server_on_capabilities,
2621 .on_ping = acip_server_on_ping,
2622 .on_pong = acip_server_on_pong,
2623 .on_error = acip_server_on_error,
2624 .on_remote_log = acip_server_on_remote_log,
2625 .on_crypto_rekey_request = acip_server_on_crypto_rekey_request,
2626 .on_crypto_rekey_response = acip_server_on_crypto_rekey_response,
2627 .on_crypto_rekey_complete = acip_server_on_crypto_rekey_complete,
2628 .on_crypto_key_exchange_resp = acip_server_on_crypto_key_exchange_resp,
2629 .on_crypto_auth_response = acip_server_on_crypto_auth_response,
2630 .on_crypto_no_encryption = acip_server_on_crypto_no_encryption,
2636static void acip_server_on_protocol_version(
const protocol_version_packet_t *version,
void *client_ctx,
void *app_ctx) {
2639 client_info_t *client = (client_info_t *)client_ctx;
2643static void acip_server_on_image_frame(
const image_frame_packet_t *header,
const void *pixel_data,
size_t data_len,
2644 void *client_ctx,
void *app_ctx) {
2647 client_info_t *client = (client_info_t *)client_ctx;
2649 log_info(
"CALLBACK_IMAGE_FRAME: client_id=%u, width=%u, height=%u, pixel_format=%u, compressed_size=%u, data_len=%zu",
2650 atomic_load(&client->client_id), header->width, header->height, header->pixel_format,
2651 header->compressed_size, data_len);
2654 if (header->width == 0 || header->height == 0) {
2655 log_error(
"Invalid image dimensions: %ux%u (width and height must be > 0)", header->width, header->height);
2660 const uint32_t MAX_WIDTH = 8192;
2661 const uint32_t MAX_HEIGHT = 8192;
2662 if (header->width > MAX_WIDTH || header->height > MAX_HEIGHT) {
2663 log_error(
"Image dimensions too large: %ux%u (max: %ux%u)", header->width, header->height, MAX_WIDTH, MAX_HEIGHT);
2670 if (atomic_load(&client->width) == 0 || atomic_load(&client->height) == 0) {
2671 atomic_store(&client->width, header->width);
2672 atomic_store(&client->height, header->height);
2673 log_info(
"Client %u: Auto-set dimensions from IMAGE_FRAME: %ux%u (CLIENT_CAPABILITIES not received)",
2674 atomic_load(&client->client_id), header->width, header->height);
2678 bool was_sending_video = atomic_load(&client->is_sending_video);
2679 if (!was_sending_video) {
2680 if (atomic_compare_exchange_strong(&client->is_sending_video, &was_sending_video,
true)) {
2681 log_info(
"Client %u auto-enabled video stream (received IMAGE_FRAME)", atomic_load(&client->client_id));
2682 log_info_client(client,
"First video frame received - streaming active");
2686 mutex_lock(&client->client_state_mutex);
2687 client->frames_received_logged++;
2688 if (client->frames_received_logged % 25000 == 0) {
2691 log_debug(
"Client %u has sent %u IMAGE_FRAME packets (%s)", atomic_load(&client->client_id),
2692 client->frames_received_logged, pretty);
2694 mutex_unlock(&client->client_state_mutex);
2698 uint32_t incoming_pixel_hash = 0;
2699 for (
size_t i = 0; i < data_len && i < 1000; i++) {
2700 incoming_pixel_hash = (uint32_t)((uint64_t)incoming_pixel_hash * 31 + ((
unsigned char *)pixel_data)[i]);
2704 uint32_t client_id = atomic_load(&client->client_id);
2705 bool is_new_frame = (incoming_pixel_hash != client->last_received_frame_hash);
2708 uint32_t first_pixel_rgb = 0;
2709 if (data_len >= 3) {
2710 first_pixel_rgb = ((uint32_t)((
unsigned char *)pixel_data)[0] << 16) |
2711 ((uint32_t)((
unsigned char *)pixel_data)[1] << 8) | (uint32_t)((
unsigned char *)pixel_data)[2];
2715 log_info(
"RECV_FRAME #%u NEW: Client %u dimensions=%ux%u pixel_size=%zu hash=0x%08x first_rgb=0x%06x (prev=0x%08x)",
2716 client->frames_received, client_id, header->width, header->height, data_len, incoming_pixel_hash,
2717 first_pixel_rgb, client->last_received_frame_hash);
2718 client->last_received_frame_hash = incoming_pixel_hash;
2720 log_info(
"RECV_FRAME #%u DUP: Client %u dimensions=%ux%u pixel_size=%zu hash=0x%08x first_rgb=0x%06x",
2721 client->frames_received, client_id, header->width, header->height, data_len, incoming_pixel_hash,
2727 if (client->incoming_video_buffer) {
2729 log_info(
"STORE_FRAME: client_id=%u, frame_ptr=%p, frame->data=%p", atomic_load(&client->client_id), (
void *)frame,
2730 frame ? frame->data : NULL);
2731 if (frame && frame->data && data_len > 0) {
2733 uint32_t width_net = HOST_TO_NET_U32(header->width);
2734 uint32_t height_net = HOST_TO_NET_U32(header->height);
2735 size_t total_size =
sizeof(uint32_t) * 2 + data_len;
2737 log_info(
"STORE_FRAME_DATA: total_size=%zu, max_allowed=2097152, fits=%d", total_size,
2738 total_size <= 2 * 1024 * 1024);
2740 if (total_size <= 2 * 1024 * 1024) {
2741 memcpy(frame->data, &width_net,
sizeof(uint32_t));
2742 memcpy((
char *)frame->data +
sizeof(uint32_t), &height_net,
sizeof(uint32_t));
2743 memcpy((
char *)frame->data +
sizeof(uint32_t) * 2, pixel_data, data_len);
2744 frame->size = total_size;
2745 frame->width = header->width;
2746 frame->height = header->height;
2747 frame->capture_timestamp_ns = (uint64_t)time(NULL) * NS_PER_SEC_INT;
2748 frame->sequence_number = ++client->frames_received;
2750 log_info(
"FRAME_COMMITTED: client_id=%u, seq=%u, size=%zu hash=0x%08x", atomic_load(&client->client_id),
2751 frame->sequence_number, total_size, incoming_pixel_hash);
2753 log_warn(
"FRAME_TOO_LARGE: client_id=%u, size=%zu > max 2MB", atomic_load(&client->client_id), total_size);
2756 log_warn(
"STORE_FRAME_FAILED: frame_ptr=%p, frame->data=%p, data_len=%zu", (
void *)frame,
2757 frame ? frame->data : NULL, data_len);
2760 log_warn(
"NO_INCOMING_VIDEO_BUFFER: client_id=%u", atomic_load(&client->client_id));
2764 char cb_duration_str[32];
2765 format_duration_ns((
double)(callback_end_ns - callback_start_ns), cb_duration_str,
sizeof(cb_duration_str));
2766 log_info(
"[WS_TIMING] on_image_frame callback took %s (data_len=%zu)", cb_duration_str, data_len);
2769static void acip_server_on_audio(
const void *audio_data,
size_t audio_len,
void *client_ctx,
void *app_ctx) {
2771 client_info_t *client = (client_info_t *)client_ctx;
2775static void acip_server_on_audio_batch(
const audio_batch_packet_t *header,
const float *samples,
size_t num_samples,
2776 void *client_ctx,
void *app_ctx) {
2779 client_info_t *client = (client_info_t *)client_ctx;
2783 log_debug_every(LOG_RATE_DEFAULT,
"Received audio batch from client %u (samples=%zu, is_sending_audio=%d)",
2784 atomic_load(&client->client_id), num_samples, atomic_load(&client->is_sending_audio));
2786 if (!atomic_load(&client->is_sending_audio)) {
2787 log_debug(
"Ignoring audio batch - client %u not in audio streaming mode", client->client_id);
2791 if (client->incoming_audio_buffer) {
2792 asciichat_error_t write_result =
2794 if (write_result != ASCIICHAT_OK) {
2795 log_error(
"Failed to write decoded audio batch to buffer: %s", asciichat_error_string(write_result));
2800static void acip_server_on_audio_opus(
const void *opus_data,
size_t opus_len,
void *client_ctx,
void *app_ctx) {
2802 client_info_t *client = (client_info_t *)client_ctx;
2807 if (opus_len < 16) {
2808 log_warn(
"AUDIO_OPUS packet too small: %zu bytes", opus_len);
2812 const uint8_t *payload = (
const uint8_t *)opus_data;
2814 int sample_rate = (int)NET_TO_HOST_U32(read_u32_unaligned(payload));
2815 int frame_duration = (int)NET_TO_HOST_U32(read_u32_unaligned(payload + 4));
2817 size_t actual_opus_size = opus_len - 16;
2819 if (actual_opus_size > 0 && actual_opus_size <= 1024 && sample_rate == 48000 && frame_duration == 20) {
2821 uint8_t batch_buffer[1024 + 20];
2822 uint8_t *batch_ptr = batch_buffer;
2825 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)sample_rate));
2827 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32((uint32_t)frame_duration));
2829 write_u32_unaligned(batch_ptr, HOST_TO_NET_U32(1));
2831 memset(batch_ptr, 0, 4);
2835 write_u16_unaligned(batch_ptr, HOST_TO_NET_U16((uint16_t)actual_opus_size));
2839 memcpy(batch_ptr, payload + 16, actual_opus_size);
2840 batch_ptr += actual_opus_size;
2843 size_t batch_size = (size_t)(batch_ptr - batch_buffer);
2848static void acip_server_on_audio_opus_batch(
const void *batch_data,
size_t batch_len,
void *client_ctx,
void *app_ctx) {
2850 client_info_t *client = (client_info_t *)client_ctx;
2854static void acip_server_on_client_join(
const void *join_data,
size_t data_len,
void *client_ctx,
void *app_ctx) {
2856 client_info_t *client = (client_info_t *)client_ctx;
2860static void acip_server_on_client_leave(
void *client_ctx,
void *app_ctx) {
2862 client_info_t *client = (client_info_t *)client_ctx;
2866static void acip_server_on_stream_start(uint32_t stream_types,
void *client_ctx,
void *app_ctx) {
2868 client_info_t *client = (client_info_t *)client_ctx;
2871 uint32_t stream_types_net = HOST_TO_NET_U32(stream_types);
2875static void acip_server_on_stream_stop(uint32_t stream_types,
void *client_ctx,
void *app_ctx) {
2877 client_info_t *client = (client_info_t *)client_ctx;
2880 uint32_t stream_types_net = HOST_TO_NET_U32(stream_types);
2884static void acip_server_on_capabilities(
const void *cap_data,
size_t data_len,
void *client_ctx,
void *app_ctx) {
2886 client_info_t *client = (client_info_t *)client_ctx;
2890static void acip_server_on_ping(
void *client_ctx,
void *app_ctx) {
2892 client_info_t *client = (client_info_t *)client_ctx;
2896 mutex_lock(&client->send_mutex);
2897 if (atomic_load(&client->shutting_down) || !client->transport) {
2898 mutex_unlock(&client->send_mutex);
2901 acip_transport_t *pong_transport = client->transport;
2902 mutex_unlock(&client->send_mutex);
2907 if (pong_result != ASCIICHAT_OK) {
2908 SET_ERRNO(ERROR_NETWORK,
"Failed to send PONG response to client %u: %s", client->client_id,
2909 asciichat_error_string(pong_result));
2913static void acip_server_on_pong(
void *client_ctx,
void *app_ctx) {
2919static void acip_server_on_error(
const error_packet_t *header,
const char *message,
void *client_ctx,
void *app_ctx) {
2921 client_info_t *client = (client_info_t *)client_ctx;
2924 size_t msg_len = strlen(message);
2925 size_t total_len =
sizeof(*header) + msg_len;
2926 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2928 log_error(
"Failed to allocate buffer for ERROR_MESSAGE reconstruction");
2932 memcpy(full_packet, header,
sizeof(*header));
2933 memcpy(full_packet +
sizeof(*header), message, msg_len);
2935 handle_client_error_packet(client, full_packet, total_len);
2936 SAFE_FREE(full_packet);
2939static void acip_server_on_remote_log(
const remote_log_packet_t *header,
const char *message,
void *client_ctx,
2942 client_info_t *client = (client_info_t *)client_ctx;
2945 size_t msg_len = strlen(message);
2946 size_t total_len =
sizeof(*header) + msg_len;
2947 uint8_t *full_packet = SAFE_MALLOC(total_len, uint8_t *);
2949 log_error(
"Failed to allocate buffer for REMOTE_LOG reconstruction");
2953 memcpy(full_packet, header,
sizeof(*header));
2954 memcpy(full_packet +
sizeof(*header), message, msg_len);
2957 SAFE_FREE(full_packet);
2960static void acip_server_on_crypto_rekey_request(
const void *payload,
size_t payload_len,
void *client_ctx,
2963 client_info_t *client = (client_info_t *)client_ctx;
2965 log_debug(
"Received REKEY_REQUEST from client %u", client->client_id);
2968 mutex_lock(&client->client_state_mutex);
2969 asciichat_error_t crypto_result =
2971 mutex_unlock(&client->client_state_mutex);
2973 if (crypto_result != ASCIICHAT_OK) {
2974 log_error(
"Failed to process REKEY_REQUEST from client %u: %d", client->client_id, crypto_result);
2979 mutex_lock(&client->client_state_mutex);
2981 mutex_lock(&client->send_mutex);
2982 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
2983 mutex_unlock(&client->send_mutex);
2984 mutex_unlock(&client->client_state_mutex);
2987 socket_t rekey_socket = client->socket;
2988 mutex_unlock(&client->send_mutex);
2992 mutex_unlock(&client->client_state_mutex);
2994 if (crypto_result != ASCIICHAT_OK) {
2995 log_error(
"Failed to send REKEY_RESPONSE to client %u: %d", client->client_id, crypto_result);
2997 log_debug(
"Sent REKEY_RESPONSE to client %u", client->client_id);
3001static void acip_server_on_crypto_rekey_response(
const void *payload,
size_t payload_len,
void *client_ctx,
3004 client_info_t *client = (client_info_t *)client_ctx;
3006 log_debug(
"Received REKEY_RESPONSE from client %u", client->client_id);
3009 mutex_lock(&client->client_state_mutex);
3010 asciichat_error_t crypto_result =
3012 mutex_unlock(&client->client_state_mutex);
3014 if (crypto_result != ASCIICHAT_OK) {
3015 log_error(
"Failed to process REKEY_RESPONSE from client %u: %d", client->client_id, crypto_result);
3020 mutex_lock(&client->client_state_mutex);
3022 mutex_lock(&client->send_mutex);
3023 if (atomic_load(&client->shutting_down) || client->socket == INVALID_SOCKET_VALUE) {
3024 mutex_unlock(&client->send_mutex);
3025 mutex_unlock(&client->client_state_mutex);
3028 socket_t complete_socket = client->socket;
3029 mutex_unlock(&client->send_mutex);
3033 mutex_unlock(&client->client_state_mutex);
3035 if (crypto_result != ASCIICHAT_OK) {
3036 log_error(
"Failed to send REKEY_COMPLETE to client %u: %d", client->client_id, crypto_result);
3038 log_debug(
"Sent REKEY_COMPLETE to client %u - session rekeying complete", client->client_id);
3042static void acip_server_on_crypto_rekey_complete(
const void *payload,
size_t payload_len,
void *client_ctx,
3045 client_info_t *client = (client_info_t *)client_ctx;
3047 log_debug(
"Received REKEY_COMPLETE from client %u", client->client_id);
3050 mutex_lock(&client->client_state_mutex);
3051 asciichat_error_t crypto_result =
3053 mutex_unlock(&client->client_state_mutex);
3055 if (crypto_result != ASCIICHAT_OK) {
3056 log_error(
"Failed to process REKEY_COMPLETE from client %u: %d", client->client_id, crypto_result);
3058 log_debug(
"Session rekeying completed successfully with client %u", client->client_id);
3060 log_info_client(client,
"Session rekey complete - new encryption keys active");
3064static void acip_server_on_crypto_key_exchange_resp(packet_type_t type,
const void *payload,
size_t payload_len,
3065 void *client_ctx,
void *app_ctx) {
3067 client_info_t *client = (client_info_t *)client_ctx;
3069 log_debug(
"Received CRYPTO_KEY_EXCHANGE_RESP from client %u", client->client_id);
3073 type, payload, payload_len);
3075 if (result != ASCIICHAT_OK) {
3076 log_error(
"Crypto handshake auth challenge failed for client %u", client->client_id);
3080 if (client->crypto_handshake_ctx.state == CRYPTO_HANDSHAKE_READY) {
3082 log_info(
"Crypto handshake completed successfully for client %u (no authentication)", client->client_id);
3083 client->crypto_initialized =
true;
3084 client->transport->crypto_ctx = &client->crypto_handshake_ctx.crypto_ctx;
3087 log_debug(
"Sent AUTH_CHALLENGE to client %u", client->client_id);
3092static void acip_server_on_crypto_auth_response(packet_type_t type,
const void *payload,
size_t payload_len,
3093 void *client_ctx,
void *app_ctx) {
3095 client_info_t *client = (client_info_t *)client_ctx;
3097 log_debug(
"Received CRYPTO_AUTH_RESPONSE from client %u", client->client_id);
3100 asciichat_error_t result =
3103 if (result != ASCIICHAT_OK) {
3104 log_error(
"Crypto handshake complete failed for client %u", client->client_id);
3107 log_info(
"Crypto handshake completed successfully for client %u", client->client_id);
3108 log_error(
"[CRYPTO_SETUP] Setting crypto context for client %u: transport=%p, crypto_ctx=%p", client->client_id,
3109 (
void *)client->transport, (
void *)&client->crypto_handshake_ctx.crypto_ctx);
3110 client->crypto_initialized =
true;
3111 client->transport->crypto_ctx = &client->crypto_handshake_ctx.crypto_ctx;
3112 log_error(
"[CRYPTO_SETUP] Crypto context SET: transport->crypto_ctx=%p", (
void *)client->transport->crypto_ctx);
3116static void acip_server_on_crypto_no_encryption(packet_type_t type,
const void *payload,
size_t payload_len,
3117 void *client_ctx,
void *app_ctx) {
3122 client_info_t *client = (client_info_t *)client_ctx;
3124 log_error(
"Client %u sent NO_ENCRYPTION - encryption mode mismatch", client->client_id);
3138 log_debug(
"CLIENT: client_id=%u, data=%p, len=%zu", atomic_load(&client->client_id), data, len);
3150 int idx = client_dispatch_hash_lookup(g_client_dispatch_hash, type);
3151 if (type == 5000 || type == 3001) {
3152 log_error(
"DISPATCH_LOOKUP: type=%d, idx=%d (len=%zu)", type, idx, len);
3159 if (type == 5000 || type == 3001) {
3160 log_error(
"DISPATCH_HANDLER: type=%d, calling handler[%d]...", type, idx);
3162 g_client_dispatch_handlers[idx](client, data, len);
3163 if (type == 5000 || type == 3001) {
3164 log_error(
"DISPATCH_HANDLER: type=%d, handler returned", type);
void asciichat_errno_destroy(void)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Per-client state management and lifecycle orchestration.
asciichat_error_t crypto_handshake_rekey_complete(crypto_handshake_context_t *ctx, socket_t socket)
const crypto_context_t * crypto_handshake_get_context(const crypto_handshake_context_t *ctx)
void crypto_handshake_destroy(crypto_handshake_context_t *ctx)
asciichat_error_t crypto_handshake_process_rekey_request(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
bool crypto_handshake_is_ready(const crypto_handshake_context_t *ctx)
bool crypto_handshake_should_rekey(const crypto_handshake_context_t *ctx)
asciichat_error_t crypto_handshake_rekey_response(crypto_handshake_context_t *ctx, socket_t socket)
asciichat_error_t crypto_handshake_process_rekey_response(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
asciichat_error_t crypto_handshake_process_rekey_complete(crypto_handshake_context_t *ctx, const uint8_t *packet, size_t packet_len)
asciichat_error_t crypto_handshake_rekey_request(crypto_handshake_context_t *ctx, socket_t socket)
bool check_and_record_packet_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, socket_t client_socket, packet_type_t packet_type)
void handle_audio_opus_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_OPUS_BATCH packet - efficient Opus-encoded audio batch from client.
uint32_t session_host_add_client(session_host_t *host, socket_t socket, const char *ip, int port)
asciichat_error_t session_host_remove_client(session_host_t *host, uint32_t client_id)
asciichat_error_t audio_ring_buffer_write(audio_ring_buffer_t *rb, const float *data, int samples)
void audio_ring_buffer_destroy(audio_ring_buffer_t *rb)
audio_ring_buffer_t * audio_ring_buffer_create_for_capture(void)
const char * crypto_result_to_string(crypto_result_t result)
crypto_result_t crypto_decrypt(crypto_context_t *ctx, const uint8_t *ciphertext, size_t ciphertext_len, uint8_t *plaintext_out, size_t plaintext_out_size, size_t *plaintext_len_out)
asciichat_error_t crypto_handshake_server_complete(crypto_handshake_context_t *ctx, acip_transport_t *transport, packet_type_t packet_type, const uint8_t *payload, size_t payload_len)
asciichat_error_t crypto_handshake_server_auth_challenge(crypto_handshake_context_t *ctx, acip_transport_t *transport, packet_type_t packet_type, const uint8_t *payload, size_t payload_len)
asciichat_error_t acip_send_server_state(acip_transport_t *transport, const server_state_packet_t *state)
asciichat_error_t acip_server_receive_and_dispatch(acip_transport_t *transport, void *client_ctx, const acip_server_callbacks_t *callbacks)
asciichat_error_t acip_send_clear_console(acip_transport_t *transport)
asciichat_error_t acip_send_ascii_frame(acip_transport_t *transport, const char *frame_data, size_t frame_size, uint32_t width, uint32_t height, uint32_t client_id)
asciichat_error_t tcp_server_spawn_thread(tcp_server_t *server, socket_t client_socket, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
asciichat_error_t tcp_server_stop_client_threads(tcp_server_t *server, socket_t client_socket)
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
int mixer_add_source(mixer_t *mixer, uint32_t client_id, audio_ring_buffer_t *buffer)
void mixer_remove_source(mixer_t *mixer, uint32_t client_id)
asciichat_error_t acip_handle_server_packet(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, void *client_ctx, const acip_server_callbacks_t *callbacks)
asciichat_error_t set_socket_timeout(socket_t sockfd, uint64_t timeout_ns)
Set socket timeout.
asciichat_error_t set_socket_keepalive(socket_t sockfd)
Set socket keepalive.
const char * network_error_string()
Get human-readable error string for network errors.
void opus_codec_destroy(opus_codec_t *codec)
asciichat_error_t packet_parse_error_message(const void *data, size_t len, asciichat_error_t *out_error_code, char *message_buffer, size_t message_buffer_size, size_t *out_message_length)
packet_recv_result_t receive_packet_secure(socket_t sockfd, void *crypto_ctx, bool enforce_encryption, packet_envelope_t *envelope)
Receive a packet with decryption and decompression support.
void packet_queue_stop(packet_queue_t *queue)
packet_queue_t * packet_queue_create_with_pools(size_t max_size, size_t node_pool_size, bool use_buffer_pool)
void packet_queue_free_packet(queued_packet_t *packet)
int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
queued_packet_t * packet_queue_try_dequeue(packet_queue_t *queue)
void packet_queue_destroy(packet_queue_t *queue)
Per-client rendering threads with rate limiting.
asciichat_error_t acip_send_pong(acip_transport_t *transport)
asciichat_error_t acip_send_audio_opus_batch(acip_transport_t *transport, const void *opus_data, size_t opus_len, const uint16_t *frame_sizes, uint32_t frame_count, uint32_t sample_rate, uint32_t frame_duration)
asciichat_error_t acip_send_audio_batch(acip_transport_t *transport, const float *samples, uint32_t num_samples, uint32_t batch_count)
asciichat_error_t packet_send_via_transport(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, uint32_t client_id)
Send packet via transport with proper header (exported for generic wrappers)
Server cryptographic operations and per-client handshake management.
rate_limiter_t * g_rate_limiter
Global rate limiter for connection attempts and packet processing.
mixer_t *volatile g_audio_mixer
Global audio mixer instance for multi-client audio processing.
atomic_bool g_server_should_exit
Global atomic shutdown flag shared across all threads.
ascii-chat Server Mode Entry Point Header
void handle_client_join_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_JOIN packet - client announces identity and capabilities.
void handle_pong_packet(client_info_t *client, const void *data, size_t len)
Handle PONG packet - client acknowledged our PING.
void handle_protocol_version_packet(client_info_t *client, const void *data, size_t len)
Process PROTOCOL_VERSION packet - validate protocol compatibility.
void handle_image_frame_packet(client_info_t *client, void *data, size_t len)
Process IMAGE_FRAME packet - store client's video data for rendering.
void handle_audio_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_BATCH packet - store efficiently batched audio samples.
void handle_audio_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO packet - store single audio sample batch (legacy format)
void handle_client_leave_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_LEAVE packet - handle clean client disconnect.
void handle_stream_stop_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_STOP packet - client requests to halt media transmission.
int send_server_state_to_client(client_info_t *client)
Send current server state to a specific client.
void handle_client_capabilities_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_CAPABILITIES packet - configure client-specific rendering.
void handle_stream_start_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_START packet - client requests to begin media transmission.
void handle_remote_log_packet_from_client(client_info_t *client, const void *data, size_t len)
void disconnect_client_for_bad_data(client_info_t *client, const char *format,...)
void handle_ping_packet(client_info_t *client, const void *data, size_t len)
Handle PING packet - respond with PONG.
Server packet processing and protocol implementation.
int start_webrtc_client_threads(server_context_t *server_ctx, uint32_t client_id)
Start threads for a WebRTC client after crypto initialization.
void * client_send_thread_func(void *arg)
Client packet send thread.
client_info_t * find_client_by_id(uint32_t client_id)
Fast O(1) client lookup by ID using hash table.
#define CLIENT_DISPATCH_HANDLER_COUNT
int add_client(server_context_t *server_ctx, socket_t socket, const char *client_ip, int port)
void cleanup_client_media_buffers(client_info_t *client)
void process_decrypted_packet(client_info_t *client, packet_type_t type, void *data, size_t len)
void cleanup_client_packet_queues(client_info_t *client)
rwlock_t g_client_manager_rwlock
Reader-writer lock protecting the global client manager.
void * client_receive_thread(void *arg)
int remove_client(server_context_t *server_ctx, uint32_t client_id)
void stop_client_threads(client_info_t *client)
void(* client_packet_handler_t)(client_info_t *client, const void *data, size_t len)
int add_webrtc_client(server_context_t *server_ctx, acip_transport_t *transport, const char *client_ip, bool start_threads)
Register a WebRTC client with the server.
client_info_t * find_client_by_socket(socket_t socket)
Find client by socket descriptor using linear search.
int process_encrypted_packet(client_info_t *client, packet_type_t *type, void **data, size_t *len, uint32_t *sender_id)
void * client_dispatch_thread(void *arg)
Async dispatch thread for WebRTC clients.
client_manager_t g_client_manager
Global client manager singleton - central coordination point.
#define CLIENT_DISPATCH_HASH(type)
void broadcast_server_state_to_all_clients(void)
Notify all clients of state changes.
#define CLIENT_DISPATCH_HASH_SIZE
int crypto_server_decrypt_packet(uint32_t client_id, const uint8_t *ciphertext, size_t ciphertext_len, uint8_t *plaintext, size_t plaintext_size, size_t *plaintext_len)
const crypto_context_t * crypto_server_get_context(uint32_t client_id)
int server_crypto_init(void)
bool crypto_server_is_ready(uint32_t client_id)
int server_crypto_handshake(client_info_t *client)
int create_client_render_threads(server_context_t *server_ctx, client_info_t *client)
Create and initialize per-client rendering threads.
Multi-client video mixing and ASCII frame generation.
Hash table entry for client packet dispatch.
uint8_t handler_idx
Handler index (0-based)
packet_type_t key
Packet type (0 = empty slot)
Global client manager structure for server-side client coordination.
_Atomic uint32_t next_client_id
Monotonic counter for unique client IDs (atomic for thread-safety)
client_info_t * clients_by_id
uthash head pointer for O(1) client_id -> client_info_t* lookups
client_info_t clients[MAX_CLIENTS]
Array of client_info_t structures (backing storage)
int client_count
Current number of active clients.
Server context - encapsulates all server state.
tcp_server_t * tcp_server
TCP server managing connections.
session_host_t * session_host
Session host for discovery mode support.
int safe_snprintf(char *buffer, size_t buffer_size, const char *format,...)
Safe formatted string printing to buffer.
acip_transport_t * acip_tcp_transport_create(socket_t sockfd, crypto_context_t *crypto_ctx)
void acip_transport_destroy(acip_transport_t *transport)
int mutex_init(mutex_t *mutex)
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
asciichat_thread_t asciichat_thread_self(void)
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
int asciichat_thread_equal(asciichat_thread_t t1, asciichat_thread_t t2)
int mutex_destroy(mutex_t *mutex)
uint64_t time_get_ns(void)
int format_duration_ns(double nanoseconds, char *buffer, size_t buffer_size)
uint64_t time_elapsed_ns(uint64_t start_ns, uint64_t end_ns)
void video_frame_buffer_destroy(video_frame_buffer_t *vfb)
video_frame_buffer_t * video_frame_buffer_create(uint32_t client_id)
video_frame_t * video_frame_begin_write(video_frame_buffer_t *vfb)
void video_frame_commit(video_frame_buffer_t *vfb)
const video_frame_t * video_frame_get_latest(video_frame_buffer_t *vfb)