ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
src/client/audio.c
Go to the documentation of this file.
1
74#include "audio.h"
75#include <ascii-chat/audio/analysis.h>
76#include "main.h"
77#include "../main.h" // Global exit API
78#include "server.h"
79#include <ascii-chat/util/fps.h>
80#include <ascii-chat/util/thread.h>
81#include <ascii-chat/util/time.h> // For timing instrumentation
82
83#include <ascii-chat/audio/audio.h> // lib/audio/audio.h for PortAudio wrapper
84#include <ascii-chat/audio/client_audio_pipeline.h> // Unified audio processing pipeline
85#include <ascii-chat/audio/wav_writer.h> // WAV file dumping for debugging
86#include <ascii-chat/common.h>
87#include <ascii-chat/options/options.h>
88#include <ascii-chat/options/rcu.h> // For RCU-based options access
89#include <ascii-chat/platform/system.h> // For platform_memcpy
90
91#include <stdatomic.h>
92#include <string.h>
93#include <math.h>
94
95#include <ascii-chat/platform/abstraction.h>
96#include <ascii-chat/platform/init.h>
97#include <ascii-chat/thread_pool.h>
98
99/* ============================================================================
100 * Audio System State
101 * ============================================================================ */
102
112static audio_context_t g_audio_context = {0};
113
126static client_audio_pipeline_t *g_audio_pipeline = NULL;
127
128/* ============================================================================
129 * Audio Debugging - WAV File Dumpers
130 * ============================================================================ */
131
133static wav_writer_t *g_wav_capture_raw = NULL;
134
136static wav_writer_t *g_wav_capture_processed = NULL;
137
139static wav_writer_t *g_wav_playback_received = NULL;
140
141/* ============================================================================
142 * Audio Capture Thread Management
143 * ============================================================================ */
144
154static asciichat_thread_t g_audio_capture_thread;
155
164static bool g_audio_capture_thread_created = false;
165
174static atomic_bool g_audio_capture_thread_exited = false;
175
176/* ============================================================================
177 * Async Audio Packet Queue (decouples capture from network I/O)
178 * ============================================================================ */
179
186typedef struct {
187 uint8_t data[8 * 4000]; // Max 8 frames * ~500 bytes each (with safety margin)
188 size_t size;
189 uint16_t frame_sizes[8];
192
194#define AUDIO_SEND_QUEUE_SIZE 32
195static audio_send_packet_t g_audio_send_queue[AUDIO_SEND_QUEUE_SIZE];
196static int g_audio_send_queue_head = 0; // Write position
197static int g_audio_send_queue_tail = 0; // Read position
198static mutex_t g_audio_send_queue_mutex;
199static cond_t g_audio_send_queue_cond;
200static bool g_audio_send_queue_initialized = false;
201static static_mutex_t g_audio_send_queue_init_mutex = STATIC_MUTEX_INIT;
202
204static bool g_audio_sender_thread_created = false;
205static atomic_bool g_audio_sender_should_exit = false;
206
218static int audio_queue_packet(const uint8_t *opus_data, size_t opus_size, const uint16_t *frame_sizes,
219 int frame_count) {
220 if (!g_audio_send_queue_initialized || !opus_data || opus_size == 0) {
221 return -1;
222 }
223
224 mutex_lock(&g_audio_send_queue_mutex);
225
226 // Check if queue is full
227 int next_head = (g_audio_send_queue_head + 1) % AUDIO_SEND_QUEUE_SIZE;
228 if (next_head == g_audio_send_queue_tail) {
229 mutex_unlock(&g_audio_send_queue_mutex);
230 log_warn_every(LOG_RATE_FAST, "Audio send queue full, dropping packet");
231 return -1;
232 }
233
234 // Copy packet to queue
235 audio_send_packet_t *packet = &g_audio_send_queue[g_audio_send_queue_head];
236 if (opus_size <= sizeof(packet->data)) {
237 memcpy(packet->data, opus_data, opus_size);
238 packet->size = opus_size;
239 packet->frame_count = frame_count;
240 for (int i = 0; i < frame_count && i < 8; i++) {
241 packet->frame_sizes[i] = frame_sizes[i];
242 }
243 g_audio_send_queue_head = next_head;
244 }
245
246 // Signal sender thread
247 cond_signal(&g_audio_send_queue_cond);
248 mutex_unlock(&g_audio_send_queue_mutex);
249
250 return 0;
251}
252
259static void *audio_sender_thread_func(void *arg) {
260 (void)arg;
261 log_debug("Audio sender thread started");
262
263 // Initialize timing system for performance profiling
264 if (!timer_is_initialized()) {
266 }
267
268 static int send_count = 0;
269
270 while (!atomic_load(&g_audio_sender_should_exit)) {
271 mutex_lock(&g_audio_send_queue_mutex);
272
273 // Wait for packet or exit signal
274 while (g_audio_send_queue_head == g_audio_send_queue_tail && !atomic_load(&g_audio_sender_should_exit)) {
275 cond_wait(&g_audio_send_queue_cond, &g_audio_send_queue_mutex);
276 }
277
278 if (atomic_load(&g_audio_sender_should_exit)) {
279 mutex_unlock(&g_audio_send_queue_mutex);
280 break;
281 }
282
283 // Dequeue packet
284 audio_send_packet_t packet = g_audio_send_queue[g_audio_send_queue_tail];
285 g_audio_send_queue_tail = (g_audio_send_queue_tail + 1) % AUDIO_SEND_QUEUE_SIZE;
286
287 mutex_unlock(&g_audio_send_queue_mutex);
288
289 // Send packet (may block on network I/O - that's OK, we're not in capture thread)
290 START_TIMER("network_send_audio");
291 asciichat_error_t send_result =
292 threaded_send_audio_opus_batch(packet.data, packet.size, packet.frame_sizes, packet.frame_count);
293 double send_time_ns = STOP_TIMER("network_send_audio");
294
295 send_count++;
296 if (send_result < 0) {
297 log_debug_every(LOG_RATE_VERY_FAST, "Failed to send audio packet");
298 } else if (send_count % 50 == 0) {
299 char duration_str[32];
300 format_duration_ns(send_time_ns, duration_str, sizeof(duration_str));
301 log_debug("Audio network send #%d: %zu bytes (%d frames) in %s", send_count, packet.size, packet.frame_count,
302 duration_str);
303 }
304 }
305
306 log_debug("Audio sender thread exiting");
307
308 // Clean up thread-local error context before exit
310
311 return NULL;
312}
313
320static void audio_sender_init(void) {
321 static_mutex_lock(&g_audio_send_queue_init_mutex);
322
323 // Check again under lock to prevent race condition
324 if (g_audio_send_queue_initialized) {
325 static_mutex_unlock(&g_audio_send_queue_init_mutex);
326 return;
327 }
328
329 // Initialize queue structures under lock
330 mutex_init(&g_audio_send_queue_mutex);
331 cond_init(&g_audio_send_queue_cond);
332 g_audio_send_queue_head = 0;
333 g_audio_send_queue_tail = 0;
334 g_audio_send_queue_initialized = true;
335 atomic_store(&g_audio_sender_should_exit, false);
336
337 static_mutex_unlock(&g_audio_send_queue_init_mutex);
338
339 // Start sender thread (after lock release to avoid blocking other threads)
340 if (thread_pool_spawn(g_client_worker_pool, audio_sender_thread_func, NULL, 5, "audio_sender") == ASCIICHAT_OK) {
341 g_audio_sender_thread_created = true;
342 log_debug("Audio sender thread created");
343 } else {
344 log_error("Failed to spawn audio sender thread in worker pool");
345 LOG_ERRNO_IF_SET("Audio sender thread creation failed");
346 }
347}
348
352static void audio_sender_cleanup(void) {
353 if (!g_audio_send_queue_initialized) {
354 return;
355 }
356
357 // Signal thread to exit
358 atomic_store(&g_audio_sender_should_exit, true);
359 mutex_lock(&g_audio_send_queue_mutex);
360 cond_signal(&g_audio_send_queue_cond);
361 mutex_unlock(&g_audio_send_queue_mutex);
362
363 // Thread will be joined by thread_pool_stop_all() in protocol_stop_connection()
364 if (THREAD_IS_CREATED(g_audio_sender_thread_created)) {
365 g_audio_sender_thread_created = false;
366 log_debug("Audio sender thread will be joined by thread pool");
367 }
368
369 mutex_destroy(&g_audio_send_queue_mutex);
370 cond_destroy(&g_audio_send_queue_cond);
371 g_audio_send_queue_initialized = false;
372}
373
374/* ============================================================================
375 * Audio Processing Constants
376 * ============================================================================ */
377
#define AUDIO_VOLUME_BOOST (1.0f) /* Unity linear gain: neither boosts nor attenuates */
380
381/* ============================================================================
382 * Audio Processing Functions
383 * ============================================================================ */
384
399void audio_process_received_samples(const float *samples, int num_samples) {
400 // Validate parameters
401 if (!samples || num_samples <= 0) {
402 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid audio samples: samples=%p, num_samples=%d", (void *)samples, num_samples);
403 return;
404 }
405
406 if (!GET_OPTION(audio_enabled)) {
407 log_warn_every(NS_PER_MS_INT, "Received audio samples but audio is disabled");
408 return;
409 }
410
411 // Allow both single packets and batched packets
412 if (num_samples > AUDIO_BATCH_SAMPLES) {
413 log_warn("Audio packet too large: %d samples (max %d)", num_samples, AUDIO_BATCH_SAMPLES);
414 return;
415 }
416
417 // Calculate RMS energy of received samples
418 float sum_squares = 0.0f;
419 for (int i = 0; i < num_samples; i++) {
420 sum_squares += samples[i] * samples[i];
421 }
422 float received_rms = sqrtf(sum_squares / num_samples);
423
424 // DUMP: Received audio from server (before playback processing)
425 if (g_wav_playback_received) {
426 wav_writer_write(g_wav_playback_received, samples, num_samples);
427 }
428
429 // Track samples for analysis
430 if (GET_OPTION(audio_analysis_enabled)) {
431 for (int i = 0; i < num_samples; i++) {
433 }
434 }
435
436 // Copy samples to playback buffer (no processing needed - mixer already handled clipping)
437 float audio_buffer[AUDIO_BATCH_SAMPLES];
438 memcpy(audio_buffer, samples, (size_t)num_samples * sizeof(float));
439
440 // DEBUG: Log what we're writing to playback buffer (with first 4 samples to verify audio integrity)
441 static int recv_count = 0;
442 recv_count++;
443 if (recv_count <= 10 || recv_count % 50 == 0) {
444 float peak = 0.0f;
445 for (int i = 0; i < num_samples; i++) {
446 float abs_val = fabsf(samples[i]);
447 if (abs_val > peak)
448 peak = abs_val;
449 }
450 log_debug("CLIENT AUDIO RECV #%d: %d samples, RMS=%.6f, Peak=%.6f, first4=[%.4f,%.4f,%.4f,%.4f]", recv_count,
451 num_samples, received_rms, peak, num_samples > 0 ? samples[0] : 0.0f, num_samples > 1 ? samples[1] : 0.0f,
452 num_samples > 2 ? samples[2] : 0.0f, num_samples > 3 ? samples[3] : 0.0f);
453 }
454
455 // Submit to playback system (goes to jitter buffer and speakers)
456 // NOTE: AEC3's AnalyzeRender is called in output_callback() when audio actually plays,
457 // NOT here. The jitter buffer adds 50-100ms delay, so calling AnalyzeRender here
458 // would give AEC3 the wrong timing and break echo cancellation.
459 audio_write_samples(&g_audio_context, audio_buffer, num_samples);
460
461 // Log latency after writing to playback buffer
462 if (g_audio_context.playback_buffer) {
463 size_t buffer_samples = audio_ring_buffer_available_read(g_audio_context.playback_buffer);
464 float buffer_latency_ms = (float)buffer_samples / 48.0f;
465 log_dev_every(500 * US_PER_MS_INT, "LATENCY: Client playback buffer after recv: %.1fms (%zu samples)",
466 buffer_latency_ms, buffer_samples);
467 }
468
469#ifdef DEBUG_AUDIO
470 log_debug("Processed %d received audio samples", num_samples);
471#endif
472}
473
474/* ============================================================================
475 * Audio Capture Thread Implementation
476 * ============================================================================ */
477
499static void *audio_capture_thread_func(void *arg) {
500 (void)arg;
501
502 log_debug("Audio capture thread started");
503
504 // Initialize timing system for performance profiling
505 if (!timer_is_initialized()) {
507 }
508
509 // FPS tracking for audio capture thread (tracking Opus frames, ~50 FPS at 20ms per frame)
510 static fps_t fps_tracker = {0};
511 static bool fps_tracker_initialized = false;
512 if (!fps_tracker_initialized) {
513 fps_init(&fps_tracker, 50, "AUDIO_TX");
514 fps_tracker_initialized = true;
515 }
516
517 // Detailed timing stats
518 static double total_loop_ns = 0;
519 static double total_read_ns = 0;
520 static double total_encode_ns = 0;
521 static double total_queue_ns = 0;
522 static double max_loop_ns = 0;
523 static double max_read_ns = 0;
524 static double max_encode_ns = 0;
525 static double max_queue_ns = 0;
526 static uint64_t timing_loop_count = 0;
527
528// Opus frame size: 960 samples = 20ms @ 48kHz (must match pipeline config)
529#define OPUS_FRAME_SAMPLES 960
530#define OPUS_MAX_PACKET_SIZE 500 // Max Opus packet size
531
532 // Read enough samples per iteration to drain faster than we fill
533 // Buffer holds multiple Opus frames worth to prevent overflow
534 // 4 frames = 3840 samples = 80ms, but we'll read what's available up to this
535#define CAPTURE_READ_SIZE (OPUS_FRAME_SAMPLES * 4)
536
537 float audio_buffer[CAPTURE_READ_SIZE];
538 static bool wav_dumpers_initialized = false;
539
540 // Initialize WAV dumpers only once (file handles persist)
541 if (!wav_dumpers_initialized && wav_dump_enabled()) {
542 g_wav_capture_raw = wav_writer_open("/tmp/audio_capture_raw.wav", AUDIO_SAMPLE_RATE, 1);
543 g_wav_capture_processed = wav_writer_open("/tmp/audio_capture_processed.wav", AUDIO_SAMPLE_RATE, 1);
544 log_debug("Audio debugging enabled: dumping to /tmp/audio_capture_*.wav");
545 wav_dumpers_initialized = true;
546 }
547
548 // Accumulator for building complete Opus frames
549 float opus_frame_buffer[OPUS_FRAME_SAMPLES];
550 int opus_frame_samples_collected = 0;
551
552 // Batch buffer for multiple Opus frames - send all at once to reduce blocking
553#define MAX_BATCH_FRAMES 8
554#define BATCH_TIMEOUT_NS (40LL * NS_PER_MS_INT) // Flush batch after 40ms even if not full (2 Opus frames @ 20ms each)
555 static uint8_t batch_buffer[MAX_BATCH_FRAMES * OPUS_MAX_PACKET_SIZE];
556 static uint16_t batch_frame_sizes[MAX_BATCH_FRAMES];
557 static int batch_frame_count = 0;
558 static size_t batch_total_size = 0;
559 static uint64_t batch_start_time_ns = 0;
560 static bool batch_has_data = false;
561
562 while (!should_exit() && !server_connection_is_lost()) {
563 START_TIMER("audio_capture_loop_iteration");
564 timing_loop_count++;
565
567 STOP_TIMER("audio_capture_loop_iteration"); // Don't count sleep time
568 platform_sleep_us(100 * US_PER_MS_INT); // Wait for connection
569 continue;
570 }
571
572 // Check if pipeline is ready
573 if (!g_audio_pipeline) {
574 STOP_TIMER("audio_capture_loop_iteration"); // Don't count sleep time
575 platform_sleep_us(100 * US_PER_MS_INT);
576 continue;
577 }
578
579 // Check how many samples are available in the ring buffer
580 int available = audio_ring_buffer_available_read(g_audio_context.capture_buffer);
581 if (available <= 0) {
582 // Flush partial batch before sleeping (prevent starvation during idle periods)
583 if (batch_has_data && batch_frame_count > 0) {
584 uint64_t now_ns = time_get_ns();
585 uint64_t elapsed_ns = time_elapsed_ns(batch_start_time_ns, now_ns);
586
587 if (elapsed_ns >= BATCH_TIMEOUT_NS) {
588 long elapsed_ms = (long)time_ns_to_ms(elapsed_ns);
589 log_debug_every(LOG_RATE_FAST, "Idle timeout flush: %d frames (%zu bytes) after %ld ms", batch_frame_count,
590 batch_total_size, elapsed_ms);
591 (void)audio_queue_packet(batch_buffer, batch_total_size, batch_frame_sizes, batch_frame_count);
592 batch_frame_count = 0;
593 batch_total_size = 0;
594 batch_has_data = false;
595 }
596 }
597
598 // Sleep briefly to reduce CPU usage when idle
599 // 5ms polling = 200 times/sec, fast enough to catch audio promptly
600 // Note: 50ms was causing 872ms gaps in audio transmission!
601 STOP_TIMER("audio_capture_loop_iteration"); // Must stop before loop repeats
602 platform_sleep_us(5 * US_PER_MS_INT); // 5ms (was 50ms - caused huge gaps!)
603 continue;
604 }
605
606 // Read as many samples as possible (up to CAPTURE_READ_SIZE) to drain faster
607 // This prevents buffer overflow when processing is slower than capture
608 int to_read = (available < CAPTURE_READ_SIZE) ? available : CAPTURE_READ_SIZE;
609
610 START_TIMER("audio_read_samples");
611 asciichat_error_t read_result = audio_read_samples(&g_audio_context, audio_buffer, to_read);
612 double read_time_ns = STOP_TIMER("audio_read_samples");
613
614 total_read_ns += read_time_ns;
615 if (read_time_ns > max_read_ns)
616 max_read_ns = read_time_ns;
617
618 if (read_result != ASCIICHAT_OK) {
619 log_error("Failed to read audio samples from ring buffer");
620 STOP_TIMER("audio_capture_loop_iteration"); // Don't count sleep time
621 platform_sleep_us(5 * US_PER_MS_INT); // 5ms (error path - was 50ms, caused gaps!)
622 continue;
623 }
624
625 int samples_read = to_read;
626
627 // Log every 10 reads to see if we're getting samples
628 static int total_reads = 0;
629 total_reads++;
630 if (total_reads % 10 == 0) {
631 log_debug("Audio capture loop iteration #%d: available=%d, samples_read=%d", total_reads, available,
632 samples_read);
633 }
634
635 if (samples_read > 0) {
636 // Normalize input to prevent clipping: bring peak to ±0.99
637 // Calculate peak level first
638 float peak = 0.0f;
639 for (int i = 0; i < samples_read; i++) {
640 float abs_val = fabsf(audio_buffer[i]);
641 if (abs_val > peak)
642 peak = abs_val;
643 }
644
645 // Apply normalization if peak exceeds 1.0
646 // Use 0.99 to leave headroom for processing
647 if (peak > 1.0f) {
648 float gain = 0.99f / peak;
649 for (int i = 0; i < samples_read; i++) {
650 audio_buffer[i] *= gain;
651 }
652 static int norm_count = 0;
653 norm_count++;
654 if (norm_count <= 5 || norm_count % 100 == 0) {
655 log_debug("Input normalization #%d: peak=%.4f, gain=%.4f", norm_count, peak, gain);
656 }
657 }
658
659 // DUMP: Raw captured audio (before any processing)
660 if (g_wav_capture_raw) {
661 wav_writer_write(g_wav_capture_raw, audio_buffer, samples_read);
662 }
663
664 // DEBUG: Log EVERY read to see what we're getting from the ring buffer
665 static int read_count = 0;
666 read_count++;
667 float sum_squares = 0.0f;
668 for (int i = 0; i < samples_read && i < 10; i++) {
669 sum_squares += audio_buffer[i] * audio_buffer[i];
670 }
671 float rms = sqrtf(sum_squares / (samples_read > 10 ? 10 : samples_read));
672 if (read_count <= 5 || read_count % 20 == 0) {
673 log_debug("Audio capture read #%d: available=%d, samples_read=%d, first=[%.6f,%.6f,%.6f], RMS=%.6f", read_count,
674 available, samples_read, samples_read > 0 ? audio_buffer[0] : 0.0f,
675 samples_read > 1 ? audio_buffer[1] : 0.0f, samples_read > 2 ? audio_buffer[2] : 0.0f, rms);
676 }
677
678 // Track sent samples for analysis
679 if (GET_OPTION(audio_analysis_enabled)) {
680 for (int i = 0; i < samples_read; i++) {
681 audio_analysis_track_sent_sample(audio_buffer[i]);
682 }
683 }
684
685 // Accumulate samples into Opus frame buffer
686 int samples_to_process = samples_read;
687 int sample_offset = 0;
688
689 while (samples_to_process > 0) {
690 // How many samples can we add to current frame?
691 int space_in_frame = OPUS_FRAME_SAMPLES - opus_frame_samples_collected;
692 int samples_to_copy = (samples_to_process < space_in_frame) ? samples_to_process : space_in_frame;
693
694 // Copy samples to frame buffer
695 memcpy(&opus_frame_buffer[opus_frame_samples_collected], &audio_buffer[sample_offset],
696 (size_t)samples_to_copy * sizeof(float));
697
698 opus_frame_samples_collected += samples_to_copy;
699 sample_offset += samples_to_copy;
700 samples_to_process -= samples_to_copy;
701
702 // Do we have a complete frame?
703 if (opus_frame_samples_collected >= OPUS_FRAME_SAMPLES) {
704 // Process through pipeline: AEC, filters, AGC, noise gate, Opus encode
705 uint8_t opus_packet[OPUS_MAX_PACKET_SIZE];
706
707 START_TIMER("opus_encode");
708 int opus_len = client_audio_pipeline_capture(g_audio_pipeline, opus_frame_buffer, OPUS_FRAME_SAMPLES,
709 opus_packet, OPUS_MAX_PACKET_SIZE);
710
711 static int encode_count = 0;
712 encode_count++;
713 double opus_elapsed_ns = STOP_TIMER("opus_encode");
714 if (encode_count % 50 == 0) {
715 if (opus_elapsed_ns >= 0.0) {
716 char _duration_str[32];
717 format_duration_ns(opus_elapsed_ns, _duration_str, sizeof(_duration_str));
718 log_dev("Opus encode #%d: %d samples -> %d bytes in %s", encode_count, OPUS_FRAME_SAMPLES, opus_len,
719 _duration_str);
720 }
721 }
722
723 double encode_time_ns = 0; // Timing already logged
724
725 total_encode_ns += encode_time_ns;
726 if (encode_time_ns > max_encode_ns)
727 max_encode_ns = encode_time_ns;
728
729 if (opus_len > 0) {
730
731 log_debug_every(LOG_RATE_VERY_FAST, "Pipeline encoded: %d samples -> %d bytes (compression: %.1fx)",
732 OPUS_FRAME_SAMPLES, opus_len,
733 (float)(OPUS_FRAME_SAMPLES * sizeof(float)) / (float)opus_len);
734
735 // Add to batch buffer
736 if (batch_frame_count < MAX_BATCH_FRAMES && batch_total_size + (size_t)opus_len <= sizeof(batch_buffer)) {
737 // Mark batch start time on first frame
738 if (batch_frame_count == 0) {
739 batch_start_time_ns = time_get_ns();
740 batch_has_data = true;
741 }
742
743 memcpy(batch_buffer + batch_total_size, opus_packet, (size_t)opus_len);
744 batch_frame_sizes[batch_frame_count] = (uint16_t)opus_len;
745 batch_total_size += (size_t)opus_len;
746 batch_frame_count++;
747
748 if (GET_OPTION(audio_analysis_enabled)) {
749 audio_analysis_track_sent_packet((size_t)opus_len);
750 }
751 }
752 } else if (opus_len == 0) {
753 // DTX frame (silence) - no data to send
754 log_debug_every(LOG_RATE_VERY_FAST, "Pipeline DTX frame (silence detected)");
755 }
756
757 // Reset frame buffer
758 opus_frame_samples_collected = 0;
759 }
760 }
761
762 // Queue batch for async sending (non-blocking - sender thread handles network I/O)
763 if (batch_frame_count > 0) {
764 static int batch_send_count = 0;
765 batch_send_count++;
766
767 START_TIMER("audio_queue_packet");
768 int queue_result = audio_queue_packet(batch_buffer, batch_total_size, batch_frame_sizes, batch_frame_count);
769 double queue_time_ns = STOP_TIMER("audio_queue_packet");
770
771 total_queue_ns += queue_time_ns;
772 if (queue_time_ns > max_queue_ns)
773 max_queue_ns = queue_time_ns;
774
775 if (queue_result < 0) {
776 log_debug_every(LOG_RATE_VERY_FAST, "Failed to queue audio batch (queue full)");
777 } else {
778 if (batch_send_count <= 10 || batch_send_count % 50 == 0) {
779 char queue_duration_str[32];
780 format_duration_ns(queue_time_ns, queue_duration_str, sizeof(queue_duration_str));
781 log_debug("CLIENT: Queued Opus batch #%d (%d frames, %zu bytes) in %s", batch_send_count, batch_frame_count,
782 batch_total_size, queue_duration_str);
783 }
784 // Track audio frame for FPS reporting
785 fps_frame_ns(&fps_tracker, time_get_ns(), "audio batch queued");
786 }
787
788 // Reset batch
789 batch_frame_count = 0;
790 batch_total_size = 0;
791 batch_has_data = false;
792 }
793
794 // Log overall loop iteration time periodically
795 double loop_time_ns = STOP_TIMER("audio_capture_loop_iteration");
796 total_loop_ns += loop_time_ns;
797 if (loop_time_ns > max_loop_ns)
798 max_loop_ns = loop_time_ns;
799
800 // Comprehensive timing report every 100 iterations (~2 seconds)
801 if (timing_loop_count % 100 == 0) {
802 char avg_loop_str[32], max_loop_str[32];
803 char avg_read_str[32], max_read_str[32];
804 char avg_encode_str[32], max_encode_str[32];
805 char avg_queue_str[32], max_queue_str[32];
806
807 format_duration_ns(total_loop_ns / timing_loop_count, avg_loop_str, sizeof(avg_loop_str));
808 format_duration_ns(max_loop_ns, max_loop_str, sizeof(max_loop_str));
809 format_duration_ns(total_read_ns / timing_loop_count, avg_read_str, sizeof(avg_read_str));
810 format_duration_ns(max_read_ns, max_read_str, sizeof(max_read_str));
811 format_duration_ns(total_encode_ns / timing_loop_count, avg_encode_str, sizeof(avg_encode_str));
812 format_duration_ns(max_encode_ns, max_encode_str, sizeof(max_encode_str));
813 format_duration_ns(total_queue_ns / timing_loop_count, avg_queue_str, sizeof(avg_queue_str));
814 format_duration_ns(max_queue_ns, max_queue_str, sizeof(max_queue_str));
815
816 log_debug("CAPTURE TIMING #%lu: loop avg=%s max=%s, read avg=%s max=%s", timing_loop_count, avg_loop_str,
817 max_loop_str, avg_read_str, max_read_str);
818 log_info(" encode avg=%s max=%s, queue avg=%s max=%s", avg_encode_str, max_encode_str, avg_queue_str,
819 max_queue_str);
820 }
821
822 // Check if we have a partial batch that's been waiting too long (time-based flush)
823 // This prevents batches from sitting indefinitely when audio capture is irregular
824 if (batch_has_data && batch_frame_count > 0) {
825 uint64_t now_ns = time_get_ns();
826 uint64_t elapsed_ns = time_elapsed_ns(batch_start_time_ns, now_ns);
827
828 if (elapsed_ns >= BATCH_TIMEOUT_NS) {
829 static int timeout_flush_count = 0;
830 timeout_flush_count++;
831
832 long elapsed_ms = (long)time_ns_to_ms(elapsed_ns);
833 log_debug_every(LOG_RATE_FAST, "Timeout flush #%d: %d frames (%zu bytes) after %ld ms", timeout_flush_count,
834 batch_frame_count, batch_total_size, elapsed_ms);
835
836 // Queue partial batch
837 int queue_result = audio_queue_packet(batch_buffer, batch_total_size, batch_frame_sizes, batch_frame_count);
838 if (queue_result == 0) {
839 // Track audio frame for FPS reporting
840 fps_frame_ns(&fps_tracker, time_get_ns(), "audio batch timeout flush");
841 }
842
843 // Reset batch
844 batch_frame_count = 0;
845 batch_total_size = 0;
846 batch_has_data = false;
847 }
848 }
849
850 // Yield to reduce CPU usage - audio arrives at ~20ms per Opus frame (960 samples @ 48kHz)
851 // Without sleep, thread spins at 90-100% CPU constantly checking for new samples
852 // Even 1ms sleep reduces CPU usage from 90% to <10% with minimal latency impact
853 platform_sleep_us(1 * US_PER_MS_INT); // 1ms
854 } else {
855 // Track loop time even when no samples processed
856 double loop_time_ns = STOP_TIMER("audio_capture_loop_iteration");
857 total_loop_ns += loop_time_ns;
858 if (loop_time_ns > max_loop_ns)
859 max_loop_ns = loop_time_ns;
860
861 // Flush partial batch before sleeping on error path (prevent starvation)
862 if (batch_has_data && batch_frame_count > 0) {
863 uint64_t now_ns = time_get_ns();
864 uint64_t elapsed_ns = time_elapsed_ns(batch_start_time_ns, now_ns);
865
866 if (elapsed_ns >= BATCH_TIMEOUT_NS) {
867 long elapsed_ms = (long)time_ns_to_ms(elapsed_ns);
868 log_debug_every(LOG_RATE_FAST, "Error path timeout flush: %d frames (%zu bytes) after %ld ms",
869 batch_frame_count, batch_total_size, elapsed_ms);
870 (void)audio_queue_packet(batch_buffer, batch_total_size, batch_frame_sizes, batch_frame_count);
871 batch_frame_count = 0;
872 batch_total_size = 0;
873 batch_has_data = false;
874 }
875 }
876
877 platform_sleep_us(5 * US_PER_MS_INT); // 5ms (error path - was 50ms, caused gaps!)
878 }
879 }
880
881 log_debug("Audio capture thread stopped");
882 atomic_store(&g_audio_capture_thread_exited, true);
883
884 // Clean up thread-local error context before exit
886
887 return NULL;
888}
889
890/* ============================================================================
891 * Public Interface Functions
892 * ============================================================================ */
893
905 if (!GET_OPTION(audio_enabled)) {
906 return 0; // Audio disabled - not an error
907 }
908
909 // Initialize WAV dumper for received audio if debugging enabled
910 if (wav_dump_enabled()) {
911 g_wav_playback_received = wav_writer_open("/tmp/audio_playback_received.wav", AUDIO_SAMPLE_RATE, 1);
912 if (g_wav_playback_received) {
913 log_debug("Audio debugging enabled: dumping received audio to /tmp/audio_playback_received.wav");
914 }
915 }
916
917 // Initialize PortAudio context using library function
918 log_debug("DEBUG: About to call audio_init()...");
919 if (audio_init(&g_audio_context) != ASCIICHAT_OK) {
920 log_error("Failed to initialize audio system");
921 // Clean up WAV writer if it was opened
922 if (g_wav_playback_received) {
923 wav_writer_close(g_wav_playback_received);
924 g_wav_playback_received = NULL;
925 }
926 return -1;
927 }
928 log_debug("DEBUG: audio_init() completed successfully");
929
930 // Create unified audio pipeline (handles AEC, AGC, noise suppression, Opus)
931 client_audio_pipeline_config_t pipeline_config = client_audio_pipeline_default_config();
932 pipeline_config.opus_bitrate = 128000; // 128 kbps AUDIO mode for music quality
933
934 // Enable echo cancellation, AGC, and essential processing for clear audio
935 // Noise suppression and VAD can destroy music quality, so keep them disabled
936 pipeline_config.flags.echo_cancel = true; // ENABLE: removes echo
937 pipeline_config.flags.jitter_buffer = true; // ENABLE: needed for AEC sync
938 pipeline_config.flags.noise_suppress = false; // DISABLED: destroys music quality
939 pipeline_config.flags.agc = true; // ENABLE: boost quiet microphones (35 dB gain)
940 pipeline_config.flags.vad = false; // DISABLED: destroys music quality
941 pipeline_config.flags.compressor = true; // ENABLE: prevent clipping from AGC boost
942 pipeline_config.flags.noise_gate = false; // DISABLED: would cut quiet music passages
943 pipeline_config.flags.highpass = true; // ENABLE: remove rumble and low-frequency feedback
944 pipeline_config.flags.lowpass = false; // DISABLED: preserve high-frequency content
945
946 // Set jitter buffer margin for smooth playback without excessive delay
947 // 100ms is conservative - AEC3 will adapt to actual network delay automatically
948 // We don't tune this; let the system adapt to its actual conditions
949 pipeline_config.jitter_margin_ns = 100;
950
951 log_debug("DEBUG: About to create audio pipeline...");
952 g_audio_pipeline = client_audio_pipeline_create(&pipeline_config);
953 log_debug("DEBUG: client_audio_pipeline_create() returned");
954 if (!g_audio_pipeline) {
955 log_error("Failed to create audio pipeline");
956 audio_destroy(&g_audio_context);
957 // Clean up WAV writer if it was opened
958 if (g_wav_playback_received) {
959 wav_writer_close(g_wav_playback_received);
960 g_wav_playback_received = NULL;
961 }
962 return -1;
963 }
964
965 log_debug("Audio pipeline created: %d Hz sample rate, %d bps bitrate", pipeline_config.sample_rate,
966 pipeline_config.opus_bitrate);
967
968 // Associate pipeline with audio context for echo cancellation
969 // The audio output callback will feed playback samples directly to AEC3 from the speaker output,
970 // ensuring proper timing synchronization (not from the decode path 50-100ms earlier)
971 audio_set_pipeline(&g_audio_context, (void *)g_audio_pipeline);
972
973 // Start full-duplex audio (simultaneous capture + playback for perfect AEC3 timing)
974 if (audio_start_duplex(&g_audio_context) != ASCIICHAT_OK) {
975 log_error("Failed to start full-duplex audio");
976 client_audio_pipeline_destroy(g_audio_pipeline);
977 g_audio_pipeline = NULL;
978 audio_destroy(&g_audio_context);
979 // Clean up WAV writer if it was opened
980 if (g_wav_playback_received) {
981 wav_writer_close(g_wav_playback_received);
982 g_wav_playback_received = NULL;
983 }
984 return -1;
985 }
986
987 // Initialize async audio sender (decouples capture from network I/O)
988 audio_sender_init();
989
990 return 0;
991}
992
1004 log_debug("audio_start_thread called: audio_enabled=%d", GET_OPTION(audio_enabled));
1005
1006 if (!GET_OPTION(audio_enabled)) {
1007 log_debug("Audio is disabled, skipping audio capture thread creation");
1008 return 0; // Audio disabled - not an error
1009 }
1010
1011 // Check if thread is already running (not just created flag)
1012 if (g_audio_capture_thread_created && !atomic_load(&g_audio_capture_thread_exited)) {
1013 log_warn("Audio capture thread already running");
1014 return 0;
1015 }
1016
1017 // If thread exited, allow recreation
1018 if (g_audio_capture_thread_created && atomic_load(&g_audio_capture_thread_exited)) {
1019 log_debug("Previous audio capture thread exited, recreating");
1020 // Use timeout to prevent indefinite blocking
1021 int join_result = asciichat_thread_join_timeout(&g_audio_capture_thread, NULL, 5000 * NS_PER_MS_INT);
1022 if (join_result != 0) {
1023 log_warn("Audio capture thread join timed out after 5s - thread may be deadlocked, "
1024 "forcing thread handle reset (stuck thread resources will not be cleaned up)");
1025 // Thread is stuck - we can't safely reuse the handle, but we can reset our tracking
1026 // This is a resource leak of the stuck thread but continuing is safer than hanging
1027 }
1028 g_audio_capture_thread_created = false;
1029 }
1030
1031 // Notify server we're starting to send audio BEFORE spawning thread
1032 // IMPORTANT: Must send STREAM_START before thread starts sending packets to avoid protocol violation
1033 if (threaded_send_stream_start_packet(STREAM_TYPE_AUDIO) < 0) {
1034 log_error("Failed to send audio stream start packet");
1035 return -1; // Don't start thread if we can't notify server
1036 }
1037
1038 // Start audio capture thread
1039 atomic_store(&g_audio_capture_thread_exited, false);
1040 if (thread_pool_spawn(g_client_worker_pool, audio_capture_thread_func, NULL, 4, "audio_capture") != ASCIICHAT_OK) {
1041 log_error("Failed to spawn audio capture thread in worker pool");
1042 LOG_ERRNO_IF_SET("Audio capture thread creation failed");
1043 return -1;
1044 }
1045
1046 g_audio_capture_thread_created = true;
1047
1048 return 0;
1049}
1050
1060 // Signal audio sender thread to exit first.
1061 // This must happen before thread_pool_stop_all() is called, otherwise the sender
1062 // thread will be stuck in cond_wait() and thread_pool_stop_all() will hang forever.
1063 // The sender thread uses a condition variable to wait for packets - we must wake it up.
1064 if (g_audio_send_queue_initialized) {
1065 log_debug("Signaling audio sender thread to exit");
1066 atomic_store(&g_audio_sender_should_exit, true);
1067 mutex_lock(&g_audio_send_queue_mutex);
1068 cond_signal(&g_audio_send_queue_cond);
1069 mutex_unlock(&g_audio_send_queue_mutex);
1070 }
1071
1072 if (!THREAD_IS_CREATED(g_audio_capture_thread_created)) {
1073 return;
1074 }
1075
1076 // Note: We don't call signal_exit() here because that's for global shutdown only
1077 // The audio capture thread checks server_connection_is_active() to detect connection loss
1078
1079 // Wait for thread to exit gracefully
1080 int wait_count = 0;
1081 while (wait_count < 20 && !atomic_load(&g_audio_capture_thread_exited)) {
1082 platform_sleep_us(100 * US_PER_MS_INT); // 100ms
1083 wait_count++;
1084 }
1085
1086 if (!atomic_load(&g_audio_capture_thread_exited)) {
1087 log_warn("Audio capture thread not responding - will be joined by thread pool");
1088 }
1089
1090 // Thread will be joined by thread_pool_stop_all() in protocol_stop_connection()
1091 g_audio_capture_thread_created = false;
1092
1093 log_debug("Audio capture thread stopped");
1094}
1095
1104 return atomic_load(&g_audio_capture_thread_exited);
1105}
1106
1116 if (!GET_OPTION(audio_enabled)) {
1117 return;
1118 }
1119
1120 // Stop capture thread first (stops producing packets)
1122
1123 // Stop async sender thread (drains queue and exits)
1124 audio_sender_cleanup();
1125
1126 // Terminate PortAudio FIRST to properly free device resources before cleanup
1127 // This must happen before audio_stop_duplex() and audio_destroy()
1129
1130 // Stop audio stream before destroying pipeline to prevent race condition.
1131 // PortAudio may invoke the callback one more time after we request stop.
1132 // We need to clear the pipeline pointer first so the callback can't access freed memory.
1133 if (g_audio_context.initialized) {
1134 audio_stop_duplex(&g_audio_context);
1135 }
1136
1137 // Clear the pipeline pointer from audio context BEFORE destroying pipeline
1138 // This prevents any lingering PortAudio callbacks from trying to access freed memory
1139 audio_set_pipeline(&g_audio_context, NULL);
1140
1141 // Sleep to allow CoreAudio threads to finish executing callbacks.
1142 // On macOS, CoreAudio's internal threads may continue running after Pa_StopStream() returns.
1143 // The duplex_callback may still be in-flight on other threads. Even after we set the pipeline
1144 // pointer to NULL, a CoreAudio thread may have already cached the pointer before the assignment.
1145 // This sleep ensures all in-flight callbacks have fully completed before we destroy the pipeline.
1146 // 500ms is sufficient on macOS for CoreAudio's internal thread pool to completely wind down.
1147 platform_sleep_us(500 * US_PER_MS_INT); // 500ms - macOS CoreAudio needs time to shut down all threads
1148
1149 // Destroy audio pipeline (handles Opus, AEC, etc.)
1150 if (g_audio_pipeline) {
1151 client_audio_pipeline_destroy(g_audio_pipeline);
1152 g_audio_pipeline = NULL;
1153 log_debug("Audio pipeline destroyed");
1154 }
1155
1156 // Close WAV dumpers
1157 if (g_wav_capture_raw) {
1158 wav_writer_close(g_wav_capture_raw);
1159 g_wav_capture_raw = NULL;
1160 log_debug("Closed audio capture raw dump");
1161 }
1162 if (g_wav_capture_processed) {
1163 wav_writer_close(g_wav_capture_processed);
1164 g_wav_capture_processed = NULL;
1165 log_debug("Closed audio capture processed dump");
1166 }
1167 if (g_wav_playback_received) {
1168 wav_writer_close(g_wav_playback_received);
1169 g_wav_playback_received = NULL;
1170 log_debug("Closed audio playback received dump");
1171 }
1172
1173 // Finally destroy the audio context
1174 if (g_audio_context.initialized) {
1175 audio_destroy(&g_audio_context);
1176 }
1177}
1178
1185client_audio_pipeline_t *audio_get_pipeline(void) {
1186 return g_audio_pipeline;
1187}
1188
1199int audio_decode_opus(const uint8_t *opus_data, size_t opus_len, float *output, int max_samples) {
1200 if (!g_audio_pipeline || !output || max_samples <= 0) {
1201 return -1;
1202 }
1203
1204 return client_audio_pipeline_playback(g_audio_pipeline, opus_data, (int)opus_len, output, max_samples);
1205}
1206
1213audio_context_t *audio_get_context(void) {
1214 return &g_audio_context;
1215}
void audio_analysis_track_received_sample(float sample)
Definition analysis.c:272
void audio_analysis_track_sent_sample(float sample)
Definition analysis.c:173
void audio_analysis_track_sent_packet(size_t size)
Definition analysis.c:245
void asciichat_errno_destroy(void)
thread_pool_t * g_client_worker_pool
Global client worker thread pool.
int client_audio_pipeline_playback(client_audio_pipeline_t *pipeline, const uint8_t *opus_in, int opus_len, float *output, int num_samples)
client_audio_pipeline_t * client_audio_pipeline_create(const client_audio_pipeline_config_t *config)
Create and initialize a client audio pipeline.
client_audio_pipeline_config_t client_audio_pipeline_default_config(void)
int client_audio_pipeline_capture(client_audio_pipeline_t *pipeline, const float *input, int num_samples, uint8_t *opus_out, int max_opus_len)
void client_audio_pipeline_destroy(client_audio_pipeline_t *pipeline)
bool should_exit(void)
Definition main.c:90
void fps_frame_ns(fps_t *tracker, uint64_t current_time_ns, const char *context)
Definition fps.c:52
void fps_init(fps_t *tracker, int expected_fps, const char *name)
Definition fps.c:32
void audio_process_received_samples(const float *samples, int num_samples)
Process received audio samples from server.
void audio_stop_thread()
Stop audio capture thread.
int audio_client_init()
Initialize audio subsystem.
client_audio_pipeline_t * audio_get_pipeline(void)
Get the audio pipeline (for advanced usage)
void audio_cleanup()
Cleanup audio subsystem.
int audio_decode_opus(const uint8_t *opus_data, size_t opus_len, float *output, int max_samples)
Decode Opus packet using the audio pipeline.
bool audio_thread_exited()
Check if audio capture thread has exited.
audio_context_t * audio_get_context(void)
Get the global audio context for use by other subsystems.
int audio_start_thread()
Start audio capture thread.
bool server_connection_is_active()
Check if server connection is currently active.
bool server_connection_is_lost()
Check if connection loss has been detected.
asciichat_error_t threaded_send_audio_opus_batch(const uint8_t *opus_data, size_t opus_size, const uint16_t *frame_sizes, int frame_count)
Thread-safe Opus audio batch packet transmission.
asciichat_error_t threaded_send_stream_start_packet(uint32_t stream_type)
Thread-safe stream start packet transmission.
asciichat_error_t audio_init(audio_context_t *ctx)
size_t audio_ring_buffer_available_read(audio_ring_buffer_t *rb)
asciichat_error_t audio_start_duplex(audio_context_t *ctx)
void audio_destroy(audio_context_t *ctx)
asciichat_error_t audio_stop_duplex(audio_context_t *ctx)
asciichat_error_t audio_write_samples(audio_context_t *ctx, const float *buffer, int num_samples)
void audio_terminate_portaudio_final(void)
Terminate PortAudio and free all device resources.
void audio_set_pipeline(audio_context_t *ctx, void *pipeline)
asciichat_error_t audio_read_samples(audio_context_t *ctx, float *buffer, int num_samples)
void platform_sleep_us(unsigned int us)
ascii-chat Server Mode Entry Point Header
#define OPUS_FRAME_SAMPLES
#define MAX_BATCH_FRAMES
#define AUDIO_SEND_QUEUE_SIZE
#define CAPTURE_READ_SIZE
#define BATCH_TIMEOUT_NS
#define OPUS_MAX_PACKET_SIZE
ascii-chat Client Audio Processing Management Interface
Audio packet for async sending.
uint8_t data[8 *4000]
asciichat_error_t thread_pool_spawn(thread_pool_t *pool, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
Definition thread_pool.c:70
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21
uint64_t time_get_ns(void)
Definition util/time.c:48
int format_duration_ns(double nanoseconds, char *buffer, size_t buffer_size)
Definition util/time.c:275
uint64_t time_elapsed_ns(uint64_t start_ns, uint64_t end_ns)
Definition util/time.c:90
bool timer_is_initialized(void)
Definition util/time.c:267
bool timer_system_init(void)
Definition util/time.c:125
bool wav_dump_enabled(void)
Definition wav_writer.c:139
wav_writer_t * wav_writer_open(const char *filepath, int sample_rate, int channels)
Definition wav_writer.c:49
int wav_writer_write(wav_writer_t *writer, const float *samples, int num_samples)
Definition wav_writer.c:95
void wav_writer_close(wav_writer_t *writer)
Definition wav_writer.c:113