16#include <ascii-chat/session/host.h>
17#include <ascii-chat/common.h>
18#include <ascii-chat/options/options.h>
19#include <ascii-chat/asciichat_errno.h>
20#include <ascii-chat/platform/socket.h>
21#include <ascii-chat/platform/mutex.h>
22#include <ascii-chat/platform/thread.h>
23#include <ascii-chat/platform/network.h>
24#include <ascii-chat/log/logging.h>
25#include <ascii-chat/network/client.h>
26#include <ascii-chat/network/packet.h>
27#include <ascii-chat/ringbuffer.h>
28#include <ascii-chat/session/audio.h>
29#include <ascii-chat/audio/opus_codec.h>
30#include <ascii-chat/util/time.h>
31#include <ascii-chat/video/ascii.h>
43#define SESSION_HOST_DEFAULT_MAX_CLIENTS 32
168 SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_create: NULL config");
176 host->
port = config->port > 0 ? config->port : OPT_PORT_INT_DEFAULT;
180 if (config->ipv4_address) {
184 if (config->ipv6_address) {
188 if (config->key_path) {
192 if (config->password) {
212 SET_ERRNO(ERROR_THREAD,
"Failed to initialize clients mutex");
243 if (host->
socket_v4 != INVALID_SOCKET_VALUE) {
247 if (host->
socket_v6 != INVALID_SOCKET_VALUE) {
285static socket_t create_listen_socket(
const char *address,
int port) {
286 struct addrinfo hints, *result = NULL, *rp = NULL;
292 memset(&hints, 0,
sizeof(hints));
293 hints.ai_family = AF_INET;
294 hints.ai_socktype = SOCK_STREAM;
295 hints.ai_flags = AI_PASSIVE;
299 int s = getaddrinfo(address, port_str, &hints, &result);
301 SET_ERRNO(ERROR_NETWORK,
"getaddrinfo failed: %s", gai_strerror(s));
302 return INVALID_SOCKET_VALUE;
305 socket_t listen_sock = INVALID_SOCKET_VALUE;
306 for (rp = result; rp != NULL; rp = rp->ai_next) {
307 listen_sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
308 if (listen_sock == INVALID_SOCKET_VALUE) {
314 if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (
const char *)&reuse,
sizeof(reuse)) < 0) {
315 log_warn(
"setsockopt SO_REUSEADDR failed");
318 if (bind(listen_sock, rp->ai_addr, (
int)rp->ai_addrlen) == 0) {
322 socket_close(listen_sock);
323 listen_sock = INVALID_SOCKET_VALUE;
326 freeaddrinfo(result);
328 if (listen_sock == INVALID_SOCKET_VALUE) {
329 SET_ERRNO_SYS(ERROR_NETWORK_BIND,
"Failed to bind listen socket on %s:%d", address, port);
330 return INVALID_SOCKET_VALUE;
333 if (listen(listen_sock, SOMAXCONN) != 0) {
334 SET_ERRNO_SYS(ERROR_NETWORK_BIND,
"listen() failed on %s:%d", address, port);
335 socket_close(listen_sock);
336 return INVALID_SOCKET_VALUE;
350static void *accept_loop_thread(
void *arg) {
355 log_info(
"Accept loop started");
370 if (host->
socket_v4 != INVALID_SOCKET_VALUE) {
376 int activity = select(max_fd + 1, &readfds, NULL, NULL, &tv);
378 if (errno != EINTR) {
379 log_error(
"select() failed");
391 struct sockaddr_in client_addr;
392 socklen_t client_addr_len =
sizeof(client_addr);
395 socket_t client_socket = accept(host->
socket_v4, (
struct sockaddr *)&client_addr, &client_addr_len);
396 if (client_socket == INVALID_SOCKET_VALUE) {
397 log_warn(
"accept() failed");
403 inet_ntop(AF_INET, &client_addr.sin_addr, client_ip,
sizeof(client_ip));
404 int client_port = ntohs(client_addr.sin_port);
406 log_info(
"New connection from %s:%d", client_ip, client_port);
410 if (client_id == 0) {
411 log_error(
"Failed to add client");
412 socket_close(client_socket);
417 log_info(
"Accept loop stopped");
429static void *receive_loop_thread(
void *arg) {
434 log_info(
"Receive loop started");
446 max_fd = INVALID_SOCKET_VALUE;
453 if (max_fd == INVALID_SOCKET_VALUE || host->
clients[i].
socket > max_fd) {
461 if (max_fd == INVALID_SOCKET_VALUE) {
466 int activity = select((
int)max_fd + 1, &readfds, NULL, NULL, &tv);
468 if (errno != EINTR) {
469 log_error(
"select() failed in receive loop");
496 if (result != ASCIICHAT_OK) {
497 log_warn(
"packet_receive failed from client %u: %d", host->
clients[i].
client_id, result);
508 case PACKET_TYPE_IMAGE_FRAME:
510 if (data && len >=
sizeof(image_frame_packet_t)) {
511 const image_frame_packet_t *frame_hdr = (
const image_frame_packet_t *)data;
512 const uint8_t *pixel_data = (
const uint8_t *)data +
sizeof(image_frame_packet_t);
513 size_t pixel_data_size = len -
sizeof(image_frame_packet_t);
521 if (img->w == (
int)frame_hdr->width && img->h == (
int)frame_hdr->height) {
523 size_t expected_size = (size_t)frame_hdr->width * frame_hdr->height * 3;
524 if (pixel_data_size >= expected_size) {
525 memcpy(img->pixels, pixel_data, expected_size);
526 log_debug_every(500 * US_PER_MS_INT,
"Frame received from client %u (%ux%u)", client_id,
527 frame_hdr->width, frame_hdr->height);
537 case PACKET_TYPE_AUDIO_OPUS_BATCH:
539 if (data && len > 16) {
540 const uint8_t *batch_data = (
const uint8_t *)data;
543 uint32_t batch_frame_count = *(
const uint32_t *)(batch_data + 8);
545 if (batch_frame_count > 0 && batch_frame_count <= 1000) {
546 const uint16_t *frame_sizes = (
const uint16_t *)(batch_data + 16);
547 const uint8_t *opus_frames = batch_data + 16 + (batch_frame_count *
sizeof(uint16_t));
554 const uint8_t *current_frame = opus_frames;
555 for (uint32_t k = 0; k < batch_frame_count; k++) {
556 uint16_t frame_size = frame_sizes[k];
557 if (frame_size > 0) {
559 float decoded_samples[960];
562 if (decoded_count > 0) {
564 for (
int s = 0; s < decoded_count; s++) {
569 current_frame += frame_size;
571 log_debug_every(NS_PER_MS_INT,
"Audio batch received from client %u (%u frames)", client_id,
581 case PACKET_TYPE_STREAM_START:
582 log_info(
"Client %u started streaming", client_id);
593 case PACKET_TYPE_STREAM_STOP:
594 log_info(
"Client %u stopped streaming", client_id);
605 case PACKET_TYPE_PING:
607 log_debug_every(NS_PER_MS_INT,
"PING from client %u", client_id);
608 packet_send(client_socket, PACKET_TYPE_PONG, NULL, 0);
611 case PACKET_TYPE_CLIENT_LEAVE:
612 log_info(
"Client %u requested disconnect", client_id);
617 log_warn(
"Unknown packet type %u from client %u", ptype, client_id);
631 log_info(
"Receive loop stopped");
644static void *host_render_thread(
void *arg) {
647 SET_ERRNO(ERROR_INVALID_PARAM,
"host_render_thread: invalid host");
651 log_info(
"Host render thread started");
653 uint64_t last_video_render_ns = 0;
654 uint64_t last_audio_render_ns = 0;
660 if (
time_elapsed_ns(last_video_render_ns, now_ns) >= NS_PER_MS_INT * 16) {
665 int active_video_count = 0;
668 active_video_count++;
673 if (active_video_count > 0) {
675 char **ascii_frames = SAFE_MALLOC(active_video_count *
sizeof(
char *),
char **);
676 ascii_frame_source_t *sources =
677 SAFE_MALLOC(active_video_count *
sizeof(ascii_frame_source_t), ascii_frame_source_t *);
679 if (ascii_frames && sources) {
688 ascii_frames[frame_idx] =
690 if (ascii_frames[frame_idx]) {
691 sources[frame_idx].frame_data = ascii_frames[frame_idx];
692 sources[frame_idx].frame_size = strlen(ascii_frames[frame_idx]) + 1;
694 sources[frame_idx].frame_data =
"";
695 sources[frame_idx].frame_size = 1;
702 size_t grid_size = 0;
703 char *grid_frame =
ascii_create_grid(sources, active_video_count, 80, 24, &grid_size);
712 SAFE_FREE(grid_frame);
716 for (
int i = 0; i < active_video_count; i++) {
717 if (ascii_frames[i]) {
718 SAFE_FREE(ascii_frames[i]);
723 SAFE_FREE(ascii_frames);
728 log_debug_every(NS_PER_MS_INT,
"Video render cycle (%d active)", active_video_count);
729 last_video_render_ns = now_ns;
733 if (
time_elapsed_ns(last_audio_render_ns, now_ns) >= NS_PER_MS_INT * 10) {
741 float mixed_audio[960];
742 memset(mixed_audio, 0,
sizeof(mixed_audio));
757 for (
int j = 0; j < 960; j++) {
761 mixed_audio[j] += sample;
763 if (mixed_audio[j] > 1.0f) {
764 mixed_audio[j] = 1.0f;
765 }
else if (mixed_audio[j] < -1.0f) {
766 mixed_audio[j] = -1.0f;
775 uint8_t opus_buffer[1000];
780 uint16_t frame_sizes[1] = {(uint16_t)opus_len};
785 log_debug_every(NS_PER_MS_INT,
"Audio render cycle");
786 last_audio_render_ns = now_ns;
793 log_info(
"Host render thread stopped");
799 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_start: invalid host");
810 host->
socket_v4 = create_listen_socket(bind_address, host->
port);
811 if (host->
socket_v4 == INVALID_SOCKET_VALUE) {
812 log_error(
"Failed to create IPv4 listen socket");
814 host->
callbacks.on_error(host, ERROR_NETWORK_BIND,
"Failed to create listen socket", host->
user_data);
820 log_info(
"Session host listening on %s:%d", bind_address, host->
port);
825 log_error(
"Failed to spawn accept loop thread");
828 host->
callbacks.on_error(host, ERROR_THREAD,
"Failed to spawn accept loop thread", host->
user_data);
833 return SET_ERRNO(ERROR_THREAD,
"Failed to spawn accept loop thread");
839 log_error(
"Failed to spawn receive loop thread");
844 host->
callbacks.on_error(host, ERROR_THREAD,
"Failed to spawn receive loop thread", host->
user_data);
849 return SET_ERRNO(ERROR_THREAD,
"Failed to spawn receive loop thread");
866 log_info(
"Render thread joined");
873 log_info(
"Receive loop thread joined");
880 log_info(
"Accept loop thread joined");
905 if (host->
socket_v4 != INVALID_SOCKET_VALUE) {
909 if (host->
socket_v6 != INVALID_SOCKET_VALUE) {
938 SET_ERRNO(ERROR_SESSION_FULL,
"Maximum clients reached");
974 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate media buffers for client");
1006 SET_ERRNO(ERROR_SESSION_FULL,
"Maximum clients reached");
1014 SET_ERRNO(ERROR_INVALID_PARAM,
"Memory participant already exists");
1048 SET_ERRNO(ERROR_MEMORY,
"Failed to allocate media buffers for memory participant");
1074 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_inject_frame: invalid parameters");
1086 return SET_ERRNO(ERROR_INVALID_STATE,
"Memory participant has no video buffer");
1091 if (dest->w != frame->w || dest->h != frame->h) {
1098 return SET_ERRNO(ERROR_MEMORY,
"Failed to reallocate video buffer");
1104 memcpy(dest->pixels, frame->pixels, frame->w * frame->h *
sizeof(rgb_pixel_t));
1109 return ASCIICHAT_OK;
1114 return SET_ERRNO(ERROR_NOT_FOUND,
"Memory participant not found");
1119 if (!host || !host->
initialized || !samples || count == 0) {
1120 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_inject_audio: invalid parameters");
1132 return SET_ERRNO(ERROR_INVALID_STATE,
"Memory participant has no audio buffer");
1137 for (
size_t j = 0; j < count; j++) {
1145 if (written < count) {
1146 log_warn_every(NS_PER_MS_INT,
"Audio ringbuffer full, dropped %zu samples", count - written);
1152 return ASCIICHAT_OK;
1157 return SET_ERRNO(ERROR_NOT_FOUND,
"Memory participant not found");
1162 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_remove_client: invalid host");
1196 return ASCIICHAT_OK;
1201 return SET_ERRNO(ERROR_NOT_FOUND,
"Client not found: %u", client_id);
1206 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_find_client: invalid parameter");
1214 SAFE_STRNCPY(info->ip_address, host->
clients[i].
ip_address,
sizeof(info->ip_address));
1221 return ASCIICHAT_OK;
1226 return SET_ERRNO(ERROR_NOT_FOUND,
"Client not found: %u", client_id);
1237 if (!host || !host->
initialized || !ids || max_ids <= 0) {
1244 for (
int i = 0; i < host->
max_clients && count < max_ids; i++) {
1260 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_broadcast_frame: invalid parameter");
1264 return SET_ERRNO(ERROR_INVALID_STATE,
"session_host_broadcast_frame: not running");
1268 size_t frame_len = strlen(frame) + 1;
1269 asciichat_error_t result = ASCIICHAT_OK;
1274 asciichat_error_t send_result =
1276 if (send_result != ASCIICHAT_OK) {
1277 log_warn(
"Failed to send ASCII frame to client %u", host->
clients[i].
client_id);
1278 result = send_result;
1289 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_send_frame: invalid parameter");
1293 return SET_ERRNO(ERROR_INVALID_STATE,
"session_host_send_frame: not running");
1297 size_t frame_len = strlen(frame) + 1;
1303 asciichat_error_t result =
1311 return SET_ERRNO(ERROR_NOT_FOUND,
"session_host_send_frame: client %u not found", client_id);
1320 return SET_ERRNO(ERROR_INVALID_PARAM,
"session_host_start_render: invalid host");
1324 return SET_ERRNO(ERROR_INVALID_STATE,
"session_host_start_render: not running");
1328 return ASCIICHAT_OK;
1335 return SET_ERRNO(ERROR_INVALID_STATE,
"Failed to create audio context");
1345 return SET_ERRNO(ERROR_INVALID_STATE,
"Failed to create Opus decoder");
1357 return SET_ERRNO(ERROR_INVALID_STATE,
"Failed to create Opus encoder");
1364 log_error(
"Failed to spawn render thread");
1366 return SET_ERRNO(ERROR_THREAD,
"Failed to spawn render thread");
1369 log_info(
"Host render thread started");
1370 return ASCIICHAT_OK;
1402 log_info(
"Host render thread stopped");
1410 acip_transport_t *transport) {
1412 return SET_ERRNO(ERROR_INVALID_PARAM,
"Host is NULL or not initialized");
1420 log_info(
"session_host_set_client_transport: Setting transport=%p for client %u (was=%p)", transport, client_id,
1426 log_info(
"WebRTC transport now active for client %u", client_id);
1428 log_info(
"WebRTC transport cleared for client %u, reverting to socket", client_id);
1432 return ASCIICHAT_OK;
1437 log_warn(
"Client %u not found", client_id);
1438 return SET_ERRNO(ERROR_NOT_FOUND,
"Client not found");
1473 return has_transport;
char * ascii_convert(image_t *original, const ssize_t width, const ssize_t height, const bool color, const bool _aspect_ratio, const bool stretch, const char *palette_chars, const char luminance_palette[256])
char * ascii_create_grid(ascii_frame_source_t *sources, int source_count, int width, int height, size_t *out_size)
char g_default_luminance_palette[256]
void session_host_stop_render(session_host_t *host)
asciichat_error_t session_host_inject_audio(session_host_t *host, uint32_t participant_id, const float *samples, size_t count)
bool session_host_client_has_transport(session_host_t *host, uint32_t client_id)
bool session_host_is_running(session_host_t *host)
struct session_host session_host_t
Internal session host structure.
asciichat_error_t session_host_inject_frame(session_host_t *host, uint32_t participant_id, const image_t *frame)
uint32_t session_host_add_memory_participant(session_host_t *host)
uint32_t session_host_add_client(session_host_t *host, socket_t socket, const char *ip, int port)
void session_host_stop(session_host_t *host)
acip_transport_t * session_host_get_client_transport(session_host_t *host, uint32_t client_id)
asciichat_error_t session_host_send_frame(session_host_t *host, uint32_t client_id, const char *frame)
asciichat_error_t session_host_set_client_transport(session_host_t *host, uint32_t client_id, acip_transport_t *transport)
int session_host_get_client_count(session_host_t *host)
asciichat_error_t session_host_remove_client(session_host_t *host, uint32_t client_id)
asciichat_error_t session_host_find_client(session_host_t *host, uint32_t client_id, session_host_client_info_t *info)
asciichat_error_t session_host_start(session_host_t *host)
int session_host_get_client_ids(session_host_t *host, uint32_t *ids, int max_ids)
#define SESSION_HOST_DEFAULT_MAX_CLIENTS
Default maximum clients.
asciichat_error_t session_host_broadcast_frame(session_host_t *host, const char *frame)
void session_host_destroy(session_host_t *host)
session_host_t * session_host_create(const session_host_config_t *config)
asciichat_error_t session_host_start_render(session_host_t *host)
void session_audio_destroy(session_audio_ctx_t *ctx)
session_audio_ctx_t * session_audio_create(bool is_host)
opus_codec_t * opus_codec_create_decoder(int sample_rate)
int opus_codec_decode(opus_codec_t *codec, const uint8_t *data, size_t data_len, float *out_samples, int out_num_samples)
opus_codec_t * opus_codec_create_encoder(opus_application_t application, int sample_rate, int bitrate)
void opus_codec_destroy(opus_codec_t *codec)
size_t opus_codec_encode(opus_codec_t *codec, const float *samples, int num_samples, uint8_t *out_data, size_t out_size)
asciichat_error_t packet_receive(socket_t sockfd, packet_type_t *type, void **data, size_t *len)
Receive a packet with proper header validation and CRC32 checking.
asciichat_error_t av_send_audio_opus_batch(socket_t sockfd, const uint8_t *opus_data, size_t opus_size, const uint16_t *frame_sizes, int sample_rate, int frame_duration, int frame_count, crypto_context_t *crypto_ctx)
asciichat_error_t packet_send(socket_t sockfd, packet_type_t type, const void *data, size_t len)
Send a packet with proper header and CRC32.
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
void ringbuffer_destroy(ringbuffer_t *rb)
bool ringbuffer_read(ringbuffer_t *rb, void *data)
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
uint8_t participant_id[16]
Internal client record structure.
ringbuffer_t * incoming_audio
Incoming audio ringbuffer (written by receive loop, read by render thread)
participant_type_t participant_type
image_t * incoming_video
Incoming video frame buffer (for host render thread)
struct acip_transport * transport
Alternative transport (WebRTC, WebSocket, etc.) - NULL if using socket only.
Internal session host structure.
bool running
Server is running.
char ipv6_address[64]
IPv6 bind address.
char password[256]
Password.
char ipv4_address[64]
IPv4 bind address.
mutex_t clients_mutex
Client list mutex.
bool initialized
Context is initialized.
opus_codec_t * opus_encoder
Opus encoder for encoding mixed audio for broadcast.
asciichat_thread_t receive_thread
Receive thread handle.
asciichat_thread_t render_thread
Render thread handle (for video mixing and audio distribution)
int client_count
Current client count.
socket_t socket_v6
IPv6 listen socket.
session_host_client_t * clients
Client array.
opus_codec_t * opus_decoder
Opus decoder for decoding incoming Opus audio.
socket_t socket_v4
IPv4 listen socket.
void * user_data
User data for callbacks.
bool receive_thread_running
Receive thread is running.
char key_path[512]
Key path.
session_host_callbacks_t callbacks
Event callbacks.
session_audio_ctx_t * audio_ctx
Audio context for mixing (host only)
asciichat_thread_t accept_thread
Accept thread handle.
int max_clients
Maximum clients.
int port
Port to listen on.
bool render_thread_running
Render thread is running.
uint32_t next_client_id
Next client ID counter.
bool encryption_enabled
Encryption enabled.
bool accept_thread_running
Accept thread is running.
int safe_snprintf(char *buffer, size_t buffer_size, const char *format,...)
Safe formatted string printing to buffer.
int mutex_init(mutex_t *mutex)
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
int mutex_destroy(mutex_t *mutex)
uint64_t time_get_ns(void)
uint64_t time_elapsed_ns(uint64_t start_ns, uint64_t end_ns)
void image_destroy(image_t *p)
image_t * image_new(size_t width, size_t height)