ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
src/server/render.c
Go to the documentation of this file.
1
151#include <stdio.h>
152#include <string.h>
153#include <time.h>
154#include <errno.h>
155#include <math.h>
156
157#include "render.h"
158#include "client.h"
159#include "main.h"
160#include "stream.h"
161#include "protocol.h"
162#include <ascii-chat/common.h>
163#include <ascii-chat/options/options.h>
164#include <ascii-chat/options/rcu.h> // For RCU-based options access
165#include <ascii-chat/platform/abstraction.h>
166#include <ascii-chat/platform/init.h>
167#include <ascii-chat/network/packet_queue.h>
168#include <ascii-chat/util/time.h>
169#include <ascii-chat/audio/mixer.h>
170#include <ascii-chat/audio/audio.h>
171#include <ascii-chat/audio/opus_codec.h>
172#include <ascii-chat/util/format.h>
173#include <ascii-chat/util/fps.h>
174
175/* ============================================================================
176 * Cross-Platform Utility Functions
177 * ============================================================================
178 */
179
230// Removed interruptible_usleep - using regular platform_sleep_us instead
231// Sleep interruption isn't needed for small delays and isn't truly possible anyway
232
233/* ============================================================================
234 * Per-Client Video Rendering Implementation
235 * ============================================================================
236 */
237
339 client_info_t *client = (client_info_t *)arg;
340 if (!client) {
341 log_error("NULL client pointer in video render thread");
342 return NULL;
343 }
344
345 // Take snapshot of client ID and socket at start to avoid race conditions
346 // Use atomic_load for client_id to prevent data races.
347 uint32_t thread_client_id = atomic_load(&client->client_id);
348 socket_t thread_socket = client->socket;
349 bool is_webrtc = (thread_socket == INVALID_SOCKET_VALUE);
350 (void)is_webrtc; // May be unused in release builds
351
352 log_debug("Video render thread: client_id=%u, webrtc=%d", thread_client_id, is_webrtc);
353
354 // Get client's desired FPS from capabilities or use default
355 int client_fps = VIDEO_RENDER_FPS; // Default to 60 FPS
356 // Use snapshot pattern to avoid mutex in render thread
357 bool has_caps = client->has_terminal_caps;
358 int desired_fps = has_caps ? client->terminal_caps.desired_fps : 0;
359 if (has_caps && desired_fps > 0) {
360 client_fps = desired_fps;
361 log_debug("Client %u requested FPS: %d (has_caps=%d, desired_fps=%d)", thread_client_id, client_fps, has_caps,
362 desired_fps);
363 } else {
364 log_debug("Client %u using default FPS: %d (has_caps=%d, desired_fps=%d)", thread_client_id, client_fps, has_caps,
365 desired_fps);
366 }
367
368 int base_frame_interval_ms = 1000 / client_fps;
369 log_debug("Client %u render interval: %dms (%d FPS)", thread_client_id, base_frame_interval_ms, client_fps);
370
371 // FPS tracking for video render thread
372 fps_t video_fps_tracker = {0};
373 fps_init(&video_fps_tracker, client_fps, "SERVER VIDEO");
374
375 // Adaptive sleep for frame rate limiting
376 adaptive_sleep_state_t sleep_state = {0};
377 adaptive_sleep_config_t config = {
378 .baseline_sleep_ns = (uint64_t)(NS_PER_SEC_INT / client_fps), // Dynamic FPS (typically 16.67ms for 60 FPS)
379 .min_speed_multiplier = 1.0, // Constant rate (no slowdown)
380 .max_speed_multiplier = 1.0, // Constant rate (no speedup)
381 .speedup_rate = 0.0, // No adaptive behavior (constant FPS)
382 .slowdown_rate = 0.0 // No adaptive behavior (constant FPS)
383 };
384 adaptive_sleep_init(&sleep_state, &config);
385
386 log_info("Video render loop STARTING for client %u", thread_client_id);
387
388 bool should_continue = true;
389 while (should_continue && !atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down)) {
390 log_dev_every(10 * NS_PER_MS_INT, "Video render loop iteration for client %u", thread_client_id);
391
392 // Check for immediate shutdown
393 if (atomic_load(&g_server_should_exit)) {
394 log_debug("Video render thread stopping for client %u (g_server_should_exit)", thread_client_id);
395 break;
396 }
397
398 bool video_running = atomic_load(&client->video_render_thread_running);
399 bool active = atomic_load(&client->active);
400 bool shutting_down = atomic_load(&client->shutting_down);
401
402 should_continue = video_running && active && !shutting_down;
403
404 if (!should_continue) {
405 log_debug("Video render thread stopping for client %u (should_continue=false: video_running=%d, active=%d, "
406 "shutting_down=%d)",
407 thread_client_id, video_running, active, shutting_down);
408 break;
409 }
410
411 // Frame rate limiting using adaptive sleep system
412 // Use queue_depth=0 and target_depth=0 for constant-rate renderer (no backlog management)
413 adaptive_sleep_do(&sleep_state, 0, 0);
414
415 // Capture timestamp for FPS tracking and frame timestamps
416 uint64_t current_time_ns = time_get_ns();
417
418 // Check thread state again before acquiring locks (client might have been destroyed during sleep).
419 should_continue = atomic_load(&client->video_render_thread_running) && atomic_load(&client->active) &&
420 !atomic_load(&client->shutting_down);
421 if (!should_continue) {
422 break;
423 }
424
425 // Optimization: No mutex needed - all fields are atomic or stable.
426 // client_id: atomic_uint - use atomic_load for thread safety
427 // width/height: atomic_ushort - use atomic_load
428 // active: atomic_bool - use atomic_load
429 uint32_t client_id_snapshot = atomic_load(&client->client_id); // Atomic read
430 unsigned short width_snapshot = atomic_load(&client->width); // Atomic read
431 unsigned short height_snapshot = atomic_load(&client->height); // Atomic read
432 bool active_snapshot = atomic_load(&client->active); // Atomic read
433
434 // Check if client is still active after getting snapshot
435 if (!active_snapshot) {
436 break;
437 }
438
439 // Phase 2 IMPLEMENTED: Generate frame specifically for THIS client using snapshot data
440 size_t frame_size = 0;
441
442 // Check if any clients are sending video
443 bool has_video_sources = any_clients_sending_video();
444
445 // DIAGNOSTIC: Track when video sources become available
446 static bool last_has_sources = false;
447 if (has_video_sources != last_has_sources) {
448 log_warn("DIAGNOSTIC: Client %u video sources: %s", thread_client_id,
449 has_video_sources ? "AVAILABLE" : "UNAVAILABLE");
450 last_has_sources = has_video_sources;
451 }
452
453 log_debug_every(5 * NS_PER_MS_INT,
454 "Video render iteration for client %u: has_video_sources=%d, width=%u, height=%u", thread_client_id,
455 has_video_sources, width_snapshot, height_snapshot);
456
457 // Skip frame generation if client dimensions are not yet received (width=0 or height=0)
458 if (width_snapshot == 0 || height_snapshot == 0) {
459 log_dev_every(5 * NS_PER_MS_INT,
460 "Skipping frame generation for client %u: dimensions not yet received (width=%u, height=%u)",
461 thread_client_id, width_snapshot, height_snapshot);
462 continue;
463 }
464
465 if (has_video_sources) {
466 int sources_count = 0; // Track number of video sources in this frame
467
468 // DIAGNOSTIC: Track every frame generation attempt
469 // NOTE: these function-scope statics are shared by ALL client render threads (they are not per-client), so the counters below mix every client's iterations
470 static uint32_t frame_gen_count = 0;
471 static uint64_t frame_gen_start_time = 0;
472
473 frame_gen_count++;
474 if (frame_gen_count == 1) {
475 frame_gen_start_time = current_time_ns;
476 }
477
478 // Log every 120 attempts (should be ~2 seconds at 60 Hz)
479 if (frame_gen_count % 120 == 0) {
480 uint64_t elapsed_ns = current_time_ns - frame_gen_start_time;
481 double gen_fps = (120.0 / (elapsed_ns / (double)NS_PER_SEC_INT));
482 log_warn("DIAGNOSTIC: Client %u LOOP running at %.1f FPS (120 iterations in %.2fs)", thread_client_id, gen_fps,
483 elapsed_ns / (double)NS_PER_SEC_INT);
484 }
485
486 log_dev_every(5 * NS_PER_MS_INT,
487 "About to call create_mixed_ascii_frame_for_client for client %u with dims %ux%u", thread_client_id,
488 width_snapshot, height_snapshot);
489 char *ascii_frame = create_mixed_ascii_frame_for_client(client_id_snapshot, width_snapshot, height_snapshot,
490 false, &frame_size, NULL, &sources_count);
491
492 // DEBUG: Log frame generation details
493 static uint32_t last_frame_hash = -1; // Initialize to -1 so first frame is always new
494 uint32_t current_frame_hash = 0;
495 bool frame_is_new = false;
496 if (ascii_frame && frame_size > 0) {
497 for (size_t i = 0; i < frame_size && i < 1000; i++) {
498 current_frame_hash = (uint32_t)((uint64_t)current_frame_hash * 31 + ((unsigned char *)ascii_frame)[i]);
499 }
500 if (current_frame_hash != last_frame_hash) {
501 log_info("RENDER_FRAME CHANGE: Client %u frame #%zu sources=%d hash=0x%08x (prev=0x%08x)", thread_client_id,
502 frame_size, sources_count, current_frame_hash, last_frame_hash);
503 last_frame_hash = current_frame_hash;
504 frame_is_new = true;
505 } else {
506 log_dev_every(25000, "RENDER_FRAME DUPLICATE: Client %u frame #%zu sources=%d hash=0x%08x (no change)",
507 thread_client_id, frame_size, sources_count, current_frame_hash);
508 frame_is_new = false;
509 }
510 }
511
512 log_dev_every(5 * NS_PER_MS_INT,
513 "create_mixed_ascii_frame_for_client returned: ascii_frame=%p, frame_size=%zu, sources_count=%d",
514 (void *)ascii_frame, frame_size, sources_count);
515
516 // Phase 2 IMPLEMENTED: Write frame to double buffer (never drops!)
517 if (ascii_frame && frame_size > 0) {
518 log_debug_every(5 * NS_PER_MS_INT, "Buffering frame for client %u (size=%zu)", thread_client_id, frame_size);
519 // GRID LAYOUT CHANGE DETECTION: Store source count with frame
520 // Send thread will compare this with last sent count to detect grid changes
521 atomic_store(&client->last_rendered_grid_sources, sources_count);
522
523 // Use double-buffer system which has its own internal swap_mutex
524 // No external locking needed - the double-buffer is thread-safe by design
525 video_frame_buffer_t *vfb_snapshot = client->outgoing_video_buffer;
526
527 if (vfb_snapshot) {
528 video_frame_t *write_frame = video_frame_begin_write(vfb_snapshot);
529 if (write_frame) {
530 // Copy ASCII frame data to the back buffer (NOT holding rwlock - just double-buffer's internal lock)
531 if (write_frame->data && frame_size <= vfb_snapshot->allocated_buffer_size) {
532 memcpy(write_frame->data, ascii_frame, frame_size);
533 write_frame->size = frame_size;
534 write_frame->capture_timestamp_ns = current_time_ns;
535
536 // Only commit the frame if it's actually NEW (different from last committed frame)
537 // This prevents sending duplicate frames and improves client-side FPS tracking
538 if (frame_is_new) {
539 uint64_t commit_start_ns = time_get_ns();
540 // Commit the frame (swaps buffers atomically using vfb->swap_mutex, NOT rwlock)
541 video_frame_commit(vfb_snapshot);
542 uint64_t commit_end_ns = time_get_ns();
543 char commit_duration_str[32];
544 format_duration_ns((double)(commit_end_ns - commit_start_ns), commit_duration_str,
545 sizeof(commit_duration_str));
546
547 static uint32_t commits_count = 0;
548 static uint64_t commits_start_time = 0;
549 commits_count++;
550 if (commits_count == 1) {
551 commits_start_time = commit_end_ns;
552 }
553 if (commits_count % 10 == 0) {
554 uint64_t elapsed_ns = commit_end_ns - commits_start_time;
555 double commit_fps = (10.0 / (elapsed_ns / (double)NS_PER_SEC_INT));
556 log_warn("DIAGNOSTIC: Client %u UNIQUE frames being sent at %.1f FPS (10 commits counted)",
557 thread_client_id, commit_fps);
558 }
559
560 log_info("[FRAME_COMMIT_TIMING] Client %u frame commit took %s (hash=0x%08x)", thread_client_id,
561 commit_duration_str, current_frame_hash);
562 } else {
563 // Discard duplicate frame by not committing (back buffer is safe to reuse)
564 log_dev_every(25000, "Skipping commit for duplicate frame for client %u (hash=0x%08x)",
565 thread_client_id, current_frame_hash);
566 }
567
568 // Log occasionally for monitoring
569 char pretty_size[64];
570 format_bytes_pretty(frame_size, pretty_size, sizeof(pretty_size));
571
572 // Compute hash of ASCII frame to detect duplicates
573 uint32_t ascii_hash = 0;
574 for (size_t i = 0; i < frame_size && i < 1000; i++) {
575 ascii_hash = (uint32_t)((((uint64_t)ascii_hash << 5) - ascii_hash) + (unsigned char)ascii_frame[i]);
576 }
577 log_dev_every(5 * NS_PER_MS_INT, "Client %u: Rendered ASCII frame size=%s hash=0x%08x sources=%d",
578 thread_client_id, pretty_size, ascii_hash, sources_count);
579
580 } else {
581 log_warn("Frame too large for buffer: %zu > %zu", frame_size, vfb_snapshot->allocated_buffer_size);
582 }
583
584 // FPS tracking - frame successfully generated (handles lag detection and periodic reporting)
585 fps_frame_ns(&video_fps_tracker, current_time_ns, "frame rendered");
586 }
587 }
588
589 SAFE_FREE(ascii_frame);
590 } else {
591 // No frame generated (probably no video sources) - this is normal, no error logging needed
592 log_dev_every(10 * NS_PER_MS_INT, "Per-client render: No video sources available for client %u",
593 client_id_snapshot);
594 }
595 } else {
596 // No video sources - skip frame generation but DON'T update last_render_time
597 // This ensures the next iteration still maintains proper frame timing
598 log_debug("Skipping frame generation for client %u (no video sources)", thread_client_id);
599 }
600 }
601
602#ifdef DEBUG_THREADS
603 log_debug("Video render thread stopped for client %u", thread_client_id);
604#endif
605
606 // Clean up thread-local error context before exit
608
609 return NULL;
610}
611
612/* ============================================================================
613 * Per-Client Audio Rendering Implementation
614 * ============================================================================
615 */
616
724 client_info_t *client = (client_info_t *)arg;
725
726 if (!client) {
727 log_error("Invalid client info in audio render thread");
728 return NULL;
729 }
730
731 // Take snapshot of client ID and display name at start to avoid race conditions
732 // Use atomic_load for client_id to prevent data races.
733 uint32_t thread_client_id = atomic_load(&client->client_id);
734 char thread_display_name[64];
735 bool is_webrtc = (client->socket == INVALID_SOCKET_VALUE);
736 (void)is_webrtc; // May be unused in release builds
737
738 // LOCK OPTIMIZATION: Only need client_state_mutex, not global rwlock
739 // We already have a stable client pointer
740 mutex_lock(&client->client_state_mutex);
741 SAFE_STRNCPY(thread_display_name, client->display_name, sizeof(thread_display_name));
742 mutex_unlock(&client->client_state_mutex);
743
744#ifdef DEBUG_THREADS
745 log_debug("Audio render thread started for client %u (%s), webrtc=%d", thread_client_id, thread_display_name,
746 is_webrtc);
747#endif
748
749 // Mix buffer: up to 960 samples for adaptive reading
750 // Normal: 480 samples = 10ms @ 48kHz
751 // Catchup: 960 samples = 20ms when buffers are filling up
752 float mix_buffer[960];
753
754// Opus frame accumulation buffer (960 samples = 20ms @ 48kHz)
755// Opus requires minimum 480 samples, 960 is optimal for 20ms frames
756#define OPUS_FRAME_SAMPLES 960
757 float opus_frame_buffer[OPUS_FRAME_SAMPLES];
758 int opus_frame_accumulated = 0;
759
760 // Create Opus encoder for this client's audio stream (48kHz, mono, 128kbps, AUDIO mode for music quality)
761 opus_codec_t *opus_encoder = opus_codec_create_encoder(OPUS_APPLICATION_AUDIO, 48000, 128000);
762 if (!opus_encoder) {
763 log_error("Failed to create Opus encoder for audio render thread (client %u)", thread_client_id);
764 return NULL;
765 }
766
767 // FPS tracking for audio render thread
768 fps_t audio_fps_tracker = {0};
769 fps_init(&audio_fps_tracker, AUDIO_RENDER_FPS, "SERVER AUDIO");
770
771 // Adaptive sleep for audio rate limiting at 100 FPS (10ms intervals, 480 samples @ 48kHz)
772 adaptive_sleep_state_t audio_sleep_state = {0};
773 adaptive_sleep_config_t audio_config = {
774 .baseline_sleep_ns = 10 * NS_PER_MS_INT, // 10ms = 100 FPS (480 samples @ 48kHz)
775 .min_speed_multiplier = 1.0, // Constant rate (no slowdown)
776 .max_speed_multiplier = 1.0, // Constant rate (no speedup)
777 .speedup_rate = 0.0, // No adaptive behavior (constant rate)
778 .slowdown_rate = 0.0 // No adaptive behavior (constant rate)
779 };
780 adaptive_sleep_init(&audio_sleep_state, &audio_config);
781
782 // Per-thread counters (NOT static - each thread instance gets its own)
783 int mixer_debug_count = 0;
784 int backpressure_check_counter = 0;
785 int server_audio_frame_count = 0;
786
787 bool should_continue = true;
788 while (should_continue && !atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down)) {
789 log_debug_every(LOG_RATE_SLOW, "Audio render loop iteration for client %u", thread_client_id);
790
791 // Check for immediate shutdown
792 if (atomic_load(&g_server_should_exit)) {
793 log_debug("Audio render thread stopping for client %u (g_server_should_exit)", thread_client_id);
794 break;
795 }
796
797 // Check thread state before acquiring any locks to prevent use-after-destroy.
798 // If we acquire locks after client is being destroyed, we'll crash with SIGSEGV
799 should_continue = (((int)atomic_load(&client->audio_render_thread_running) != 0) &&
800 ((int)atomic_load(&client->active) != 0) && !atomic_load(&client->shutting_down));
801
802 if (!should_continue) {
803 log_debug("Audio render thread stopping for client %u (should_continue=false)", thread_client_id);
804 break;
805 }
806
807 if (!g_audio_mixer) {
808 log_dev_every(10 * NS_PER_MS_INT, "Audio render waiting for mixer (client %u)", thread_client_id);
809 // Check shutdown flag while waiting
810 if (atomic_load(&g_server_should_exit))
811 break;
812 platform_sleep_ns(10 * NS_PER_MS_INT);
813 continue;
814 }
815
816 // Optimization: No mutex needed - all fields are atomic or stable.
817 // client_id: atomic_uint - use atomic_load for thread safety
818 // active: atomic_bool - use atomic_load
819 // audio_queue: Assigned once at init and never changes
820 uint32_t client_id_snapshot = atomic_load(&client->client_id); // Atomic read
821 bool active_snapshot = atomic_load(&client->active); // Atomic read
822 packet_queue_t *audio_queue_snapshot = client->audio_queue; // Stable after init
823
824 // Check if client is still active after getting snapshot
825 if (!active_snapshot || !audio_queue_snapshot) {
826 break;
827 }
828
829 // Create mix excluding THIS client's audio using snapshot data
830 START_TIMER("mix_%u", client_id_snapshot);
831
832 // ADAPTIVE READING: Read more samples when we're behind to catch up
833 // Normal: 480 samples per 10ms iteration
834 // When behind: read up to 960 samples to catch up faster
835 // Check source buffer levels to decide
836 int samples_to_read = 480; // Default: 10ms worth
837
838 // Log latency at each stage in the server pipeline
839 if (g_audio_mixer) {
840 // Check source buffer latency for all sources
841 for (int i = 0; i < g_audio_mixer->max_sources; i++) {
842 if (g_audio_mixer->source_ids[i] != 0 && g_audio_mixer->source_ids[i] != client_id_snapshot &&
843 g_audio_mixer->source_buffers[i]) {
844 size_t available = audio_ring_buffer_available_read(g_audio_mixer->source_buffers[i]);
845 float buffer_latency_ms = (float)available / 48.0f; // samples / (48000 / 1000)
846
847 // Log source buffer latency
848 log_dev_every(5 * NS_PER_MS_INT, "LATENCY: Server incoming buffer for client %u: %.1fms (%zu samples)",
849 g_audio_mixer->source_ids[i], buffer_latency_ms, available);
850
851 // If buffer is getting too full, read faster to reduce latency
852 if (available > 1920) { // > 40ms buffered - read faster!
853 samples_to_read = 960; // Double read to catch up (20ms worth)
854 log_dev_every(LOG_RATE_DEFAULT,
855 "LATENCY WARNING: Server buffer too full for client %u: %.1fms, reading double",
856 g_audio_mixer->source_ids[i], buffer_latency_ms);
857 }
858 }
859 }
860
861 // Log outgoing queue latency
862 size_t queue_depth = packet_queue_size(audio_queue_snapshot);
863 float queue_latency_ms = (float)queue_depth * 20.0f; // ~20ms per Opus packet
864 log_dev_every(5 * NS_PER_MS_INT, "LATENCY: Server send queue for client %u: %.1fms (%zu packets)",
865 client_id_snapshot, queue_latency_ms, queue_depth);
866 }
867
868 int samples_mixed = 0;
869 if (GET_OPTION(no_audio_mixer)) {
870 // Disable mixer.h processing: simple mixing without ducking/compression/etc
871 // Just add audio from all sources except this client, no processing
872 SAFE_MEMSET(mix_buffer, samples_to_read * sizeof(float), 0, samples_to_read * sizeof(float));
873
874 if (g_audio_mixer) {
875 int max_samples_in_frame = 0;
876 // Simple mixing: just add all sources except current client
877 for (int i = 0; i < g_audio_mixer->max_sources; i++) {
878 if (g_audio_mixer->source_ids[i] != 0 && g_audio_mixer->source_ids[i] != client_id_snapshot &&
879 g_audio_mixer->source_buffers[i]) {
880 // Read from this source and add to mix buffer
881 float temp_buffer[960]; // Max adaptive read size
882 int samples_read =
883 (int)audio_ring_buffer_read(g_audio_mixer->source_buffers[i], temp_buffer, samples_to_read);
884
885 // Track the maximum samples we got from any source
886 if (samples_read > max_samples_in_frame) {
887 max_samples_in_frame = samples_read;
888 }
889
890 // Add to mix buffer
891 for (int j = 0; j < samples_read; j++) {
892 mix_buffer[j] += temp_buffer[j];
893 }
894 }
895 }
896 samples_mixed = max_samples_in_frame; // Only count samples we actually read
897 }
898
899 log_debug_every(LOG_RATE_DEFAULT,
900 "Audio mixer DISABLED (--no-audio-mixer): simple mixing, samples=%d for client %u", samples_mixed,
901 client_id_snapshot);
902 } else {
903 // Use adaptive sample count in normal mixer mode
904 samples_mixed = mixer_process_excluding_source(g_audio_mixer, mix_buffer, samples_to_read, client_id_snapshot);
905 }
906
907 STOP_TIMER_AND_LOG_EVERY(dev, NS_PER_SEC_INT, 5 * NS_PER_MS_INT, "mix_%u", "Mixer for client %u: took",
908 client_id_snapshot);
909
910 // Debug logging every 100 iterations (disabled - can slow down audio rendering)
911 // log_debug_every(LOG_RATE_SLOW, "Audio render for client %u: samples_mixed=%d", client_id_snapshot,
912 // samples_mixed);
913
914 // DEBUG: Log samples mixed every iteration
915 // NOTE: mixer_debug_count is now per-thread (not static), so each client thread has its own counter
916 mixer_debug_count++;
917 log_dev_every(4500 * US_PER_MS_INT,
918 "Server mixer iteration #%d for client %u: samples_mixed=%d, opus_frame_accumulated=%d/%d",
919 mixer_debug_count, client_id_snapshot, samples_mixed, opus_frame_accumulated, OPUS_FRAME_SAMPLES);
920
921 // Accumulate all samples (including 0 or partial) until we have a full Opus frame
922 // This maintains continuous stream without silence padding
923 START_TIMER("accum_%u", client_id_snapshot);
924
925 int space_available = OPUS_FRAME_SAMPLES - opus_frame_accumulated;
926 int samples_to_copy = (samples_mixed <= space_available) ? samples_mixed : space_available;
927
928 // Only copy if we have samples, otherwise just wait for next frame
929 if (samples_to_copy > 0) {
930 SAFE_MEMCPY(opus_frame_buffer + opus_frame_accumulated,
931 (OPUS_FRAME_SAMPLES - opus_frame_accumulated) * sizeof(float), mix_buffer,
932 samples_to_copy * sizeof(float));
933 opus_frame_accumulated += samples_to_copy;
934 }
935
936 STOP_TIMER_AND_LOG_EVERY(dev, NS_PER_SEC_INT, 2 * NS_PER_MS_INT, "accum_%u", "Accumulate for client %u: took",
937 client_id_snapshot);
938
939 // Only encode and send when we have accumulated a full Opus frame
940 if (opus_frame_accumulated >= OPUS_FRAME_SAMPLES) {
941 // OPTIMIZATION: Don't check queue depth every iteration - it's expensive (requires lock)
942 // Only check once every 100 full Opus frames (~2s at the ~50 frames/s noted below)
943 // NOTE: backpressure_check_counter is now per-thread (not static), so each client thread has its own counter
944 bool apply_backpressure = false;
945
946 if (++backpressure_check_counter >= 100) {
947 backpressure_check_counter = 0;
948 size_t queue_depth = packet_queue_size(audio_queue_snapshot);
949 // Opus frames are produced at ~50 FPS (20ms each), so 50 packets = 1 second
950 // Keep latency bounded to ~1s max in the send queue
951 apply_backpressure = (queue_depth > 50); // > 50 packets = ~1s buffered at 50 FPS
952
953 if (apply_backpressure) {
954 log_warn_every(4500 * US_PER_MS_INT,
955 "Audio backpressure for client %u: queue depth %zu packets (%.1fs buffered)",
956 client_id_snapshot, queue_depth, (float)queue_depth / 50.0f);
957 }
958 }
959
960 if (apply_backpressure) {
961 // Skip this packet to let the queue drain
962 // Reset accumulation buffer so fresh samples can be captured on next iteration.
963 // Without this reset, we'd loop forever with stale audio and no space for new samples
964 opus_frame_accumulated = 0;
965 platform_sleep_ns(5800 * NS_PER_US_INT);
966 continue;
967 }
968
969 // Encode accumulated Opus frame (960 samples = 20ms @ 48kHz)
970 uint8_t opus_buffer[1024]; // Max Opus frame size
971
972 START_TIMER("opus_encode_%u", client_id_snapshot);
973
974 int opus_size =
975 opus_codec_encode(opus_encoder, opus_frame_buffer, OPUS_FRAME_SAMPLES, opus_buffer, sizeof(opus_buffer));
976
977 STOP_TIMER_AND_LOG_EVERY(dev, NS_PER_SEC_INT, 10 * NS_PER_MS_INT, "opus_encode_%u",
978 "Opus encode for client %u: took", client_id_snapshot);
979
980 // DEBUG: Log mix buffer and encoding results to see audio levels being sent
981 {
982 float peak = 0.0f, rms = 0.0f;
983 for (int i = 0; i < OPUS_FRAME_SAMPLES; i++) {
984 float abs_val = fabsf(opus_frame_buffer[i]);
985 if (abs_val > peak)
986 peak = abs_val;
987 rms += opus_frame_buffer[i] * opus_frame_buffer[i];
988 }
989 rms = sqrtf(rms / OPUS_FRAME_SAMPLES);
990 // NOTE: server_audio_frame_count is now per-thread (not static), so each client thread has its own counter
991 server_audio_frame_count++;
992 if (server_audio_frame_count <= 5 || server_audio_frame_count % 20 == 0) {
993 // Log first 4 samples to verify they look like valid audio (not NaN/Inf/garbage)
994 log_dev_every(4500 * US_PER_MS_INT,
995 "Server audio frame #%d for client %u: samples_mixed=%d, Peak=%.6f, RMS=%.6f, opus_size=%d, "
996 "first4=[%.4f,%.4f,%.4f,%.4f]",
997 server_audio_frame_count, client_id_snapshot, samples_mixed, peak, rms, opus_size,
998 opus_frame_buffer[0], opus_frame_buffer[1], opus_frame_buffer[2], opus_frame_buffer[3]);
999 }
1000 }
1001
1002 // Always reset accumulation buffer after attempting to encode - we've consumed these samples
1003 // If we don't reset, new audio samples would be dropped while stale data sits in the buffer
1004 opus_frame_accumulated = 0;
1005
1006 if (opus_size <= 0) {
1007 log_error("Failed to encode audio to Opus for client %u: opus_size=%d", client_id_snapshot, opus_size);
1008 } else {
1009 // Queue Opus-encoded audio for this specific client
1010 START_TIMER("audio_queue_%u", client_id_snapshot);
1011
1012 int result = packet_queue_enqueue(audio_queue_snapshot, PACKET_TYPE_AUDIO_OPUS_BATCH, opus_buffer,
1013 (size_t)opus_size, 0, true);
1014
1015 STOP_TIMER_AND_LOG_EVERY(dev, NS_PER_SEC_INT, 1 * NS_PER_MS_INT, "audio_queue_%u",
1016 "Audio queue for client %u: took", client_id_snapshot);
1017
1018 if (result < 0) {
1019 log_debug("Failed to queue Opus audio for client %u", client_id_snapshot);
1020 } else {
1021 // FPS tracking - audio packet successfully queued (handles lag detection and periodic reporting)
1022 fps_frame_ns(&audio_fps_tracker, time_get_ns(), "audio packet queued");
1023 }
1024 }
1025 // NOTE: opus_frame_accumulated was already reset above, immediately after the encode attempt
1026 }
1027
1028 // Audio mixing rate limiting using adaptive sleep system
1029 // Target: 10ms intervals (100 FPS) for 480 samples @ 48kHz
1030 // Use queue_depth=0 and target_depth=0 for constant-rate audio processing
1031 adaptive_sleep_do(&audio_sleep_state, 0, 0);
1032 }
1033
1034#ifdef DEBUG_THREADS
1035 log_debug("Audio render thread stopped for client %u", thread_client_id);
1036#endif
1037
1038 // Clean up Opus encoder
1039 if (opus_encoder) {
1040 opus_codec_destroy(opus_encoder);
1041 }
1042
1043 // Clean up thread-local error context before exit
1045
1046 return NULL;
1047}
1048
1049/* ============================================================================
1050 * Thread Lifecycle Management Functions
1051 * ============================================================================
1052 */
1053
1124int create_client_render_threads(server_context_t *server_ctx, client_info_t *client) {
1125 if (!server_ctx || !client) {
1126 log_error("Cannot create render threads: NULL %s", !server_ctx ? "server_ctx" : "client");
1127 return -1;
1128 }
1129
1130#ifdef DEBUG_THREADS
1131 log_debug("Creating render threads for client %u", client->client_id);
1132#endif
1133
1134 // NOTE: Mutexes are already initialized in add_client() before any threads start
1135 // This prevents race conditions where receive thread tries to use uninitialized mutexes
1136
1137 // Initialize render thread control flags
1138 // IMPORTANT: Set to true BEFORE creating thread to avoid race condition
1139 // where thread starts and immediately exits because flag is false
1140 atomic_store(&client->video_render_thread_running, true);
1141 atomic_store(&client->audio_render_thread_running, true);
1142
1143 // Create video rendering thread (stop_id=2, stop after receive thread)
1144 char thread_name[64];
1145 safe_snprintf(thread_name, sizeof(thread_name), "video_render_%u", client->client_id);
1146 asciichat_error_t video_result = tcp_server_spawn_thread(server_ctx->tcp_server, client->socket,
1147 client_video_render_thread, client, 2, thread_name);
1148 if (video_result != ASCIICHAT_OK) {
1149 // Reset flag since thread creation failed
1150 atomic_store(&client->video_render_thread_running, false);
1151 // Mutexes will be destroyed by remove_client() which called us
1152 return -1;
1153 }
1154
1155 // Create audio rendering thread (stop_id=2, same priority as video)
1156 safe_snprintf(thread_name, sizeof(thread_name), "audio_render_%u", client->client_id);
1157 asciichat_error_t audio_result = tcp_server_spawn_thread(server_ctx->tcp_server, client->socket,
1158 client_audio_render_thread, client, 2, thread_name);
1159 if (audio_result != ASCIICHAT_OK) {
1160 // Clean up video thread (atomic operation, no mutex needed)
1161 atomic_store(&client->video_render_thread_running, false);
1162 // Reset audio flag since thread creation failed
1163 atomic_store(&client->audio_render_thread_running, false);
1164 // tcp_server_stop_client_threads() will be called by remove_client()
1165 // to clean up the video thread we just created
1166 // Mutexes will be destroyed by remove_client() which called us
1167 return -1;
1168 }
1169
1170#ifdef DEBUG_THREADS
1171 log_debug("Created render threads for client %u", client->client_id);
1172#endif
1173
1174 return 0;
1175}
1176
1258void stop_client_render_threads(client_info_t *client) {
1259 if (!client) {
1260 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
1261 return;
1262 }
1263
1264 log_debug("Stopping render threads for client %u", client->client_id);
1265
1266 // Signal threads to stop (atomic operations, no mutex needed)
1267 atomic_store(&client->video_render_thread_running, false);
1268 atomic_store(&client->audio_render_thread_running, false);
1269
1270 // Wait for threads to finish (deterministic cleanup)
1271 // During shutdown, don't wait forever for threads to join
1272 bool is_shutting_down = atomic_load(&g_server_should_exit);
1273
1274 if (asciichat_thread_is_initialized(&client->video_render_thread)) {
1275 log_debug("Joining video render thread for client %u", client->client_id);
1276 int result;
1277 if (is_shutting_down) {
1278 // During shutdown, don't timeout - wait for thread to exit
1279 // Timeouts mask the real problem: threads that are still running
1280 log_debug("Shutdown mode: joining video render thread for client %u (no timeout)", client->client_id);
1281 result = asciichat_thread_join(&client->video_render_thread, NULL);
1282 if (result != 0) {
1283 log_warn("Video render thread for client %u failed to join during shutdown: %s", client->client_id,
1284 SAFE_STRERROR(result));
1285 }
1286 } else {
1287 log_debug("Calling asciichat_thread_join for video thread of client %u", client->client_id);
1288 result = asciichat_thread_join(&client->video_render_thread, NULL);
1289 log_debug("asciichat_thread_join returned %d for video thread of client %u", result, client->client_id);
1290 }
1291
1292 if (result == 0) {
1293#ifdef DEBUG_THREADS
1294 log_debug("Video render thread joined for client %u", client->client_id);
1295#endif
1296 } else if (result != -2) { // Don't log timeout errors again
1297 if (is_shutting_down) {
1298 log_warn("Failed to join video render thread for client %u during shutdown (continuing): %s", client->client_id,
1299 SAFE_STRERROR(result));
1300 } else {
1301 log_error("Failed to join video render thread for client %u: %s", client->client_id, SAFE_STRERROR(result));
1302 }
1303 }
1304 // Clear thread handle safely using platform abstraction
1305 asciichat_thread_init(&client->video_render_thread);
1306 }
1307
1308 if (asciichat_thread_is_initialized(&client->audio_render_thread)) {
1309 int result;
1310 if (is_shutting_down) {
1311 // During shutdown, don't timeout - wait for thread to exit
1312 // Timeouts mask the real problem: threads that are still running
1313 log_debug("Shutdown mode: joining audio render thread for client %u (no timeout)", client->client_id);
1314 result = asciichat_thread_join(&client->audio_render_thread, NULL);
1315 if (result != 0) {
1316 log_warn("Audio render thread for client %u failed to join during shutdown: %s", client->client_id,
1317 SAFE_STRERROR(result));
1318 }
1319 } else {
1320 result = asciichat_thread_join(&client->audio_render_thread, NULL);
1321 }
1322
1323 if (result == 0) {
1324#ifdef DEBUG_THREADS
1325 log_debug("Audio render thread joined for client %u", client->client_id);
1326#endif
1327 } else if (result != -2) { // Don't log timeout errors again
1328 if (is_shutting_down) {
1329 log_warn("Failed to join audio render thread for client %u during shutdown (continuing): %s", client->client_id,
1330 SAFE_STRERROR(result));
1331 } else {
1332 log_error("Failed to join audio render thread for client %u: %s", client->client_id, SAFE_STRERROR(result));
1333 }
1334 }
1335 // Clear thread handle safely using platform abstraction
1336 asciichat_thread_init(&client->audio_render_thread);
1337 }
1338
1339 // DO NOT destroy the mutex here - client.c will handle it
1340 // mutex_destroy(&client->client_state_mutex);
1341
1342#ifdef DEBUG_THREADS
1343 log_debug("Successfully destroyed render threads for client %u", client->client_id);
1344#endif
1345}
void asciichat_errno_destroy(void)
Per-client state management and lifecycle orchestration.
void fps_frame_ns(fps_t *tracker, uint64_t current_time_ns, const char *context)
Definition fps.c:52
void fps_init(fps_t *tracker, int expected_fps, const char *name)
Definition fps.c:32
int socket_t
size_t audio_ring_buffer_read(audio_ring_buffer_t *rb, float *data, size_t samples)
size_t audio_ring_buffer_available_read(audio_ring_buffer_t *rb)
asciichat_error_t tcp_server_spawn_thread(tcp_server_t *server, socket_t client_socket, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
int mixer_process_excluding_source(mixer_t *mixer, float *output, int num_samples, uint32_t exclude_client_id)
Definition mixer.c:605
opus_codec_t * opus_codec_create_encoder(opus_application_t application, int sample_rate, int bitrate)
Definition opus_codec.c:18
void opus_codec_destroy(opus_codec_t *codec)
Definition opus_codec.c:215
size_t opus_codec_encode(opus_codec_t *codec, const float *samples, int num_samples, uint8_t *out_data, size_t out_size)
Definition opus_codec.c:97
int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
size_t packet_queue_size(packet_queue_t *queue)
Per-client rendering threads with rate limiting.
#define AUDIO_RENDER_FPS
Definition render.h:26
#define VIDEO_RENDER_FPS
Definition render.h:22
mixer_t *volatile g_audio_mixer
Global audio mixer instance for multi-client audio processing.
atomic_bool g_server_should_exit
Global atomic shutdown flag shared across all threads.
ascii-chat Server Mode Entry Point Header
Server packet processing and protocol implementation.
#define OPUS_FRAME_SAMPLES
void stop_client_render_threads(client_info_t *client)
Stop and cleanup per-client rendering threads.
void * client_video_render_thread(void *arg)
Main video rendering thread function for individual clients.
void * client_audio_render_thread(void *arg)
Main audio rendering thread function for individual clients.
int create_client_render_threads(server_context_t *server_ctx, client_info_t *client)
Create and initialize per-client rendering threads.
bool any_clients_sending_video(void)
Check if any connected clients are currently sending video.
Definition stream.c:1305
char * create_mixed_ascii_frame_for_client(uint32_t target_client_id, unsigned short width, unsigned short height, bool wants_stretch, size_t *out_size, bool *out_grid_changed, int *out_sources_count)
Generate personalized ASCII frame for a specific client.
Definition stream.c:955
Multi-client video mixing and ASCII frame generation.
Server context - encapsulates all server state.
tcp_server_t * tcp_server
TCP server managing connections.
int safe_snprintf(char *buffer, size_t buffer_size, const char *format,...)
Safe formatted string printing to buffer.
Definition system.c:456
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Definition threading.c:46
void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity)
Definition util/format.c:10
uint64_t time_get_ns(void)
Definition util/time.c:48
int format_duration_ns(double nanoseconds, char *buffer, size_t buffer_size)
Definition util/time.c:275
void adaptive_sleep_init(adaptive_sleep_state_t *state, const adaptive_sleep_config_t *config)
Definition util/time.c:393
void adaptive_sleep_do(adaptive_sleep_state_t *state, size_t queue_depth, size_t target_depth)
Definition util/time.c:465
video_frame_t * video_frame_begin_write(video_frame_buffer_t *vfb)
void video_frame_commit(video_frame_buffer_t *vfb)