ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
render.c
Go to the documentation of this file.
1
151#include <stdatomic.h>
152#include <stdio.h>
153#include <string.h>
154#include <time.h>
155#include <errno.h>
156#include <math.h>
157
158#include "render.h"
159#include "client.h"
160#include "main.h"
161#include "stream.h"
162#include "protocol.h"
163#include "common.h"
164#include "options/options.h"
165#include "options/rcu.h" // For RCU-based options access
166#include "platform/abstraction.h"
167#include "platform/init.h"
168#include "network/packet_queue.h"
169#include "util/time.h"
170#include "audio/mixer.h"
171#include "audio/audio.h"
172#include "audio/opus_codec.h"
173#include "util/format.h"
174#include "util/fps.h"
175
176// Global client manager lock for thread-safe access
178
186extern atomic_bool g_server_should_exit;
187
194extern mixer_t *g_audio_mixer;
195
196/* ============================================================================
197 * Cross-Platform Utility Functions
198 * ============================================================================
199 */
200
251// Removed interruptible_usleep - using regular platform_sleep_usec instead
252// Sleep interruption isn't needed for small delays and isn't truly possible anyway
253
254/* ============================================================================
255 * Per-Client Video Rendering Implementation
256 * ============================================================================
257 */
258
360 client_info_t *client = (client_info_t *)arg;
361 if (!client) {
362 log_error("NULL client pointer in video render thread");
363 return NULL;
364 }
365
366 // Take snapshot of client ID and socket at start to avoid race conditions
367 // CRITICAL: Use atomic_load for client_id to prevent data races
368 uint32_t thread_client_id = atomic_load(&client->client_id);
369 socket_t thread_socket = client->socket;
370
371 log_debug("Video render thread: client_id=%u", thread_client_id);
372
373 if (thread_socket == INVALID_SOCKET_VALUE) {
374 log_error("Invalid socket in video render thread for client %u", thread_client_id);
375 return NULL;
376 }
377
378 // Get client's desired FPS from capabilities or use default
379 int client_fps = VIDEO_RENDER_FPS; // Default to 60 FPS
380 // Use snapshot pattern to avoid mutex in render thread
381 bool has_caps = client->has_terminal_caps;
382 int desired_fps = has_caps ? client->terminal_caps.desired_fps : 0;
383 if (has_caps && desired_fps > 0) {
384 client_fps = desired_fps;
385 log_debug("Client %u requested FPS: %d (has_caps=%d, desired_fps=%d)", thread_client_id, client_fps, has_caps,
386 desired_fps);
387 } else {
388 log_debug("Client %u using default FPS: %d (has_caps=%d, desired_fps=%d)", thread_client_id, client_fps, has_caps,
389 desired_fps);
390 }
391
392 int base_frame_interval_ms = 1000 / client_fps;
393 log_debug("Client %u render interval: %dms (%d FPS)", thread_client_id, base_frame_interval_ms, client_fps);
394 struct timespec last_render_time;
395 (void)clock_gettime(CLOCK_MONOTONIC, &last_render_time);
396
397 // FPS tracking for video render thread
398 fps_t video_fps_tracker = {0};
399 fps_init(&video_fps_tracker, client_fps, "SERVER VIDEO");
400
401 log_info("Video render loop STARTING for client %u", thread_client_id);
402
403 bool should_continue = true;
404 while (should_continue && !atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down)) {
405 log_debug_every(LOG_RATE_FAST, "Video render loop iteration for client %u", thread_client_id);
406
407 // Check for immediate shutdown
408 if (atomic_load(&g_server_should_exit)) {
409 log_debug("Video render thread stopping for client %u (g_server_should_exit)", thread_client_id);
410 break;
411 }
412
413 bool video_running = atomic_load(&client->video_render_thread_running);
414 bool active = atomic_load(&client->active);
415 bool shutting_down = atomic_load(&client->shutting_down);
416
417 should_continue = video_running && active && !shutting_down;
418
419 if (!should_continue) {
420 log_debug("Video render thread stopping for client %u (should_continue=false: video_running=%d, active=%d, "
421 "shutting_down=%d)",
422 thread_client_id, video_running, active, shutting_down);
423 break;
424 }
425
426 // Rate limiting with better shutdown responsiveness
427 struct timespec current_time;
428 (void)clock_gettime(CLOCK_MONOTONIC, &current_time);
429
430 // Use microseconds for precision - avoid integer division precision loss
431 int64_t elapsed_us = ((int64_t)(current_time.tv_sec - last_render_time.tv_sec) * 1000000LL) +
432 ((int64_t)(current_time.tv_nsec - last_render_time.tv_nsec) / 1000);
433 int64_t base_frame_interval_us = (int64_t)base_frame_interval_ms * 1000;
434
435 if (elapsed_us < base_frame_interval_us) {
436 long sleep_us = (long)(base_frame_interval_us - elapsed_us);
437 // Sleep in small chunks for reasonable shutdown response (balance performance vs responsiveness)
438 const long max_sleep_chunk = 5000; // 5ms chunks for good shutdown response without destroying performance
439 while (sleep_us > 0 && !atomic_load(&g_server_should_exit)) {
440 long chunk = sleep_us > max_sleep_chunk ? max_sleep_chunk : sleep_us;
441 platform_sleep_usec(chunk);
442 sleep_us -= chunk;
443
444 // Check all shutdown conditions after each tiny sleep
445 if (atomic_load(&g_server_should_exit))
446 break;
447
448 bool still_running = atomic_load(&client->video_render_thread_running) && atomic_load(&client->active);
449 if (!still_running)
450 break;
451 }
452 // Fall through to render frame after sleeping
453 }
454
455 // CRITICAL: Check thread state again BEFORE acquiring locks (client might have been destroyed during sleep)
456 should_continue = atomic_load(&client->video_render_thread_running) && atomic_load(&client->active) &&
457 !atomic_load(&client->shutting_down);
458 if (!should_continue) {
459 break;
460 }
461
462 // CRITICAL OPTIMIZATION: No mutex needed - all fields are atomic or stable!
463 // client_id: atomic_uint - use atomic_load for thread safety
464 // width/height: atomic_ushort - use atomic_load
465 // active: atomic_bool - use atomic_load
466 uint32_t client_id_snapshot = atomic_load(&client->client_id); // Atomic read
467 unsigned short width_snapshot = atomic_load(&client->width); // Atomic read
468 unsigned short height_snapshot = atomic_load(&client->height); // Atomic read
469 bool active_snapshot = atomic_load(&client->active); // Atomic read
470
471 // Check if client is still active after getting snapshot
472 if (!active_snapshot) {
473 break;
474 }
475
476 // Phase 2 IMPLEMENTED: Generate frame specifically for THIS client using snapshot data
477 size_t frame_size = 0;
478
479 // Check if any clients are sending video
480 bool has_video_sources = any_clients_sending_video();
481 log_debug("Video render iteration for client %u: has_video_sources=%d", thread_client_id, has_video_sources);
482
483 if (!has_video_sources) {
484 // No video sources - skip frame generation but STILL update last_render_time
485 // so the next iteration keeps proper frame timing.
486 // DON'T `continue` here - that would bypass the last_render_time update.
487 // Jump to skip_frame_generation so last_render_time is updated at the bottom of the loop.
488 log_debug("Skipping frame generation for client %u (no video sources)", thread_client_id);
489 goto skip_frame_generation;
490 }
491
492 int sources_count = 0; // Track number of video sources in this frame
493
494 log_debug("About to call create_mixed_ascii_frame_for_client for client %u", thread_client_id);
495 char *ascii_frame = create_mixed_ascii_frame_for_client(client_id_snapshot, width_snapshot, height_snapshot, false,
496 &frame_size, NULL, &sources_count);
497 log_debug("create_mixed_ascii_frame_for_client returned: ascii_frame=%p, frame_size=%zu, sources_count=%d",
498 (void *)ascii_frame, frame_size, sources_count);
499
500 // Phase 2 IMPLEMENTED: Write frame to double buffer (never drops!)
501 if (ascii_frame && frame_size > 0) {
502 log_debug("Buffering frame for client %u (size=%zu)", thread_client_id, frame_size);
503 // GRID LAYOUT CHANGE DETECTION: Store source count with frame
504 // Send thread will compare this with last sent count to detect grid changes
505 atomic_store(&client->last_rendered_grid_sources, sources_count);
506
507 // Use double-buffer system which has its own internal swap_mutex
508 // No external locking needed - the double-buffer is thread-safe by design
509 video_frame_buffer_t *vfb_snapshot = client->outgoing_video_buffer;
510
511 if (vfb_snapshot) {
512 video_frame_t *write_frame = video_frame_begin_write(vfb_snapshot);
513 if (write_frame) {
514 // Copy ASCII frame data to the back buffer (NOT holding rwlock - just double-buffer's internal lock)
515 if (write_frame->data && frame_size <= vfb_snapshot->allocated_buffer_size) {
516 memcpy(write_frame->data, ascii_frame, frame_size);
517 write_frame->size = frame_size;
518 write_frame->capture_timestamp_us =
519 (uint64_t)current_time.tv_sec * 1000000 + (uint64_t)current_time.tv_nsec / 1000;
520
521 // Commit the frame (swaps buffers atomically using vfb->swap_mutex, NOT rwlock)
522 video_frame_commit(vfb_snapshot);
523
524 // Format the frame size for monitoring (NOTE(review): pretty_size is formatted but not logged here)
525 char pretty_size[64];
526 format_bytes_pretty(frame_size, pretty_size, sizeof(pretty_size));
527
528 } else {
529 log_warn("Frame too large for buffer: %zu > %zu", frame_size, vfb_snapshot->allocated_buffer_size);
530 }
531
532 // FPS tracking - frame successfully generated (handles lag detection and periodic reporting)
533 fps_frame(&video_fps_tracker, &current_time, "frame rendered");
534 }
535 }
536
537 SAFE_FREE(ascii_frame);
538 } else {
539 // No frame generated (probably no video sources) - this is normal, no error logging needed
540 log_debug_every(LOG_RATE_NORMAL, "Per-client render: No video sources available for client %u",
541 client_id_snapshot);
542 }
543
544 skip_frame_generation:
545 // Update last_render_time to maintain consistent frame timing
546 // CRITICAL: Use the current_time captured at the START of this iteration for rate limiting.
547 // This ensures we maintain the target frame rate based on when we STARTED processing,
548 // not when we FINISHED. This prevents timing drift from frame generation overhead.
549 last_render_time = current_time;
550 }
551
552#ifdef DEBUG_THREADS
553 log_debug("Video render thread stopped for client %u", thread_client_id);
554#endif
555
556 // Clean up thread-local error context before exit
558
559 return NULL;
560}
561
562/* ============================================================================
563 * Per-Client Audio Rendering Implementation
564 * ============================================================================
565 */
566
674 client_info_t *client = (client_info_t *)arg;
675
676 if (!client || client->socket == INVALID_SOCKET_VALUE) {
677 log_error("Invalid client info in audio render thread");
678 return NULL;
679 }
680
681 // Take snapshot of client ID and display name at start to avoid race conditions
682 // CRITICAL: Use atomic_load for client_id to prevent data races
683 uint32_t thread_client_id = atomic_load(&client->client_id);
684 char thread_display_name[64];
685
686 // LOCK OPTIMIZATION: Only need client_state_mutex, not global rwlock
687 // We already have a stable client pointer
689 SAFE_STRNCPY(thread_display_name, client->display_name, sizeof(thread_display_name));
691
692#ifdef DEBUG_THREADS
693 log_debug("Audio render thread started for client %u (%s)", thread_client_id, thread_display_name);
694#endif
695
696 // Mix buffer: up to 960 samples for adaptive reading
697 // Normal: 480 samples = 10ms @ 48kHz
698 // Catchup: 960 samples = 20ms when buffers are filling up
699 float mix_buffer[960];
700
701// Opus frame accumulation buffer (960 samples = 20ms @ 48kHz)
702// Opus requires minimum 480 samples, 960 is optimal for 20ms frames
703#define OPUS_FRAME_SAMPLES 960
704 float opus_frame_buffer[OPUS_FRAME_SAMPLES];
705 int opus_frame_accumulated = 0;
706
707 // Create Opus encoder for this client's audio stream (48kHz, mono, 128kbps, AUDIO mode for music quality)
708 opus_codec_t *opus_encoder = opus_codec_create_encoder(OPUS_APPLICATION_AUDIO, 48000, 128000);
709 if (!opus_encoder) {
710 log_error("Failed to create Opus encoder for audio render thread (client %u)", thread_client_id);
711 return NULL;
712 }
713
714 // FPS tracking for audio render thread
715 fps_t audio_fps_tracker = {0};
716 fps_init(&audio_fps_tracker, AUDIO_RENDER_FPS, "SERVER AUDIO");
717 struct timespec last_packet_send_time; // For time-based packet transmission (every 20ms)
718 (void)clock_gettime(CLOCK_MONOTONIC, &last_packet_send_time);
719
720 // Per-thread counters (NOT static - each thread instance gets its own)
721 int mixer_debug_count = 0;
722 int backpressure_check_counter = 0;
723 int server_audio_frame_count = 0;
724
725 bool should_continue = true;
726 while (should_continue && !atomic_load(&g_server_should_exit) && !atomic_load(&client->shutting_down)) {
727 // Capture loop start time for precise timing
728 struct timespec loop_start_time;
729 (void)clock_gettime(CLOCK_MONOTONIC, &loop_start_time);
730
731 log_debug_every(LOG_RATE_SLOW, "Audio render loop iteration for client %u", thread_client_id);
732
733 // Check for immediate shutdown
734 if (atomic_load(&g_server_should_exit)) {
735 log_debug("Audio render thread stopping for client %u (g_server_should_exit)", thread_client_id);
736 break;
737 }
738
739 // CRITICAL: Check thread state BEFORE acquiring any locks to prevent use-after-destroy
740 // If we acquire locks after client is being destroyed, we'll crash with SIGSEGV
741 should_continue = (((int)atomic_load(&client->audio_render_thread_running) != 0) &&
742 ((int)atomic_load(&client->active) != 0) && !atomic_load(&client->shutting_down));
743
744 if (!should_continue) {
745 log_debug("Audio render thread stopping for client %u (should_continue=false)", thread_client_id);
746 break;
747 }
748
749 if (!g_audio_mixer) {
750 log_info_every(LOG_RATE_FAST, "Audio render waiting for mixer (client %u)", thread_client_id);
751 // Check shutdown flag while waiting
752 if (atomic_load(&g_server_should_exit))
753 break;
754 platform_sleep_usec(10000);
755 continue;
756 }
757
758 // CRITICAL OPTIMIZATION: No mutex needed - all fields are atomic or stable!
759 // client_id: atomic_uint - use atomic_load for thread safety
760 // active: atomic_bool - use atomic_load
761 // audio_queue: Assigned once at init and never changes
762 uint32_t client_id_snapshot = atomic_load(&client->client_id); // Atomic read
763 bool active_snapshot = atomic_load(&client->active); // Atomic read
764 packet_queue_t *audio_queue_snapshot = client->audio_queue; // Stable after init
765
766 // Check if client is still active after getting snapshot
767 if (!active_snapshot || !audio_queue_snapshot) {
768 break;
769 }
770
771 // Create mix excluding THIS client's audio using snapshot data
772 struct timespec mix_start_time;
773 (void)clock_gettime(CLOCK_MONOTONIC, &mix_start_time);
774
775 // ADAPTIVE READING: Read more samples when we're behind to catch up
776 // Normal: 480 samples per 10ms iteration
777 // When behind: read up to 960 samples to catch up faster
778 // Check source buffer levels to decide
779 int samples_to_read = 480; // Default: 10ms worth
780
781 // Log latency at each stage in the server pipeline
782 if (g_audio_mixer) {
783 // Check source buffer latency for all sources
784 for (int i = 0; i < g_audio_mixer->max_sources; i++) {
785 if (g_audio_mixer->source_ids[i] != 0 && g_audio_mixer->source_ids[i] != client_id_snapshot &&
788 float buffer_latency_ms = (float)available / 48.0f; // samples / (48000 / 1000)
789
790 // Log source buffer latency
791 log_debug_every(500000, "LATENCY: Server incoming buffer for client %u: %.1fms (%zu samples)",
792 g_audio_mixer->source_ids[i], buffer_latency_ms, available);
793
794 // If buffer is getting too full, read faster to reduce latency
795 if (available > 1920) { // > 40ms buffered - read faster!
796 samples_to_read = 960; // Double read to catch up (20ms worth)
798 "LATENCY WARNING: Server buffer too full for client %u: %.1fms, reading double",
799 g_audio_mixer->source_ids[i], buffer_latency_ms);
800 }
801 }
802 }
803
804 // Log outgoing queue latency
805 size_t queue_depth = packet_queue_size(audio_queue_snapshot);
806 float queue_latency_ms = (float)queue_depth * 20.0f; // ~20ms per Opus packet
807 log_debug_every(500000, "LATENCY: Server send queue for client %u: %.1fms (%zu packets)", client_id_snapshot,
808 queue_latency_ms, queue_depth);
809 }
810
811 int samples_mixed = 0;
812 if (GET_OPTION(no_audio_mixer)) {
813 // Disable mixer.h processing: simple mixing without ducking/compression/etc
814 // Just add audio from all sources except this client, no processing
815 SAFE_MEMSET(mix_buffer, samples_to_read * sizeof(float), 0, samples_to_read * sizeof(float));
816
817 if (g_audio_mixer) {
818 int max_samples_in_frame = 0;
819 // Simple mixing: just add all sources except current client
820 for (int i = 0; i < g_audio_mixer->max_sources; i++) {
821 if (g_audio_mixer->source_ids[i] != 0 && g_audio_mixer->source_ids[i] != client_id_snapshot &&
823 // Read from this source and add to mix buffer
824 float temp_buffer[960]; // Max adaptive read size
825 int samples_read =
826 (int)audio_ring_buffer_read(g_audio_mixer->source_buffers[i], temp_buffer, samples_to_read);
827
828 // Track the maximum samples we got from any source
829 if (samples_read > max_samples_in_frame) {
830 max_samples_in_frame = samples_read;
831 }
832
833 // Add to mix buffer
834 for (int j = 0; j < samples_read; j++) {
835 mix_buffer[j] += temp_buffer[j];
836 }
837 }
838 }
839 samples_mixed = max_samples_in_frame; // Only count samples we actually read
840 }
841
843 "Audio mixer DISABLED (--no-audio-mixer): simple mixing, samples=%d for client %u", samples_mixed,
844 client_id_snapshot);
845 } else {
846 // Use adaptive sample count in normal mixer mode
847 samples_mixed = mixer_process_excluding_source(g_audio_mixer, mix_buffer, samples_to_read, client_id_snapshot);
848 }
849
850 struct timespec mix_end_time;
851 (void)clock_gettime(CLOCK_MONOTONIC, &mix_end_time);
852 uint64_t mix_time_us = ((uint64_t)mix_end_time.tv_sec * 1000000 + (uint64_t)mix_end_time.tv_nsec / 1000) -
853 ((uint64_t)mix_start_time.tv_sec * 1000000 + (uint64_t)mix_start_time.tv_nsec / 1000);
854
855 if (mix_time_us > 2000) { // Log if mixing takes > 2ms
856 log_warn_every(LOG_RATE_DEFAULT, "Slow mixer for client %u: took %lluus (%.2fms)", client_id_snapshot,
857 mix_time_us, (float)mix_time_us / 1000.0f);
858 }
859
860 // Debug logging every 100 iterations (disabled - can slow down audio rendering)
861 // log_debug_every(LOG_RATE_SLOW, "Audio render for client %u: samples_mixed=%d", client_id_snapshot,
862 // samples_mixed);
863
864 // DEBUG: Log samples mixed for the first 3 iterations and then every 50th (only when samples_mixed > 0)
865 // NOTE: mixer_debug_count is now per-thread (not static), so each client thread has its own counter
866 mixer_debug_count++;
867 if (samples_mixed > 0 && (mixer_debug_count <= 3 || mixer_debug_count % 50 == 0)) {
868 log_info("Server mixer iteration #%d for client %u: samples_mixed=%d, opus_frame_accumulated=%d/%d",
869 mixer_debug_count, client_id_snapshot, samples_mixed, opus_frame_accumulated, OPUS_FRAME_SAMPLES);
870 }
871
872 // Accumulate all samples (including 0 or partial) until we have a full Opus frame
873 // This maintains continuous stream without silence padding
874 struct timespec accum_start = {0};
875 (void)clock_gettime(CLOCK_MONOTONIC, &accum_start);
876
877 int space_available = OPUS_FRAME_SAMPLES - opus_frame_accumulated;
878 int samples_to_copy = (samples_mixed <= space_available) ? samples_mixed : space_available;
879
880 // Only copy if we have samples, otherwise just wait for next frame
881 if (samples_to_copy > 0) {
882 SAFE_MEMCPY(opus_frame_buffer + opus_frame_accumulated,
883 (OPUS_FRAME_SAMPLES - opus_frame_accumulated) * sizeof(float), mix_buffer,
884 samples_to_copy * sizeof(float));
885 opus_frame_accumulated += samples_to_copy;
886 }
887
888 struct timespec accum_end = {0};
889 (void)clock_gettime(CLOCK_MONOTONIC, &accum_end);
890 uint64_t accum_time_us = ((uint64_t)accum_end.tv_sec * 1000000 + (uint64_t)accum_end.tv_nsec / 1000) -
891 ((uint64_t)accum_start.tv_sec * 1000000 + (uint64_t)accum_start.tv_nsec / 1000);
892
893 if (accum_time_us > 500) {
894 log_warn_every(LOG_RATE_DEFAULT, "Slow accumulate for client %u: took %lluus", client_id_snapshot, accum_time_us);
895 }
896
897 // Only encode and send when we have accumulated a full Opus frame
898 if (opus_frame_accumulated >= OPUS_FRAME_SAMPLES) {
899 // OPTIMIZATION: Don't check queue depth every iteration - it's expensive (requires lock)
900 // Only check once every 100 full Opus frames (~2s at ~50 Opus frames per second)
901 // NOTE: backpressure_check_counter is now per-thread (not static), so each client thread has its own counter
902 bool apply_backpressure = false;
903
904 if (++backpressure_check_counter >= 100) {
905 backpressure_check_counter = 0;
906 size_t queue_depth = packet_queue_size(audio_queue_snapshot);
907 // Opus frames are produced at ~50 FPS (20ms each), so 50 packets = 1 second
908 // Keep latency bounded to ~1s max in the send queue
909 apply_backpressure = (queue_depth > 50); // > 50 packets = ~1s buffered at 50 FPS
910
911 if (apply_backpressure) {
912 log_warn("Audio backpressure for client %u: queue depth %zu packets (%.1fs buffered)", client_id_snapshot,
913 queue_depth, (float)queue_depth / 50.0f);
914 }
915 }
916
917 if (apply_backpressure) {
918 // Skip this packet to let the queue drain
919 // CRITICAL: Reset accumulation buffer so fresh samples can be captured on next iteration
920 // Without this reset, we'd loop forever with stale audio and no space for new samples
921 opus_frame_accumulated = 0;
923 continue;
924 }
925
926 // Encode accumulated Opus frame (960 samples = 20ms @ 48kHz)
927 uint8_t opus_buffer[1024]; // Max Opus frame size
928
929 struct timespec opus_start_time;
930 (void)clock_gettime(CLOCK_MONOTONIC, &opus_start_time);
931
932 int opus_size =
933 opus_codec_encode(opus_encoder, opus_frame_buffer, OPUS_FRAME_SAMPLES, opus_buffer, sizeof(opus_buffer));
934
935 struct timespec opus_end_time;
936 (void)clock_gettime(CLOCK_MONOTONIC, &opus_end_time);
937 uint64_t opus_time_us = ((uint64_t)opus_end_time.tv_sec * 1000000 + (uint64_t)opus_end_time.tv_nsec / 1000) -
938 ((uint64_t)opus_start_time.tv_sec * 1000000 + (uint64_t)opus_start_time.tv_nsec / 1000);
939
940 if (opus_time_us > 2000) { // Log if encoding takes > 2ms
941 log_warn_every(LOG_RATE_DEFAULT, "Slow Opus encode for client %u: took %lluus (%.2fms), size=%d",
942 client_id_snapshot, opus_time_us, (float)opus_time_us / 1000.0f, opus_size);
943 }
944
945 // DEBUG: Log mix buffer and encoding results to see audio levels being sent
946 {
947 float peak = 0.0f, rms = 0.0f;
948 for (int i = 0; i < OPUS_FRAME_SAMPLES; i++) {
949 float abs_val = fabsf(opus_frame_buffer[i]);
950 if (abs_val > peak)
951 peak = abs_val;
952 rms += opus_frame_buffer[i] * opus_frame_buffer[i];
953 }
954 rms = sqrtf(rms / OPUS_FRAME_SAMPLES);
955 // NOTE: server_audio_frame_count is now per-thread (not static), so each client thread has its own counter
956 server_audio_frame_count++;
957 if (server_audio_frame_count <= 5 || server_audio_frame_count % 20 == 0) {
958 // Log first 4 samples to verify they look like valid audio (not NaN/Inf/garbage)
959 log_info("Server audio frame #%d for client %u: samples_mixed=%d, Peak=%.6f, RMS=%.6f, opus_size=%d, "
960 "first4=[%.4f,%.4f,%.4f,%.4f]",
961 server_audio_frame_count, client_id_snapshot, samples_mixed, peak, rms, opus_size,
962 opus_frame_buffer[0], opus_frame_buffer[1], opus_frame_buffer[2], opus_frame_buffer[3]);
963 }
964 }
965
966 // Always reset accumulation buffer after attempting to encode - we've consumed these samples
967 // If we don't reset, new audio samples would be dropped while stale data sits in the buffer
968 opus_frame_accumulated = 0;
969
970 if (opus_size <= 0) {
971 log_error("Failed to encode audio to Opus for client %u: opus_size=%d", client_id_snapshot, opus_size);
972 } else {
973 // Queue Opus-encoded audio for this specific client
974 struct timespec queue_start = {0};
975 (void)clock_gettime(CLOCK_MONOTONIC, &queue_start);
976
977 int result =
978 packet_queue_enqueue(audio_queue_snapshot, PACKET_TYPE_AUDIO_OPUS, opus_buffer, (size_t)opus_size, 0, true);
979
980 struct timespec queue_end = {0};
981 (void)clock_gettime(CLOCK_MONOTONIC, &queue_end);
982 uint64_t queue_time_us = ((uint64_t)queue_end.tv_sec * 1000000 + (uint64_t)queue_end.tv_nsec / 1000) -
983 ((uint64_t)queue_start.tv_sec * 1000000 + (uint64_t)queue_start.tv_nsec / 1000);
984
985 if (queue_time_us > 500) {
986 log_warn_every(LOG_RATE_DEFAULT, "Slow queue for client %u: took %lluus", client_id_snapshot, queue_time_us);
987 }
988
989 if (result < 0) {
990 log_debug("Failed to queue Opus audio for client %u", client_id_snapshot);
991 } else {
992 // FPS tracking - audio packet successfully queued (handles lag detection and periodic reporting)
993 struct timespec current_time;
994 (void)clock_gettime(CLOCK_MONOTONIC, &current_time);
995 fps_frame(&audio_fps_tracker, &current_time, "audio packet queued");
996 }
997 }
998 // NOTE: opus_frame_accumulated was already reset above, right after the encode attempt
999 }
1000
1001 // Audio mixing rate - 10ms to match 48kHz sample rate (480 samples per iteration)
1002 // Use ABSOLUTE timing to prevent drift from chunked sleep overhead
1003 // Previous approach slept in 1ms chunks causing ~20% timing drift (12ms instead of 10ms)
1004 // which drained client buffers faster than they were filled.
1005 struct timespec loop_end_time;
1006 (void)clock_gettime(CLOCK_MONOTONIC, &loop_end_time);
1007
1008 uint64_t loop_elapsed_us = ((uint64_t)loop_end_time.tv_sec * 1000000 + (uint64_t)loop_end_time.tv_nsec / 1000) -
1009 ((uint64_t)loop_start_time.tv_sec * 1000000 + (uint64_t)loop_start_time.tv_nsec / 1000);
1010
1011 // Target loop time: 10ms to match the audio sample rate
1012 // We read 480 samples per iteration at 48kHz: 480 / 48000 = 0.01s = 10ms
1013 const uint64_t target_loop_us = 10000; // 10ms for exactly 48kHz rate
1014
1015 if (loop_elapsed_us >= target_loop_us) {
1016 // Processing took longer than target - skip sleep but warn
1018 "Audio processing took %lluus (%.1fms) - exceeds target %lluus (%.1fms) for client %u",
1019 loop_elapsed_us, (float)loop_elapsed_us / 1000.0f, target_loop_us, (float)target_loop_us / 1000.0f,
1020 thread_client_id);
1021 } else {
1022 // Sleep for remaining time in ONE call to avoid chunked sleep overhead
1023 // Check shutdown first, then do one accurate sleep
1024 if (!atomic_load(&g_server_should_exit) && atomic_load(&client->audio_render_thread_running)) {
1025 long remaining_sleep_us = (long)(target_loop_us - loop_elapsed_us);
1026 platform_sleep_usec(remaining_sleep_us);
1027 }
1028 }
1029 }
1030
1031#ifdef DEBUG_THREADS
1032 log_debug("Audio render thread stopped for client %u", thread_client_id);
1033#endif
1034
1035 // Clean up Opus encoder
1036 if (opus_encoder) {
1037 opus_codec_destroy(opus_encoder);
1038 }
1039
1040 // Clean up thread-local error context before exit
1042
1043 return NULL;
1044}
1045
1046/* ============================================================================
1047 * Thread Lifecycle Management Functions
1048 * ============================================================================
1049 */
1050
1122 if (!server_ctx || !client) {
1123 log_error("Cannot create render threads: NULL %s", !server_ctx ? "server_ctx" : "client");
1124 return -1;
1125 }
1126
1127#ifdef DEBUG_THREADS
1128 log_debug("Creating render threads for client %u", client->client_id);
1129#endif
1130
1131 // NOTE: Mutexes are already initialized in add_client() before any threads start
1132 // This prevents race conditions where receive thread tries to use uninitialized mutexes
1133
1134 // Initialize render thread control flags
1135 // IMPORTANT: Set to true BEFORE creating thread to avoid race condition
1136 // where thread starts and immediately exits because flag is false
1137 atomic_store(&client->video_render_thread_running, true);
1138 atomic_store(&client->audio_render_thread_running, true);
1139
1140 // Create video rendering thread (stop_id=2, stop after receive thread)
1141 char thread_name[64];
1142 snprintf(thread_name, sizeof(thread_name), "video_render_%u", client->client_id);
1143 asciichat_error_t video_result = tcp_server_spawn_thread(server_ctx->tcp_server, client->socket,
1144 client_video_render_thread, client, 2, thread_name);
1145 if (video_result != ASCIICHAT_OK) {
1146 // Reset flag since thread creation failed
1147 atomic_store(&client->video_render_thread_running, false);
1148 // Mutexes will be destroyed by remove_client() which called us
1149 return -1;
1150 }
1151
1152 // Create audio rendering thread (stop_id=2, same priority as video)
1153 snprintf(thread_name, sizeof(thread_name), "audio_render_%u", client->client_id);
1154 asciichat_error_t audio_result = tcp_server_spawn_thread(server_ctx->tcp_server, client->socket,
1155 client_audio_render_thread, client, 2, thread_name);
1156 if (audio_result != ASCIICHAT_OK) {
1157 // Clean up video thread (atomic operation, no mutex needed)
1158 atomic_store(&client->video_render_thread_running, false);
1159 // Reset audio flag since thread creation failed
1160 atomic_store(&client->audio_render_thread_running, false);
1161 // tcp_server_stop_client_threads() will be called by remove_client()
1162 // to clean up the video thread we just created
1163 // Mutexes will be destroyed by remove_client() which called us
1164 return -1;
1165 }
1166
1167#ifdef DEBUG_THREADS
1168 log_debug("Created render threads for client %u", client->client_id);
1169#endif
1170
1171 return 0;
1172}
1173
1256 if (!client) {
1257 SET_ERRNO(ERROR_INVALID_PARAM, "Client is NULL");
1258 return;
1259 }
1260
1261 log_debug("Stopping render threads for client %u", client->client_id);
1262
1263 // Signal threads to stop (atomic operations, no mutex needed)
1264 atomic_store(&client->video_render_thread_running, false);
1265 atomic_store(&client->audio_render_thread_running, false);
1266
1267 // Wait for threads to finish (deterministic cleanup)
1268 // NOTE: the joins below use the same asciichat_thread_join call in both modes; is_shutting_down only changes logging
1269 bool is_shutting_down = atomic_load(&g_server_should_exit);
1270
1272 log_debug("Joining video render thread for client %u", client->client_id);
1273 int result;
1274 if (is_shutting_down) {
1275 // During shutdown, don't timeout - wait for thread to exit
1276 // Timeouts mask the real problem: threads that are still running
1277 log_debug("Shutdown mode: joining video render thread for client %u (no timeout)", client->client_id);
1278 result = asciichat_thread_join(&client->video_render_thread, NULL);
1279 if (result != 0) {
1280 log_warn("Video render thread for client %u failed to join during shutdown: %s", client->client_id,
1281 SAFE_STRERROR(result));
1282 }
1283 } else {
1284 log_debug("Calling asciichat_thread_join for video thread of client %u", client->client_id);
1285 result = asciichat_thread_join(&client->video_render_thread, NULL);
1286 log_debug("asciichat_thread_join returned %d for video thread of client %u", result, client->client_id);
1287 }
1288
1289 if (result == 0) {
1290#ifdef DEBUG_THREADS
1291 log_debug("Video render thread joined for client %u", client->client_id);
1292#endif
1293 } else if (result != -2) { // Don't log timeout errors again
1294 if (is_shutting_down) {
1295 log_warn("Failed to join video render thread for client %u during shutdown (continuing): %s", client->client_id,
1296 SAFE_STRERROR(result));
1297 } else {
1298 log_error("Failed to join video render thread for client %u: %s", client->client_id, SAFE_STRERROR(result));
1299 }
1300 }
1301 // Clear thread handle safely using platform abstraction
1303 }
1304
1306 int result;
1307 if (is_shutting_down) {
1308 // During shutdown, don't timeout - wait for thread to exit
1309 // Timeouts mask the real problem: threads that are still running
1310 log_debug("Shutdown mode: joining audio render thread for client %u (no timeout)", client->client_id);
1311 result = asciichat_thread_join(&client->audio_render_thread, NULL);
1312 if (result != 0) {
1313 log_warn("Audio render thread for client %u failed to join during shutdown: %s", client->client_id,
1314 SAFE_STRERROR(result));
1315 }
1316 } else {
1317 result = asciichat_thread_join(&client->audio_render_thread, NULL);
1318 }
1319
1320 if (result == 0) {
1321#ifdef DEBUG_THREADS
1322 log_debug("Audio render thread joined for client %u", client->client_id);
1323#endif
1324 } else if (result != -2) { // Don't log timeout errors again
1325 if (is_shutting_down) {
1326 log_warn("Failed to join audio render thread for client %u during shutdown (continuing): %s", client->client_id,
1327 SAFE_STRERROR(result));
1328 } else {
1329 log_error("Failed to join audio render thread for client %u: %s", client->client_id, SAFE_STRERROR(result));
1330 }
1331 }
1332 // Clear thread handle safely using platform abstraction
1334 }
1335
1336 // DO NOT destroy the mutex here - client.c will handle it
1337 // mutex_destroy(&client->client_state_mutex);
1338
1339#ifdef DEBUG_THREADS
1340 log_debug("Successfully destroyed render threads for client %u", client->client_id);
1341#endif
1342}
🔌 Cross-platform abstraction layer umbrella header for ascii-chat
📊 String Formatting Utilities
⏱️ FPS tracking utility for monitoring frame throughput across all threads
size_t audio_ring_buffer_read(audio_ring_buffer_t *rb, float *data, size_t samples)
Read audio samples from ring buffer.
size_t audio_ring_buffer_available_read(audio_ring_buffer_t *rb)
Get number of samples available for reading.
opus_codec_t * opus_codec_create_encoder(opus_application_t application, int sample_rate, int bitrate)
Create an Opus encoder.
Definition opus_codec.c:18
uint32_t * source_ids
Array of client IDs (one per source slot)
Definition mixer.h:336
int max_sources
Maximum number of sources (allocated array sizes)
Definition mixer.h:329
int mixer_process_excluding_source(mixer_t *mixer, float *output, int num_samples, uint32_t exclude_client_id)
Process audio from all sources except one (for per-client output)
Definition mixer.c:604
void opus_codec_destroy(opus_codec_t *codec)
Destroy an Opus codec instance.
Definition opus_codec.c:215
size_t opus_codec_encode(opus_codec_t *codec, const float *samples, int num_samples, uint8_t *out_data, size_t out_size)
Encode audio frame with Opus.
Definition opus_codec.c:97
audio_ring_buffer_t ** source_buffers
Array of pointers to client audio ring buffers.
Definition mixer.h:334
@ OPUS_APPLICATION_AUDIO
General audio (optimized for music)
Definition opus_codec.h:78
unsigned int uint32_t
Definition common.h:58
#define SAFE_STRNCPY(dst, src, size)
Definition common.h:358
#define SAFE_FREE(ptr)
Definition common.h:320
#define SAFE_MEMSET(dest, dest_size, ch, count)
Definition common.h:389
void fps_frame(fps_t *tracker, const struct timespec *current_time, const char *context)
Track a frame and detect lag conditions.
Definition fps.c:57
#define SAFE_STRERROR(errnum)
Definition common.h:385
unsigned long long uint64_t
Definition common.h:59
unsigned char uint8_t
Definition common.h:56
#define SAFE_MEMCPY(dest, dest_size, src, count)
Definition common.h:388
void fps_init(fps_t *tracker, int expected_fps, const char *name)
Initialize FPS tracker.
Definition fps.c:36
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
void asciichat_errno_cleanup(void)
Cleanup error system resources.
asciichat_error_t
Error and exit codes - unified status values (0-255)
Definition error_codes.h:46
@ ASCIICHAT_OK
Definition error_codes.h:48
@ ERROR_INVALID_PARAM
#define LOG_RATE_FAST
Log rate limit: 1 second (1,000,000 microseconds)
Definition log_rates.h:26
#define LOG_RATE_SLOW
Log rate limit: 10 seconds (10,000,000 microseconds)
Definition log_rates.h:35
#define LOG_RATE_DEFAULT
Log rate limit: 5 seconds (5,000,000 microseconds) - default for audio/video packets.
Definition log_rates.h:32
#define LOG_RATE_NORMAL
Log rate limit: 3 seconds (3,000,000 microseconds)
Definition log_rates.h:29
#define log_warn(...)
Log a WARN message.
#define log_info_every(interval_us, fmt,...)
Rate-limited INFO logging.
#define log_error(...)
Log an ERROR message.
#define log_debug_every(interval_us, fmt,...)
Rate-limited DEBUG logging.
#define log_info(...)
Log an INFO message.
#define log_debug(...)
Log a DEBUG message.
#define log_warn_every(interval_us, fmt,...)
Rate-limited WARN logging.
#define GET_OPTION(field)
Safely get a specific option field (lock-free read)
Definition options.h:644
int packet_queue_enqueue(packet_queue_t *queue, packet_type_t type, const void *data, size_t data_len, uint32_t client_id, bool copy_data)
Enqueue a packet into the queue.
size_t packet_queue_size(packet_queue_t *queue)
Get current number of packets in queue.
@ PACKET_TYPE_AUDIO_OPUS
Opus-encoded single audio frame.
Definition packet.h:357
int socket_t
Socket handle type (POSIX: int)
Definition socket.h:50
void asciichat_thread_init(asciichat_thread_t *thread)
Initialize a thread handle to an uninitialized state.
#define mutex_lock(mutex)
Lock a mutex (with debug tracking in debug builds)
Definition mutex.h:140
#define INVALID_SOCKET_VALUE
Invalid socket value (POSIX: -1)
Definition socket.h:52
bool asciichat_thread_is_initialized(asciichat_thread_t *thread)
Check if a thread handle has been initialized.
void platform_sleep_usec(unsigned int usec)
High-precision sleep function with microsecond precision.
pthread_rwlock_t rwlock_t
Read-write lock type (POSIX: pthread_rwlock_t)
Definition rwlock.h:40
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Wait for a thread to complete (blocking)
#define mutex_unlock(mutex)
Unlock a mutex (with debug tracking in debug builds)
Definition mutex.h:175
void format_bytes_pretty(size_t bytes, char *out, size_t out_capacity)
Format byte count into human-readable string.
Definition format.c:10
video_frame_t * video_frame_begin_write(video_frame_buffer_t *vfb)
Writer API: Start writing a new frame.
void video_frame_commit(video_frame_buffer_t *vfb)
Writer API: Commit the frame and swap buffers.
Platform initialization and static synchronization helpers.
🔊 Audio Capture and Playback Interface for ascii-chat
asciichat_error_t tcp_server_spawn_thread(tcp_server_t *server, socket_t client_socket, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
Spawn a worker thread for a client.
🔢 Mathematical Utility Functions
Multi-Source Audio Mixing and Processing System.
⚙️ Command-line options parsing and configuration management for ascii-chat
Opus audio codec wrapper for real-time encoding/decoding.
📬 Thread-safe packet queue system for per-client send threads
#define OPUS_FRAME_SAMPLES
void stop_client_render_threads(client_info_t *client)
Stop and cleanup per-client rendering threads.
Definition render.c:1255
mixer_t * g_audio_mixer
Global audio mixer from main.c - provides multi-client audio mixing.
rwlock_t g_client_manager_rwlock
Reader-writer lock protecting the global client manager.
atomic_bool g_server_should_exit
Global shutdown flag from main.c - coordinate graceful thread termination.
void * client_video_render_thread(void *arg)
Main video rendering thread function for individual clients.
Definition render.c:359
void * client_audio_render_thread(void *arg)
Main audio rendering thread function for individual clients.
Definition render.c:673
int create_client_render_threads(server_context_t *server_ctx, client_info_t *client)
Create and initialize per-client rendering threads.
Definition render.c:1121
Per-client rendering threads with rate limiting.
#define AUDIO_RENDER_FPS
Definition render.h:26
#define VIDEO_RENDER_FPS
Definition render.h:22
ascii-chat Server Mode Entry Point Header
Per-client state management and lifecycle orchestration.
Server packet processing and protocol implementation.
bool any_clients_sending_video(void)
Check if any connected clients are currently sending video.
Definition stream.c:1192
char * create_mixed_ascii_frame_for_client(uint32_t target_client_id, unsigned short width, unsigned short height, bool wants_stretch, size_t *out_size, bool *out_grid_changed, int *out_sources_count)
Generate personalized ASCII frame for a specific client.
Definition stream.c:959
Multi-client video mixing and ASCII frame generation.
Per-client state structure for server-side client management.
atomic_bool video_render_thread_running
terminal_capabilities_t terminal_caps
atomic_uint client_id
char display_name[MAX_DISPLAY_NAME_LEN]
asciichat_thread_t audio_render_thread
video_frame_buffer_t * outgoing_video_buffer
atomic_ushort height
atomic_ushort width
mutex_t client_state_mutex
atomic_bool audio_render_thread_running
atomic_int last_rendered_grid_sources
packet_queue_t * audio_queue
atomic_bool shutting_down
asciichat_thread_t video_render_thread
atomic_bool active
FPS tracking state.
Definition fps.h:51
Main mixer structure for multi-source audio processing.
Definition mixer.h:325
Opus codec context for encoding or decoding.
Definition opus_codec.h:95
Thread-safe packet queue for producer-consumer communication.
Server context - encapsulates all server state.
Definition server/main.h:81
tcp_server_t * tcp_server
TCP server managing connections.
Definition server/main.h:83
uint8_t desired_fps
Client's desired frame rate (1-144 FPS)
Definition terminal.h:509
Video frame buffer manager.
size_t allocated_buffer_size
Size of allocated data buffers (for cleanup)
Video frame structure.
uint64_t capture_timestamp_us
Timestamp when frame was captured (microseconds)
size_t size
Size of frame data in bytes.
void * data
Frame data pointer (points to pre-allocated buffer)
⏱️ High-precision timing utilities using sokol_time.h and uthash