114#include <stdatomic.h>
122#include <ascii-chat/common.h>
123#include <ascii-chat/common/buffer_sizes.h>
124#include <ascii-chat/util/endian.h>
125#include <ascii-chat/util/validation.h>
126#include <ascii-chat/util/endian.h>
127#include <ascii-chat/util/bytes.h>
128#include <ascii-chat/util/image.h>
129#include <ascii-chat/util/time.h>
130#include <ascii-chat/video/video_frame.h>
131#include <ascii-chat/audio/audio.h>
132#include <ascii-chat/video/palette.h>
133#include <ascii-chat/video/image.h>
134#include <ascii-chat/network/compression.h>
135#include <ascii-chat/network/packet_parsing.h>
136#include <ascii-chat/network/frame_validator.h>
137#include <ascii-chat/network/acip/send.h>
138#include <ascii-chat/network/acip/server.h>
139#include <ascii-chat/util/format.h>
140#include <ascii-chat/platform/system.h>
141#include <ascii-chat/audio/opus_codec.h>
142#include <ascii-chat/network/packet_parsing.h>
143#include <ascii-chat/network/logging.h>
144#include <ascii-chat/crypto/handshake/common.h>
146static void protocol_cleanup_thread_locals(
void) {
157 protocol_cleanup_thread_locals();
159 bool already_requested = atomic_exchange(&client->protocol_disconnect_requested,
true);
160 if (already_requested) {
164 char reason[BUFFER_SIZE_SMALL] = {0};
167 va_start(
args, format);
171 SAFE_STRNCPY(reason,
"Protocol violation",
sizeof(reason));
174 const char *reason_str = reason[0] !=
'\0' ? reason :
"Protocol violation";
175 uint32_t client_id = atomic_load(&client->client_id);
177 socket_t socket_snapshot = INVALID_SOCKET_VALUE;
178 const crypto_context_t *crypto_ctx = NULL;
179 acip_transport_t *transport_snapshot = NULL;
181 mutex_lock(&client->client_state_mutex);
182 if (client->socket != INVALID_SOCKET_VALUE) {
183 socket_snapshot = client->socket;
184 if (client->crypto_initialized) {
188 mutex_unlock(&client->client_state_mutex);
191 mutex_lock(&client->send_mutex);
192 transport_snapshot = client->transport;
193 mutex_unlock(&client->send_mutex);
198 log_warn(
"Disconnecting client %u due to protocol violation: %s", client_id, reason_str);
200 if (socket_snapshot != INVALID_SOCKET_VALUE) {
203 mutex_lock(&client->send_mutex);
205 asciichat_error_t log_result =
206 log_network_message(socket_snapshot, (
const struct crypto_context_t *)crypto_ctx, LOG_ERROR,
207 REMOTE_LOG_DIRECTION_SERVER_TO_CLIENT,
"Protocol violation: %s", reason_str);
208 if (log_result != ASCIICHAT_OK) {
209 log_warn(
"Failed to send remote log to client %u: %s", client_id, asciichat_error_string(log_result));
212 asciichat_error_t send_result =
packet_send_error(socket_snapshot, crypto_ctx, ERROR_NETWORK_PROTOCOL, reason_str);
213 if (send_result != ASCIICHAT_OK) {
214 log_warn(
"Failed to send error packet to client %u: %s", client_id, asciichat_error_string(send_result));
217 mutex_unlock(&client->send_mutex);
218 }
else if (transport_snapshot) {
220 log_debug(
"Sending error to WebSocket client %u via transport", client_id);
221 acip_send_error(transport_snapshot, ERROR_NETWORK_PROTOCOL, reason_str);
226 log_debug(
"Setting active=false in disconnect_client_for_bad_data (client_id=%u, reason=%s)", client_id, reason_str);
227 atomic_store(&client->active,
false);
228 atomic_store(&client->shutting_down,
true);
229 atomic_store(&client->send_thread_running,
false);
230 atomic_store(&client->video_render_thread_running,
false);
231 atomic_store(&client->audio_render_thread_running,
false);
233 if (client->audio_queue) {
237 mutex_lock(&client->client_state_mutex);
238 if (client->socket != INVALID_SOCKET_VALUE) {
239 socket_shutdown(client->socket, 2);
240 socket_close(client->socket);
241 client->socket = INVALID_SOCKET_VALUE;
243 mutex_unlock(&client->client_state_mutex);
289 VALIDATE_PACKET_SIZE(client, data, len,
sizeof(client_info_packet_t),
"CLIENT_JOIN");
291 const client_info_packet_t *join_info = (
const client_info_packet_t *)data;
294 if (join_info->display_name[0] ==
'\0') {
299 uint32_t capabilities = NET_TO_HOST_U32(join_info->capabilities);
302 const uint32_t VALID_CAP_MASK = CLIENT_CAP_VIDEO | CLIENT_CAP_AUDIO | CLIENT_CAP_COLOR | CLIENT_CAP_STRETCH;
303 VALIDATE_CAPABILITY_FLAGS(client, capabilities, VALID_CAP_MASK,
"CLIENT_JOIN");
306 VALIDATE_FLAGS_MASK(client, capabilities, VALID_CAP_MASK,
"CLIENT_JOIN");
308 SAFE_STRNCPY(client->display_name, join_info->display_name, MAX_DISPLAY_NAME_LEN - 1);
310 client->can_send_video = (capabilities & CLIENT_CAP_VIDEO) != 0;
311 client->can_send_audio = (capabilities & CLIENT_CAP_AUDIO) != 0;
312 client->wants_stretch = (capabilities & CLIENT_CAP_STRETCH) != 0;
314 log_info(
"Client %u joined: %s (video=%d, audio=%d, stretch=%d)", atomic_load(&client->client_id),
315 client->display_name, client->can_send_video, client->can_send_audio, client->wants_stretch);
318 if (client->socket != INVALID_SOCKET_VALUE) {
319 log_info_client(client,
"Joined as '%s' (video=%s, audio=%s)", client->display_name,
320 client->can_send_video ?
"yes" :
"no", client->can_send_audio ?
"yes" :
"no");
361 if (len !=
sizeof(protocol_version_packet_t)) {
363 sizeof(protocol_version_packet_t));
367 const protocol_version_packet_t *version = (
const protocol_version_packet_t *)data;
368 uint16_t client_major = NET_TO_HOST_U16(version->protocol_version);
369 uint16_t client_minor = NET_TO_HOST_U16(version->protocol_revision);
372 if (client_major != PROTOCOL_VERSION_MAJOR) {
373 log_warn(
"Client %u protocol version mismatch: client=%u.%u, server=%u.%u", atomic_load(&client->client_id),
374 client_major, client_minor, PROTOCOL_VERSION_MAJOR, PROTOCOL_VERSION_MINOR);
377 }
else if (client_minor != PROTOCOL_VERSION_MINOR) {
378 log_info(
"Client %u has different protocol revision: client=%u.%u, server=%u.%u", atomic_load(&client->client_id),
379 client_major, client_minor, PROTOCOL_VERSION_MAJOR, PROTOCOL_VERSION_MINOR);
383 for (
size_t i = 0; i <
sizeof(version->reserved); i++) {
384 if (version->reserved[i] != 0) {
385 log_warn(
"Client %u sent non-zero reserved bytes in PROTOCOL_VERSION packet", atomic_load(&client->client_id));
392 if (ACIP_CRYPTO_HAS_ENCRYPT(version->supports_encryption)) {
393 log_debug(
"Client %u supports encryption", atomic_load(&client->client_id));
395 if (version->compression_algorithms != 0) {
396 log_debug(
"Client %u supports compression: 0x%02x", atomic_load(&client->client_id),
397 version->compression_algorithms);
399 if (version->feature_flags != 0) {
400 uint16_t feature_flags = NET_TO_HOST_U16(version->feature_flags);
401 log_debug(
"Client %u supports features: 0x%04x", atomic_load(&client->client_id), feature_flags);
437 uint32_t client_id = atomic_load(&client->client_id);
441 log_info(
"Client %u sent leave notification (no reason)", client_id);
442 }
else if (len <= 256) {
445 SET_ERRNO(ERROR_INVALID_STATE,
"Client %u sent leave notification with non-zero length but NULL data", client_id);
449 char reason[257] = {0};
450 memcpy(reason, data, len);
454 bool all_printable =
true;
455 for (
size_t i = 0; i < len; i++) {
456 uint8_t c = (uint8_t)reason[i];
457 if (c < 32 && c !=
'\t' && c !=
'\n') {
458 all_printable =
false;
464 log_info(
"Client %u sent leave notification: %s", client_id, reason);
466 log_info(
"Client %u sent leave notification (reason contains non-printable characters)", client_id);
470 log_warn(
"Client %u sent oversized leave reason (%zu bytes, max 256)", client_id, len);
475 log_debug(
"Setting active=false in handle_client_leave_packet (client_id=%u)", client_id);
476 atomic_store(&client->active,
false);
519 VALIDATE_PACKET_SIZE(client, data, len,
sizeof(uint32_t),
"STREAM_START");
521 uint32_t stream_type_net;
522 memcpy(&stream_type_net, data,
sizeof(uint32_t));
523 uint32_t stream_type = NET_TO_HOST_U32(stream_type_net);
526 const uint32_t VALID_STREAM_MASK = STREAM_TYPE_VIDEO | STREAM_TYPE_AUDIO;
527 VALIDATE_CAPABILITY_FLAGS(client, stream_type, VALID_STREAM_MASK,
"STREAM_START");
530 VALIDATE_FLAGS_MASK(client, stream_type, VALID_STREAM_MASK,
"STREAM_START");
532 if (stream_type & STREAM_TYPE_VIDEO) {
533 atomic_store(&client->is_sending_video,
true);
535 if (stream_type & STREAM_TYPE_AUDIO) {
536 atomic_store(&client->is_sending_audio,
true);
539 if (!client->opus_decoder) {
541 if (client->opus_decoder) {
542 log_info(
"Client %u: Opus decoder created (48kHz)", atomic_load(&client->client_id));
544 log_error(
"Client %u: Failed to create Opus decoder", atomic_load(&client->client_id));
549 if (stream_type & STREAM_TYPE_VIDEO) {
550 log_info(
"Client %u announced video stream (waiting for first frame)", atomic_load(&client->client_id));
552 if (stream_type & STREAM_TYPE_AUDIO) {
553 log_info(
"Client %u started audio stream", atomic_load(&client->client_id));
557 const char *streams = (stream_type & STREAM_TYPE_VIDEO) && (stream_type & STREAM_TYPE_AUDIO)
559 : ((stream_type & STREAM_TYPE_VIDEO) ?
"video" :
"audio");
561 if (client->socket != INVALID_SOCKET_VALUE) {
562 log_info_client(client,
"Stream started: %s", streams);
602 VALIDATE_PACKET_SIZE(client, data, len,
sizeof(uint32_t),
"STREAM_STOP");
604 uint32_t stream_type_net;
605 memcpy(&stream_type_net, data,
sizeof(uint32_t));
606 uint32_t stream_type = NET_TO_HOST_U32(stream_type_net);
609 const uint32_t VALID_STREAM_MASK = STREAM_TYPE_VIDEO | STREAM_TYPE_AUDIO;
610 VALIDATE_CAPABILITY_FLAGS(client, stream_type, VALID_STREAM_MASK,
"STREAM_STOP");
613 VALIDATE_FLAGS_MASK(client, stream_type, VALID_STREAM_MASK,
"STREAM_STOP");
615 if (stream_type & STREAM_TYPE_VIDEO) {
616 atomic_store(&client->is_sending_video,
false);
618 if (stream_type & STREAM_TYPE_AUDIO) {
619 atomic_store(&client->is_sending_audio,
false);
622 if (stream_type & STREAM_TYPE_VIDEO) {
623 log_info(
"Client %u stopped video stream", atomic_load(&client->client_id));
625 if (stream_type & STREAM_TYPE_AUDIO) {
626 log_info(
"Client %u stopped audio stream", atomic_load(&client->client_id));
630 const char *streams = (stream_type & STREAM_TYPE_VIDEO) && (stream_type & STREAM_TYPE_AUDIO)
632 : ((stream_type & STREAM_TYPE_VIDEO) ?
"video" :
"audio");
634 if (client->socket != INVALID_SOCKET_VALUE) {
635 log_info_client(client,
"Stream stopped: %s", streams);
647 mutex_lock(&client->send_mutex);
648 if (atomic_load(&client->shutting_down) || !client->transport) {
649 mutex_unlock(&client->send_mutex);
652 acip_transport_t *pong_transport = client->transport;
653 mutex_unlock(&client->send_mutex);
657 if (pong_result != ASCIICHAT_OK) {
658 SET_ERRNO(ERROR_NETWORK,
"Failed to send PONG response to client %u: %s", atomic_load(&client->client_id),
659 asciichat_error_string(pong_result));
738 log_info(
"RECV_IMAGE_FRAME: client_id=%u, len=%zu", atomic_load(&client->client_id), len);
740 if (!data || len <
sizeof(uint32_t) * 2) {
744 bool was_sending_video = atomic_load(&client->is_sending_video);
745 if (!was_sending_video) {
748 if (atomic_compare_exchange_strong(&client->is_sending_video, &was_sending_video,
true)) {
749 log_info(
"Client %u auto-enabled video stream (received IMAGE_FRAME)", atomic_load(&client->client_id));
751 if (client->socket != INVALID_SOCKET_VALUE) {
752 log_info_client(client,
"First video frame received - streaming active");
758 mutex_lock(&client->client_state_mutex);
759 client->frames_received_logged++;
760 if (client->frames_received_logged % 25000 == 0) {
763 log_debug(
"Client %u has sent %u IMAGE_FRAME packets (%s)", atomic_load(&client->client_id),
764 client->frames_received_logged, pretty);
766 mutex_unlock(&client->client_state_mutex);
770 uint32_t img_width_net, img_height_net;
771 memcpy(&img_width_net, data,
sizeof(uint32_t));
772 memcpy(&img_height_net, (
char *)data +
sizeof(uint32_t),
sizeof(uint32_t));
773 uint32_t img_width = NET_TO_HOST_U32(img_width_net);
774 uint32_t img_height = NET_TO_HOST_U32(img_height_net);
776 log_debug(
"IMAGE_FRAME packet: width=%u, height=%u, payload_len=%zu", img_width, img_height, len);
780 log_error(
"IMAGE_FRAME validation failed for dimensions: %u x %u", img_width, img_height);
787 if (
image_calc_rgb_size((
size_t)img_width, (
size_t)img_height, &rgb_size) != ASCIICHAT_OK) {
799 if (rgb_size > SIZE_MAX - FRAME_HEADER_SIZE_LEGACY) {
805 size_t expected_size = FRAME_HEADER_SIZE_LEGACY + rgb_size;
807 if (len != expected_size) {
814 if (validate_result != ASCIICHAT_OK) {
819 void *rgb_data = (
char *)data + FRAME_HEADER_SIZE_LEGACY;
820 size_t rgb_data_size = rgb_size;
821 bool needs_free =
false;
823 if (client->incoming_video_buffer) {
827 if (frame && frame->data) {
831 if (overflow_check != ASCIICHAT_OK) {
832 if (needs_free && rgb_data) {
838 size_t old_packet_size = FRAME_HEADER_SIZE_LEGACY + rgb_data_size;
840 if (old_packet_size <= MAX_FRAME_BUFFER_SIZE) {
841 uint32_t width_net = HOST_TO_NET_U32(img_width);
842 uint32_t height_net = HOST_TO_NET_U32(img_height);
845 memcpy(frame->data, &width_net,
sizeof(uint32_t));
846 memcpy((
char *)frame->data +
sizeof(uint32_t), &height_net,
sizeof(uint32_t));
847 memcpy((
char *)frame->data +
sizeof(uint32_t) * 2, rgb_data, rgb_data_size);
849 frame->size = old_packet_size;
850 frame->width = img_width;
851 frame->height = img_height;
852 frame->capture_timestamp_ns = (uint64_t)time(NULL) * NS_PER_SEC_INT;
853 frame->sequence_number = ++client->frames_received;
856 uint32_t incoming_rgb_hash = 0;
857 for (
size_t i = 0; i < rgb_data_size && i < 1000; i++) {
858 incoming_rgb_hash = (uint32_t)((uint64_t)incoming_rgb_hash * 31 + ((
unsigned char *)rgb_data)[i]);
862 uint32_t client_id = atomic_load(&client->client_id);
863 bool is_new_frame = (incoming_rgb_hash != client->last_received_frame_hash);
866 log_info(
"RECV_FRAME #%u NEW: Client %u size=%zu dims=%ux%u hash=0x%08x (prev=0x%08x)",
867 client->frames_received, client_id, rgb_data_size, img_width, img_height, incoming_rgb_hash,
868 client->last_received_frame_hash);
869 client->last_received_frame_hash = incoming_rgb_hash;
871 log_info(
"RECV_FRAME #%u DUP: Client %u size=%zu dims=%ux%u hash=0x%08x", client->frames_received, client_id,
872 rgb_data_size, img_width, img_height, incoming_rgb_hash);
877 if (needs_free && rgb_data) {
884 log_warn(
"Failed to get write buffer for client %u (frame=%p, frame->data=%p)", atomic_load(&client->client_id),
885 (
void *)frame, frame ? frame->data : NULL);
890 SET_ERRNO(ERROR_INVALID_STATE,
"Client %u has no incoming video buffer!", atomic_load(&client->client_id));
892 log_debug(
"Client %u: ignoring video packet during shutdown", atomic_load(&client->client_id));
897 if (needs_free && rgb_data) {
944 VALIDATE_NOTNULL_DATA(client, data,
"AUDIO");
945 VALIDATE_AUDIO_ALIGNMENT(client, len,
sizeof(
float),
"AUDIO");
946 VALIDATE_AUDIO_STREAM_ENABLED(client,
"AUDIO");
948 int num_samples = (int)(len /
sizeof(
float));
949 VALIDATE_AUDIO_SAMPLE_COUNT(client, num_samples, AUDIO_SAMPLES_PER_PACKET,
"AUDIO");
950 VALIDATE_RESOURCE_INITIALIZED(client, client->incoming_audio_buffer,
"audio buffer");
952 const float *samples = (
const float *)data;
954 if (result != ASCIICHAT_OK) {
955 log_error(
"Failed to write audio samples to buffer: %s", asciichat_error_string(result));
964 log_level_t remote_level = LOG_INFO;
965 remote_log_direction_t direction = REMOTE_LOG_DIRECTION_UNKNOWN;
967 char message[MAX_REMOTE_LOG_MESSAGE_LENGTH + 1] = {0};
969 asciichat_error_t parse_result =
971 if (parse_result != ASCIICHAT_OK) {
976 if (direction != REMOTE_LOG_DIRECTION_CLIENT_TO_SERVER) {
981 const bool truncated = (flags & REMOTE_LOG_FLAG_TRUNCATED) != 0;
982 const char *display_name = client->display_name[0] ? client->display_name :
"(unnamed)";
983 uint32_t client_id = atomic_load(&client->client_id);
986 log_msg(remote_level, __FILE__, __LINE__, __func__,
"[REMOTE CLIENT %u \"%s\"] %s [message truncated]", client_id,
987 display_name, message);
989 log_msg(remote_level, __FILE__, __LINE__, __func__,
"[REMOTE CLIENT %u \"%s\"] %s", client_id, display_name,
1042 log_debug_every(LOG_RATE_DEFAULT,
"Received audio batch packet from client %u (len=%zu, is_sending_audio=%d)",
1043 atomic_load(&client->client_id), len, atomic_load(&client->is_sending_audio));
1045 VALIDATE_NOTNULL_DATA(client, data,
"AUDIO_BATCH");
1046 VALIDATE_MIN_SIZE(client, len,
sizeof(audio_batch_packet_t),
"AUDIO_BATCH");
1047 VALIDATE_AUDIO_STREAM_ENABLED(client,
"AUDIO_BATCH");
1052 if (parse_result != ASCIICHAT_OK) {
1057 uint32_t packet_batch_count = batch_info.
batch_count;
1061 (void)packet_batch_count;
1064 VALIDATE_NONZERO(client, packet_batch_count,
"batch_count",
"AUDIO_BATCH");
1065 VALIDATE_NONZERO(client, total_samples,
"total_samples",
"AUDIO_BATCH");
1067 size_t samples_bytes = 0;
1068 if (safe_size_mul(total_samples,
sizeof(uint32_t), &samples_bytes)) {
1073 size_t expected_size =
sizeof(audio_batch_packet_t) + samples_bytes;
1074 if (len != expected_size) {
1082 const uint32_t MAX_AUDIO_SAMPLES = AUDIO_BATCH_SAMPLES * 2;
1083 if (total_samples > MAX_AUDIO_SAMPLES) {
1089 const uint8_t *samples_ptr = (
const uint8_t *)data +
sizeof(audio_batch_packet_t);
1092 size_t alloc_size = (size_t)total_samples *
sizeof(
float);
1093 float *samples = SAFE_MALLOC(alloc_size,
float *);
1095 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate memory for audio sample conversion");
1101 if (dq_result != ASCIICHAT_OK) {
1107 static int recv_count = 0;
1109 if (recv_count % 100 == 0) {
1110 uint32_t raw0 = bytes_read_u32_unaligned(samples_ptr + 0 *
sizeof(uint32_t));
1111 uint32_t raw1 = bytes_read_u32_unaligned(samples_ptr + 1 *
sizeof(uint32_t));
1112 uint32_t raw2 = bytes_read_u32_unaligned(samples_ptr + 2 *
sizeof(uint32_t));
1113 int32_t scaled0 = (int32_t)NET_TO_HOST_U32(raw0);
1114 int32_t scaled1 = (int32_t)NET_TO_HOST_U32(raw1);
1115 int32_t scaled2 = (int32_t)NET_TO_HOST_U32(raw2);
1116 log_info(
"RECV: network[0]=0x%08x, network[1]=0x%08x, network[2]=0x%08x", raw0, raw1, raw2);
1117 log_info(
"RECV: scaled[0]=%d, scaled[1]=%d, scaled[2]=%d", scaled0, scaled1, scaled2);
1118 log_info(
"RECV: samples[0]=%.6f, samples[1]=%.6f, samples[2]=%.6f", samples[0], samples[1], samples[2]);
1122 if (client->incoming_audio_buffer) {
1123 asciichat_error_t write_result =
audio_ring_buffer_write(client->incoming_audio_buffer, samples, total_samples);
1124 if (write_result != ASCIICHAT_OK) {
1125 log_error(
"Failed to write decoded audio batch to buffer: %s", asciichat_error_string(write_result));
1168 log_debug_every(LOG_RATE_SLOW,
"Received Opus audio batch from client %u (len=%zu)", atomic_load(&client->client_id),
1171 VALIDATE_NOTNULL_DATA(client, data,
"AUDIO_OPUS_BATCH");
1172 VALIDATE_AUDIO_STREAM_ENABLED(client,
"AUDIO_OPUS_BATCH");
1173 VALIDATE_RESOURCE_INITIALIZED(client, client->opus_decoder,
"Opus decoder");
1176 const uint8_t *opus_data = NULL;
1177 size_t opus_size = 0;
1178 const uint16_t *frame_sizes = NULL;
1179 int sample_rate = 0;
1180 int frame_duration = 0;
1181 int frame_count = 0;
1183 asciichat_error_t result =
packet_parse_opus_batch(data, len, &opus_data, &opus_size, &frame_sizes, &sample_rate,
1184 &frame_duration, &frame_count);
1186 if (result != ASCIICHAT_OK) {
1191 VALIDATE_NONZERO(client, frame_count,
"frame_count",
"AUDIO_OPUS_BATCH");
1192 VALIDATE_NONZERO(client, opus_size,
"opus_size",
"AUDIO_OPUS_BATCH");
1195 int samples_per_frame = (sample_rate * frame_duration) / 1000;
1196 VALIDATE_RANGE(client, samples_per_frame, 1, 4096,
"samples_per_frame",
"AUDIO_OPUS_BATCH");
1201#define OPUS_DECODE_STATIC_MAX_SAMPLES (32 * 960)
1204 size_t total_samples = (size_t)samples_per_frame * (
size_t)frame_count;
1205 float *decoded_samples;
1206 bool used_malloc =
false;
1209 decoded_samples = static_decode_buffer;
1212 log_warn(
"Client %u: Large audio batch requires malloc (%zu samples)", atomic_load(&client->client_id),
1214 decoded_samples = SAFE_MALLOC(total_samples *
sizeof(
float),
float *);
1215 if (!decoded_samples) {
1216 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate buffer for Opus decoded samples");
1223 int total_decoded = 0;
1224 size_t opus_offset = 0;
1226 for (
int i = 0; i < frame_count; i++) {
1228 size_t frame_size = (size_t)NET_TO_HOST_U16(frame_sizes[i]);
1231 if (frame_size > 0) {
1232 log_debug_every(LOG_RATE_DEFAULT,
"Client %u: Opus frame %d: size=%zu, first_bytes=[0x%02x,0x%02x,0x%02x,0x%02x]",
1233 atomic_load(&client->client_id), i, frame_size, opus_data[opus_offset] & 0xFF,
1234 frame_size > 1 ? (opus_data[opus_offset + 1] & 0xFF) : 0,
1235 frame_size > 2 ? (opus_data[opus_offset + 2] & 0xFF) : 0,
1236 frame_size > 3 ? (opus_data[opus_offset + 3] & 0xFF) : 0);
1239 if (opus_offset + frame_size > opus_size) {
1240 log_error(
"Client %u: Frame %d size overflow (offset=%zu, frame_size=%zu, total=%zu)",
1241 atomic_load(&client->client_id), i + 1, opus_offset, frame_size, opus_size);
1243 SAFE_FREE(decoded_samples);
1250 if ((
size_t)total_decoded + (
size_t)samples_per_frame > total_samples) {
1251 log_error(
"Client %u: Opus decode would overflow buffer (decoded=%d, frame_samples=%d, max=%zu)",
1252 atomic_load(&client->client_id), total_decoded, samples_per_frame, total_samples);
1254 SAFE_FREE(decoded_samples);
1259 int decoded_count =
opus_codec_decode((opus_codec_t *)client->opus_decoder, &opus_data[opus_offset], frame_size,
1260 &decoded_samples[total_decoded], samples_per_frame);
1262 if (decoded_count < 0) {
1263 log_error(
"Client %u: Opus decoding failed for frame %d/%d (size=%zu)", atomic_load(&client->client_id), i + 1,
1264 frame_count, frame_size);
1266 SAFE_FREE(decoded_samples);
1271 total_decoded += decoded_count;
1272 opus_offset += frame_size;
1275 log_debug_every(LOG_RATE_DEFAULT,
"Client %u: Decoded %d Opus frames -> %d samples", atomic_load(&client->client_id),
1276 frame_count, total_decoded);
1279 static int server_decode_count = 0;
1280 server_decode_count++;
1281 if (total_decoded > 0 && (server_decode_count <= 10 || server_decode_count % 100 == 0)) {
1282 float peak = 0.0f, rms = 0.0f;
1283 for (
int i = 0; i < total_decoded && i < 100; i++) {
1284 float abs_val = fabsf(decoded_samples[i]);
1287 rms += decoded_samples[i] * decoded_samples[i];
1289 rms = sqrtf(rms / (total_decoded > 100 ? 100 : total_decoded));
1291 log_info(
"SERVER OPUS DECODE #%d from client %u: decoded_rms=%.6f, opus_first4=[0x%02x,0x%02x,0x%02x,0x%02x]",
1292 server_decode_count, atomic_load(&client->client_id), rms, opus_size > 0 ? opus_data[0] : 0,
1293 opus_size > 1 ? opus_data[1] : 0, opus_size > 2 ? opus_data[2] : 0, opus_size > 3 ? opus_data[3] : 0);
1299 if (client->incoming_audio_buffer && total_decoded > 0) {
1300 asciichat_error_t result =
audio_ring_buffer_write(client->incoming_audio_buffer, decoded_samples, total_decoded);
1301 if (result != ASCIICHAT_OK) {
1302 log_error(
"Client %u: Failed to write decoded audio to buffer: %d", atomic_load(&client->client_id), result);
1307 SAFE_FREE(decoded_samples);
1330 log_debug_every(LOG_RATE_DEFAULT,
"Received Opus audio from client %u (len=%zu)", atomic_load(&client->client_id),
1333 if (VALIDATE_PACKET_NOT_NULL(client, data,
"AUDIO_OPUS")) {
1343 if (!atomic_load(&client->is_sending_audio)) {
1348 if (!client->opus_decoder) {
1354 const uint8_t *buf = (
const uint8_t *)data;
1355 uint32_t sample_rate_net, frame_duration_net;
1356 memcpy(&sample_rate_net, buf, 4);
1357 memcpy(&frame_duration_net, buf + 4, 4);
1358 uint32_t sample_rate = NET_TO_HOST_U32(sample_rate_net);
1359 uint32_t frame_duration = NET_TO_HOST_U32(frame_duration_net);
1362 const uint8_t *opus_data = buf + 16;
1363 size_t opus_size = len - 16;
1366 if (sample_rate == 0 || sample_rate > 192000) {
1371 if (frame_duration == 0 || frame_duration > 120) {
1377 int samples_per_frame = (int)((sample_rate * frame_duration) / 1000);
1378 if (samples_per_frame <= 0 || samples_per_frame > 5760) {
1384 float decoded_samples[5760];
1386 opus_codec_decode((opus_codec_t *)client->opus_decoder, opus_data, opus_size, decoded_samples, samples_per_frame);
1388 if (decoded_count < 0) {
1389 log_error(
"Client %u: Opus decoding failed (size=%zu)", atomic_load(&client->client_id), opus_size);
1393 log_debug_every(LOG_RATE_VERY_FAST,
"Client %u: Decoded Opus frame -> %d samples", atomic_load(&client->client_id),
1397 if (client->incoming_audio_buffer && decoded_count > 0) {
1398 asciichat_error_t write_result =
1400 if (write_result != ASCIICHAT_OK) {
1401 log_error(
"Failed to write decoded Opus samples to buffer: %s", asciichat_error_string(write_result));
1476 log_debug(
"CLIENT_CAPABILITIES: client_id=%u, data=%p, len=%zu", atomic_load(&client->client_id), data, len);
1478 VALIDATE_PACKET_SIZE(client, data, len,
sizeof(terminal_capabilities_packet_t),
"CLIENT_CAPABILITIES");
1480 const terminal_capabilities_packet_t *caps = (
const terminal_capabilities_packet_t *)data;
1483 uint16_t width = NET_TO_HOST_U16(caps->width);
1484 uint16_t height = NET_TO_HOST_U16(caps->height);
1485 log_error(
"CLIENT_CAPABILITIES: dimensions=%ux%u", width, height);
1487 VALIDATE_NONZERO(client, width,
"width",
"CLIENT_CAPABILITIES");
1488 VALIDATE_NONZERO(client, height,
"height",
"CLIENT_CAPABILITIES");
1489 VALIDATE_RANGE(client, width, 1, 4096,
"width",
"CLIENT_CAPABILITIES");
1490 VALIDATE_RANGE(client, height, 1, 4096,
"height",
"CLIENT_CAPABILITIES");
1493 uint32_t color_level = NET_TO_HOST_U32(caps->color_level);
1494 VALIDATE_RANGE(client, color_level, 0, 3,
"color_level",
"CLIENT_CAPABILITIES");
1497 uint32_t render_mode = NET_TO_HOST_U32(caps->render_mode);
1498 VALIDATE_RANGE(client, render_mode, 0, 2,
"render_mode",
"CLIENT_CAPABILITIES");
1501 uint32_t palette_type = NET_TO_HOST_U32(caps->palette_type);
1502 VALIDATE_RANGE(client, palette_type, 0, 5,
"palette_type",
"CLIENT_CAPABILITIES");
1505 VALIDATE_RANGE(client, caps->desired_fps, 1, 144,
"desired_fps",
"CLIENT_CAPABILITIES");
1507 mutex_lock(&client->client_state_mutex);
1509 atomic_store(&client->width, width);
1510 atomic_store(&client->height, height);
1512 log_debug(
"Client %u dimensions: %ux%u, desired_fps=%u", atomic_load(&client->client_id), client->width,
1513 client->height, caps->desired_fps);
1515 client->terminal_caps.capabilities = NET_TO_HOST_U32(caps->capabilities);
1516 client->terminal_caps.color_level = color_level;
1517 client->terminal_caps.color_count = NET_TO_HOST_U32(caps->color_count);
1518 client->terminal_caps.render_mode = render_mode;
1519 client->terminal_caps.detection_reliable = caps->detection_reliable;
1520 client->terminal_caps.wants_background = (render_mode == RENDER_MODE_BACKGROUND);
1522 SAFE_STRNCPY(client->terminal_caps.term_type, caps->term_type,
sizeof(client->terminal_caps.term_type));
1523 SAFE_STRNCPY(client->terminal_caps.colorterm, caps->colorterm,
sizeof(client->terminal_caps.colorterm));
1525 client->terminal_caps.utf8_support = NET_TO_HOST_U32(caps->utf8_support);
1526 client->terminal_caps.palette_type = palette_type;
1527 SAFE_STRNCPY(client->terminal_caps.palette_custom, caps->palette_custom,
1528 sizeof(client->terminal_caps.palette_custom));
1530 client->terminal_caps.desired_fps = caps->desired_fps;
1533 client->terminal_caps.wants_padding = (caps->wants_padding != 0);
1535 const char *custom_chars =
1536 (client->terminal_caps.palette_type == PALETTE_CUSTOM && client->terminal_caps.palette_custom[0])
1537 ? client->terminal_caps.palette_custom
1541 client->client_palette_chars, &client->client_palette_len,
1542 client->client_luminance_palette) == 0) {
1543 client->client_palette_type = (palette_type_t)client->terminal_caps.palette_type;
1544 client->client_palette_initialized =
true;
1545 log_info(
"Client %d palette initialized: type=%u, %zu chars, utf8=%u", atomic_load(&client->client_id),
1546 client->terminal_caps.palette_type, client->client_palette_len, client->terminal_caps.utf8_support);
1548 SET_ERRNO(ERROR_INVALID_STATE,
"Failed to initialize palette for client %d", atomic_load(&client->client_id));
1549 client->client_palette_initialized =
false;
1552 client->has_terminal_caps =
true;
1554 log_info(
"Client %u capabilities: %ux%u, color_level=%s (%u colors), caps=0x%x, term=%s, colorterm=%s, "
1555 "render_mode=%s, reliable=%s, fps=%u, wants_padding=%d",
1556 atomic_load(&client->client_id), client->width, client->height,
1557 terminal_color_level_name(client->terminal_caps.color_level), client->terminal_caps.color_count,
1558 client->terminal_caps.capabilities, client->terminal_caps.term_type, client->terminal_caps.colorterm,
1559 (client->terminal_caps.render_mode == RENDER_MODE_HALF_BLOCK
1561 : (client->terminal_caps.render_mode == RENDER_MODE_BACKGROUND ?
"background" :
"foreground")),
1562 client->terminal_caps.detection_reliable ?
"yes" :
"no", client->terminal_caps.desired_fps,
1563 client->terminal_caps.wants_padding);
1566 if (client->socket != INVALID_SOCKET_VALUE) {
1567 log_info_client(client,
"Terminal configured: %ux%u, %s, %s mode, %u fps", client->width, client->height,
1568 terminal_color_level_name(client->terminal_caps.color_level),
1569 (client->terminal_caps.render_mode == RENDER_MODE_HALF_BLOCK
1571 : (client->terminal_caps.render_mode == RENDER_MODE_BACKGROUND ?
"background" :
"foreground")),
1572 client->terminal_caps.desired_fps);
1575 mutex_unlock(&client->client_state_mutex);
1613 VALIDATE_PACKET_SIZE(client, data, len,
sizeof(size_packet_t),
"SIZE");
1615 const size_packet_t *size_pkt = (
const size_packet_t *)data;
1618 uint16_t width = NET_TO_HOST_U16(size_pkt->width);
1619 uint16_t height = NET_TO_HOST_U16(size_pkt->height);
1621 VALIDATE_NONZERO(client, width,
"width",
"SIZE");
1622 VALIDATE_NONZERO(client, height,
"height",
"SIZE");
1623 VALIDATE_RANGE(client, width, 1, 4096,
"width",
"SIZE");
1624 VALIDATE_RANGE(client, height, 1, 4096,
"height",
"SIZE");
1626 mutex_lock(&client->client_state_mutex);
1627 client->width = width;
1628 client->height = height;
1629 mutex_unlock(&client->client_state_mutex);
1631 log_info(
"Client %u updated terminal size: %ux%u", atomic_load(&client->client_id), width, height);
1692 int active_count = 0;
1693 for (
int i = 0; i < MAX_CLIENTS; i++) {
1700 server_state_packet_t state;
1701 state.connected_client_count = active_count;
1702 state.active_client_count = active_count;
1703 memset(state.reserved, 0,
sizeof(state.reserved));
1706 server_state_packet_t net_state;
1707 net_state.connected_client_count = HOST_TO_NET_U32(state.connected_client_count);
1708 net_state.active_client_count = HOST_TO_NET_U32(state.active_client_count);
1709 memset(net_state.reserved, 0,
sizeof(net_state.reserved));
1713 mutex_lock(&client->send_mutex);
1715 mutex_unlock(&client->send_mutex);
1717 if (result != ASCIICHAT_OK) {
1718 SET_ERRNO(ERROR_NETWORK,
"Failed to send server state to client %u: %s", client->client_id,
1719 asciichat_error_string(result));
1723 log_debug(
"Sent server state to client %u: %u connected, %u active", client->client_id, state.connected_client_count,
1724 state.active_client_count);
1757 SET_ERRNO(ERROR_INVALID_STATE,
"broadcast_clear_console_to_all_clients() called - unexpected usage");
1758 log_warn(
"CLEAR_CONSOLE is now sent from render threads, not broadcast");
Per-client state management and lifecycle orchestration.
const crypto_context_t * crypto_handshake_get_context(const crypto_handshake_context_t *ctx)
void handle_audio_opus_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_OPUS packet - decode single Opus frame from client.
void handle_audio_opus_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_OPUS_BATCH packet - efficient Opus-encoded audio batch from client.
asciichat_error_t audio_ring_buffer_write(audio_ring_buffer_t *rb, const float *data, int samples)
asciichat_error_t audio_parse_batch_header(const void *data, size_t len, audio_batch_info_t *out_batch)
asciichat_error_t audio_dequantize_samples(const uint8_t *samples_ptr, uint32_t total_samples, float *out_samples)
asciichat_error_t acip_send_server_state(acip_transport_t *transport, const server_state_packet_t *state)
void log_msg(log_level_t level, const char *file, int line, const char *func, const char *fmt,...)
asciichat_error_t log_network_message(socket_t sockfd, const struct crypto_context_t *crypto_ctx, log_level_t level, remote_log_direction_t direction, const char *fmt,...)
asciichat_error_t frame_validate_legacy(size_t len, size_t expected_rgb_size)
asciichat_error_t frame_check_size_overflow(size_t header_size, size_t data_size)
opus_codec_t * opus_codec_create_decoder(int sample_rate)
int opus_codec_decode(opus_codec_t *codec, const uint8_t *data, size_t data_len, float *out_samples, int out_num_samples)
asciichat_error_t packet_parse_remote_log(const void *data, size_t len, log_level_t *out_level, remote_log_direction_t *out_direction, uint16_t *out_flags, char *message_buffer, size_t message_buffer_size, size_t *out_message_length)
asciichat_error_t packet_send_error(socket_t sockfd, const crypto_context_t *crypto_ctx, asciichat_error_t error_code, const char *message)
asciichat_error_t packet_parse_opus_batch(const void *packet_data, size_t packet_len, const uint8_t **out_opus_data, size_t *out_opus_size, const uint16_t **out_frame_sizes, int *out_sample_rate, int *out_frame_duration, int *out_frame_count)
void packet_queue_stop(packet_queue_t *queue)
int initialize_client_palette(palette_type_t palette_type, const char *custom_chars, char client_palette_chars[256], size_t *client_palette_len, char client_luminance_palette[256])
asciichat_error_t acip_send_error(acip_transport_t *transport, uint32_t error_code, const char *message)
asciichat_error_t acip_send_pong(acip_transport_t *transport)
atomic_bool g_server_should_exit
Global atomic shutdown flag shared across all threads.
ascii-chat Server Mode Entry Point Header
void handle_client_join_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_JOIN packet - client announces identity and capabilities.
void handle_pong_packet(client_info_t *client, const void *data, size_t len)
Handle PONG packet - client acknowledged our PING.
void handle_protocol_version_packet(client_info_t *client, const void *data, size_t len)
Process PROTOCOL_VERSION packet - validate protocol compatibility.
void handle_image_frame_packet(client_info_t *client, void *data, size_t len)
Process IMAGE_FRAME packet - store client's video data for rendering.
void handle_audio_batch_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO_BATCH packet - store efficiently batched audio samples.
void handle_size_packet(client_info_t *client, const void *data, size_t len)
Process terminal size update packet - handle client window resize.
void handle_audio_packet(client_info_t *client, const void *data, size_t len)
Process AUDIO packet - store a single audio sample batch (legacy format).
void handle_client_leave_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_LEAVE packet - handle clean client disconnect.
#define OPUS_DECODE_STATIC_MAX_SAMPLES
void handle_stream_stop_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_STOP packet - client requests to halt media transmission.
int send_server_state_to_client(client_info_t *client)
Send current server state to a specific client.
void handle_client_capabilities_packet(client_info_t *client, const void *data, size_t len)
Process CLIENT_CAPABILITIES packet - configure client-specific rendering.
void broadcast_clear_console_to_all_clients(void)
Signal all active clients to clear their displays before next video frame.
void handle_stream_start_packet(client_info_t *client, const void *data, size_t len)
Process STREAM_START packet - client requests to begin media transmission.
void handle_remote_log_packet_from_client(client_info_t *client, const void *data, size_t len)
void disconnect_client_for_bad_data(client_info_t *client, const char *format,...)
void handle_ping_packet(client_info_t *client, const void *data, size_t len)
Handle PING packet - respond with PONG.
Server packet processing and protocol implementation.
client_manager_t g_client_manager
Global client manager singleton - central coordination point.
client_info_t clients[MAX_CLIENTS]
Array of client_info_t structures (backing storage).
int safe_vsnprintf(char *buffer, size_t buffer_size, const char *format, va_list ap)
Safe formatted string printing with va_list.
asciichat_error_t image_validate_buffer_size(size_t requested_size)
asciichat_error_t image_calc_rgb_size(size_t width, size_t height, size_t *out_size)
asciichat_error_t image_validate_dimensions(size_t width, size_t height)
video_frame_t * video_frame_begin_write(video_frame_buffer_t *vfb)
void video_frame_commit(video_frame_buffer_t *vfb)