19#include <ascii-chat/network/tcp/server.h>
20#include <ascii-chat/common.h>
21#include <ascii-chat/log/logging.h>
22#include <ascii-chat/platform/socket.h>
23#include <ascii-chat/platform/thread.h>
24#include <ascii-chat/thread_pool.h>
25#include <ascii-chat/util/ip.h>
26#include <ascii-chat/util/time.h>
/*
 * bind_and_listen - create one listening stream socket for a single address family.
 *
 * address: numeric IP literal to bind; NULL or "" selects the wildcard address
 *          (a NULL node is handed to getaddrinfo() together with AI_PASSIVE).
 * family:  value stored in hints.ai_family; AF_INET6 gets the extra IPV6_V6ONLY
 *          step below, and the final log labels anything that is not AF_INET as IPv6.
 * port:    port number, rendered as decimal text for getaddrinfo().
 *
 * Returns the listening socket, or INVALID_SOCKET_VALUE after logging which
 * step failed (getaddrinfo, socket, bind or listen).
 *
 * NOTE(review): no freeaddrinfo(res) call is visible in this excerpt; confirm
 * that every return path after a successful getaddrinfo() releases `res`.
 */
39static socket_t bind_and_listen(
const char *address,
int family,
int port) {
40 struct addrinfo hints, *res = NULL;
41 memset(&hints, 0,
sizeof(hints));
42 hints.ai_family = family;
43 hints.ai_socktype = SOCK_STREAM;
// AI_NUMERICHOST: the node string must be a numeric address, so no name lookup is attempted.
44 hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST;
// getaddrinfo() takes the service as text, so the port is formatted into port_str.
47 SAFE_SNPRINTF(port_str,
sizeof(port_str),
"%d", port);
// Normalise "" to NULL so an empty address behaves like "no address given".
50 const char *addr_str = (address && address[0] !=
'\0') ? address : NULL;
52 int gai_result = getaddrinfo(addr_str, port_str, &hints, &res);
53 if (gai_result != 0) {
54 log_error(
"getaddrinfo failed for %s:%d: %s", addr_str ? addr_str :
"(wildcard)", port, gai_strerror(gai_result));
55 return INVALID_SOCKET_VALUE;
// Only the first getaddrinfo() result is used to create and bind the socket.
58 socket_t server_socket = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
59 if (server_socket == INVALID_SOCKET_VALUE) {
60 log_error(
"Failed to create socket for %s:%d", addr_str ? addr_str :
"(wildcard)", port);
62 return INVALID_SOCKET_VALUE;
// A failure to set SO_REUSEADDR is reported at warn level; no return is visible on this path.
67 if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, (
const char *)&reuse,
sizeof(reuse)) < 0) {
68 log_warn(
"Failed to set SO_REUSEADDR on %s:%d", addr_str ? addr_str :
"(wildcard)", port);
// IPv6 sockets additionally get IPV6_V6ONLY. The value of `ipv6only` is declared outside
// the visible lines - presumably 1, keeping this socket IPv6-only; TODO confirm.
72 if (family == AF_INET6) {
74 if (setsockopt(server_socket, IPPROTO_IPV6, IPV6_V6ONLY, (
const char *)&ipv6only,
sizeof(ipv6only)) < 0) {
75 log_warn(
"Failed to set IPV6_V6ONLY on [%s]:%d", addr_str ? addr_str :
"::", port);
// Bind failure closes the freshly created socket before reporting the error to the caller.
80 if (bind(server_socket, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) {
81 log_error(
"Failed to bind %s:%d", addr_str ? addr_str :
"(wildcard)", port);
82 socket_close(server_socket);
84 return INVALID_SOCKET_VALUE;
// A backlog of 128 pending connections is requested.
90 if (listen(server_socket, 128) < 0) {
91 log_error(
"Failed to listen on %s:%d", addr_str ? addr_str :
"(wildcard)", port);
92 socket_close(server_socket);
93 return INVALID_SOCKET_VALUE;
// Success log: IPv6 addresses are wrapped in [ ], and a wildcard bind is shown as 0.0.0.0 or ::.
96 log_info(
"Listening on %s%s%s:%d (%s)", family == AF_INET6 ?
"[" :
"",
97 addr_str ? addr_str : (family == AF_INET ?
"0.0.0.0" :
"::"), family == AF_INET6 ?
"]" :
"", port,
98 family == AF_INET ?
"IPv4" :
"IPv6");
100 return server_socket;
/*
 * tcp_server_init - reset `server`, copy `config` into it and open the listening sockets.
 *
 * server: caller-provided structure; it is zeroed and fully re-initialised here.
 * config: copied by value into server->config; bind_ipv4 / bind_ipv6 choose which
 *         families are bound, ipv4_address / ipv6_address give optional bind addresses
 *         (NULL or "" means wildcard), port is shared by both families.
 *
 * Errors reported through SET_ERRNO in the visible code:
 *   ERROR_INVALID_PARAM - server or config is NULL
 *   ERROR_THREAD        - the clients mutex could not be initialised
 *   ERROR_NETWORK_BIND  - no listening socket exists after the bind attempts
 *                         (this also covers the case where both bind flags are false,
 *                         because both sockets then stay INVALID_SOCKET_VALUE)
 *
 * NOTE(review): on the ERROR_NETWORK_BIND return no mutex_destroy() for clients_mutex
 * is visible and `running` stays true; confirm callers are expected to clean up.
 */
103asciichat_error_t
tcp_server_init(tcp_server_t *server,
const tcp_server_config_t *config) {
104 if (!server || !config) {
105 return SET_ERRNO(ERROR_INVALID_PARAM,
"server or config is NULL");
// Start from a zeroed structure, then mark both listening sockets as "not open".
112 memset(server, 0,
sizeof(*server));
113 server->listen_socket = INVALID_SOCKET_VALUE;
114 server->listen_socket6 = INVALID_SOCKET_VALUE;
115 atomic_store(&server->running,
true);
116 server->config = *config;
// Client registry starts empty (hash head NULL) with no cleanup callback installed.
119 server->clients = NULL;
120 server->cleanup_fn = NULL;
121 if (
mutex_init(&server->clients_mutex) != 0) {
122 return SET_ERRNO(ERROR_THREAD,
"Failed to initialize clients mutex");
126 bool should_bind_ipv4 = config->bind_ipv4;
127 bool should_bind_ipv6 = config->bind_ipv6;
// IPv4 listener: an empty address string is treated the same as NULL (wildcard).
130 if (should_bind_ipv4) {
131 const char *ipv4_addr = (config->ipv4_address && config->ipv4_address[0] !=
'\0') ? config->ipv4_address : NULL;
132 server->listen_socket = bind_and_listen(ipv4_addr, AF_INET, config->port);
// A single-family failure is only a warning; the combined check further down decides.
134 if (server->listen_socket == INVALID_SOCKET_VALUE) {
135 log_warn(
"Failed to bind IPv4 socket");
// IPv6 listener, same address normalisation as for IPv4.
140 if (should_bind_ipv6) {
141 const char *ipv6_addr = (config->ipv6_address && config->ipv6_address[0] !=
'\0') ? config->ipv6_address : NULL;
142 server->listen_socket6 = bind_and_listen(ipv6_addr, AF_INET6, config->port);
144 if (server->listen_socket6 == INVALID_SOCKET_VALUE) {
145 log_warn(
"Failed to bind IPv6 socket");
// Initialisation fails only when no listening socket at all could be opened.
150 if (server->listen_socket == INVALID_SOCKET_VALUE && server->listen_socket6 == INVALID_SOCKET_VALUE) {
151 return SET_ERRNO(ERROR_NETWORK_BIND,
"Failed to bind any sockets (IPv4 and IPv6 both failed)");
/*
 * Accept loop. The function header and the NULL guard for the first return are
 * outside the visible lines; the error text below names the function as
 * tcp_server_run().
 *
 * While server->running is true the loop waits on the open listening sockets with
 * a timeout, accepts one connection per wake-up, formats the peer address for
 * logging and fills a heap-allocated tcp_client_context_t for the new client.
 * The thread-creation call itself is not visible in this excerpt.
 */
159 return SET_ERRNO(ERROR_INVALID_PARAM,
"server is NULL");
// This loop cannot operate without a configured client handler.
162 if (!server->config.client_handler) {
163 return SET_ERRNO(ERROR_INVALID_PARAM,
164 "client_handler is required for tcp_server_run() - use custom accept loop if handler is NULL");
167 log_debug(
"TCP server starting accept loop...");
// `running` is re-read atomically on every iteration, so clearing it ends the loop.
169 while (atomic_load(&server->running)) {
// The descriptor set is rebuilt from scratch each iteration.
172 socket_fd_zero(&read_fds);
// Register whichever listening sockets are open and track the largest descriptor for select().
176 if (server->listen_socket != INVALID_SOCKET_VALUE) {
177 socket_fd_set(server->listen_socket, &read_fds);
178 max_fd = server->listen_socket > max_fd ? server->listen_socket : max_fd;
182 if (server->listen_socket6 != INVALID_SOCKET_VALUE) {
183 socket_fd_set(server->listen_socket6, &read_fds);
184 max_fd = server->listen_socket6 > max_fd ? server->listen_socket6 : max_fd;
// Timeout: the configured accept_timeout_sec when positive, otherwise 1.0 second,
// split into whole seconds plus the fractional part expressed in microseconds.
189 double timeout_sec_double = server->config.accept_timeout_sec > 0 ? server->config.accept_timeout_sec : 1.0;
190 time_t timeout_sec = (time_t)timeout_sec_double;
191 long timeout_usec = (long)((timeout_sec_double - timeout_sec) * (double)US_PER_SEC_INT);
192 struct timeval timeout = {.tv_sec = timeout_sec, .tv_usec = timeout_usec};
194 int select_result = socket_select((
int)(max_fd + 1), &read_fds, NULL, NULL, &timeout);
// Negative result: select failed. The conditions separating the two logs below are not
// visible here; one path reports a signal interruption at debug level, the other an error.
196 if (select_result < 0) {
198 int err = socket_get_last_error();
201 log_debug(
"select() interrupted by signal");
205 log_error(
"select() failed in accept loop: %s (errno=%d)", socket_get_error_string(), err);
// Zero result: the timeout expired with nothing to accept; run the optional status hook.
209 if (select_result == 0) {
211 if (server->config.status_update_fn) {
212 server->config.status_update_fn(server->config.status_update_data);
// Pick the socket to accept on. IPv4 is tested first and the else-if means at most one
// listener is serviced per iteration, even when both are readable.
218 socket_t ready_socket = INVALID_SOCKET_VALUE;
219 if (server->listen_socket != INVALID_SOCKET_VALUE && socket_fd_isset(server->listen_socket, &read_fds)) {
220 ready_socket = server->listen_socket;
221 }
else if (server->listen_socket6 != INVALID_SOCKET_VALUE && socket_fd_isset(server->listen_socket6, &read_fds)) {
222 ready_socket = server->listen_socket6;
// Neither listener was flagged readable (the handling of this case is not visible here).
225 if (ready_socket == INVALID_SOCKET_VALUE) {
// sockaddr_storage is large enough for either an IPv4 or an IPv6 peer address.
231 struct sockaddr_storage client_addr;
232 socklen_t client_addr_len =
sizeof(client_addr);
233 socket_t client_socket = accept(ready_socket, (
struct sockaddr *)&client_addr, &client_addr_len);
235 if (client_socket == INVALID_SOCKET_VALUE) {
236 log_warn(
"Failed to accept connection");
// Peer address for logging: any family other than AF_INET is treated as AF_INET6,
// and "(unknown)" is substituted when formatting fails.
241 char client_ip[INET6_ADDRSTRLEN];
242 int addr_family = (client_addr.ss_family == AF_INET) ? AF_INET : AF_INET6;
243 if (
format_ip_address(addr_family, (
struct sockaddr *)&client_addr, client_ip,
sizeof(client_ip)) != ASCIICHAT_OK) {
244 SAFE_STRNCPY(client_ip,
"(unknown)",
sizeof(client_ip));
247 log_debug(
"Accepted connection from %s", client_ip);
// Per-connection context allocated on the heap.
250 tcp_client_context_t *ctx = SAFE_MALLOC(
sizeof(tcp_client_context_t), tcp_client_context_t *);
// Allocation-failure path (its guard is not visible here): the accepted socket is closed.
252 log_error(
"Failed to allocate client context");
253 socket_close(client_socket);
// Populate the context: socket, peer address and length, plus the configured user_data pointer.
257 ctx->client_socket = client_socket;
258 ctx->addr = client_addr;
259 ctx->addr_len = client_addr_len;
260 ctx->user_data = server->config.user_data;
270 asciichat_thread_t thread;
// Thread-creation failure path: the connection is closed.
// NOTE(review): confirm ctx is released on this path as well - that line is not visible here.
272 log_error(
"Failed to create client handler thread for %s", client_ip);
274 socket_close(client_socket);
282 log_debug(
"TCP server accept loop exited");
/*
 * Server shutdown body. The function header is outside the visible lines -
 * presumably tcp_server_destroy(tcp_server_t *server); TODO confirm.
 *
 * Visible steps: clear the running flag, close both listening sockets, then
 * empty the client registry while holding clients_mutex.
 */
291 log_debug(
"Shutting down TCP server...");
// Clearing `running` is what makes a loop that polls this flag terminate.
294 atomic_store(&server->running,
false);
// Each listening socket is closed and reset to INVALID_SOCKET_VALUE, so a repeated
// pass through this code skips the close.
297 if (server->listen_socket != INVALID_SOCKET_VALUE) {
298 log_debug(
"Closing IPv4 listen socket");
299 socket_close(server->listen_socket);
300 server->listen_socket = INVALID_SOCKET_VALUE;
303 if (server->listen_socket6 != INVALID_SOCKET_VALUE) {
304 log_debug(
"Closing IPv6 listen socket");
305 socket_close(server->listen_socket6);
306 server->listen_socket6 = INVALID_SOCKET_VALUE;
// The registry is drained under the mutex.
310 mutex_lock(&server->clients_mutex);
// HASH_ITER keeps the next element in `tmp`, so removing `entry` inside the loop is safe.
312 tcp_client_entry_t *entry = NULL, *tmp = NULL;
313 HASH_ITER(hh, server->clients, entry, tmp) {
// The cleanup callback runs only when one is installed and the entry has client data.
315 if (server->cleanup_fn && entry->client_data) {
316 server->cleanup_fn(entry->client_data);
// The pool pointer is cleared here; the call that releases the pool is presumably on a
// line not visible in this excerpt - TODO confirm.
320 if (entry->threads) {
322 entry->threads = NULL;
325 HASH_DEL(server->clients, entry);
328 server->clients = NULL;
330 mutex_unlock(&server->clients_mutex);
336 log_debug(
"TCP server shutdown complete");
// Install the callback that is later invoked with an entry's client_data when that entry
// is removed from the registry. The enclosing function header is not visible here -
// presumably tcp_server_set_cleanup_callback(); TODO confirm.
347 server->cleanup_fn = cleanup_fn;
/*
 * Registry insert. The function header and the NULL guard for the first return are
 * outside the visible lines; the final log identifies this as the "add client" path.
 *
 * A new tcp_client_entry_t is allocated, filled with the socket and client_data,
 * given a thread pool, and added to server->clients keyed by the socket value.
 *
 * Errors reported through SET_ERRNO in the visible code:
 *   ERROR_INVALID_PARAM - server is NULL, or socket is INVALID_SOCKET_VALUE
 *   ERROR_MEMORY        - entry allocation or thread-pool creation failed
 */
352 return SET_ERRNO(ERROR_INVALID_PARAM,
"server is NULL");
355 if (socket == INVALID_SOCKET_VALUE) {
356 return SET_ERRNO(ERROR_INVALID_PARAM,
"socket is invalid");
360 tcp_client_entry_t *entry = SAFE_MALLOC(
sizeof(tcp_client_entry_t), tcp_client_entry_t *);
362 return SET_ERRNO(ERROR_MEMORY,
"Failed to allocate client entry");
365 entry->socket = socket;
366 entry->client_data = client_data;
// The pool is named after the socket value.
// NOTE(review): "%d" assumes socket_t is int-sized; confirm on platforms where it is not.
370 SAFE_SNPRINTF(pool_name,
sizeof(pool_name),
"client_%d", socket);
// NOTE(review): confirm `entry` is released before this error return - that line is not visible here.
372 if (!entry->threads) {
374 return SET_ERRNO(ERROR_MEMORY,
"Failed to create thread pool for client");
// Only the hash insertion itself is done under the mutex; the key is the `socket`
// field, sizeof(socket_t) bytes wide.
378 mutex_lock(&server->clients_mutex);
379 HASH_ADD(hh, server->clients, socket,
sizeof(
socket_t), entry);
380 mutex_unlock(&server->clients_mutex);
382 log_debug(
"Added client socket=%d to registry", socket);
/*
 * Registry removal. The function header and the NULL guard for the first return are
 * outside the visible lines; the log messages identify this as the "remove client" path.
 *
 * The entry is looked up by socket value under clients_mutex. A missing entry is
 * only logged at debug level ("already removed"). A found entry gets its cleanup
 * callback invoked, its pool pointer cleared, and is unlinked from the hash.
 */
388 return SET_ERRNO(ERROR_INVALID_PARAM,
"server is NULL");
391 mutex_lock(&server->clients_mutex);
393 tcp_client_entry_t *entry = NULL;
394 HASH_FIND(hh, server->clients, &socket,
sizeof(
socket_t), entry);
// Not-found path (its guard is not visible here): release the lock and log.
397 mutex_unlock(&server->clients_mutex);
399 log_debug(
"Client socket=%d already removed from registry", socket);
// NOTE(review): cleanup_fn runs while clients_mutex is held; confirm the callback never
// calls back into registry functions that take the same mutex.
404 if (server->cleanup_fn && entry->client_data) {
405 server->cleanup_fn(entry->client_data);
// The pool pointer is cleared here; the call that releases the pool is presumably on a
// line not visible in this excerpt - TODO confirm.
409 if (entry->threads) {
411 entry->threads = NULL;
414 HASH_DEL(server->clients, entry);
417 mutex_unlock(&server->clients_mutex);
419 log_debug(
"Removed client socket=%d from registry", socket);
/*
 * Registry lookup (function header not visible; parameters used are server, socket
 * and out_data). On success the entry's client_data pointer is written to *out_data.
 *
 * Errors reported through SET_ERRNO in the visible code:
 *   ERROR_INVALID_PARAM - server or out_data is NULL
 *   ERROR_INVALID_STATE - no entry for this socket
 *
 * NOTE(review): other lookups in this file report a missing entry as ERROR_NOT_FOUND;
 * confirm whether ERROR_INVALID_STATE is intentional here.
 */
424 if (!server || !out_data) {
425 return SET_ERRNO(ERROR_INVALID_PARAM,
"server or out_data is NULL");
428 mutex_lock(&server->clients_mutex);
430 tcp_client_entry_t *entry = NULL;
431 HASH_FIND(hh, server->clients, &socket,
sizeof(
socket_t), entry);
// Not-found path (its guard is not visible here): the lock is released before returning.
435 mutex_unlock(&server->clients_mutex);
436 return SET_ERRNO(ERROR_INVALID_STATE,
"Client socket=%d not in registry", socket);
// The pointer is copied out under the lock; nothing visible here keeps the pointed-to
// data alive after the unlock.
439 *out_data = entry->client_data;
440 mutex_unlock(&server->clients_mutex);
/*
 * Visit every registered client (function header not visible; parameters used are
 * server, callback and user_arg). The callback receives the entry's socket, its
 * client_data and the caller's user_arg.
 */
// Guard against a NULL server or callback (the guard's body is not visible here).
446 if (!server || !callback) {
// The whole iteration, including every callback invocation, runs with clients_mutex held.
450 mutex_lock(&server->clients_mutex);
452 tcp_client_entry_t *entry, *tmp;
453 HASH_ITER(hh, server->clients, entry, tmp) {
454 callback(entry->socket, entry->client_data, user_arg);
457 mutex_unlock(&server->clients_mutex);
// Read the number of registry entries under clients_mutex. The function header and the
// return statement are not visible here; the value is a snapshot that may be stale once
// the lock is released.
465 mutex_lock(&server->clients_mutex);
466 size_t count = HASH_COUNT(server->clients);
467 mutex_unlock(&server->clients_mutex);
/*
 * Format the peer IP of a client context into a caller buffer (function header and
 * return statements are not visible; parameters used are ctx, buf and len).
 *
 * Each of the three argument checks records ERROR_INVALID_PARAM via SET_ERRNO; the
 * guards and the values returned on those paths are outside the visible lines.
 */
478 SET_ERRNO(ERROR_INVALID_PARAM,
"ctx is NULL");
482 SET_ERRNO(ERROR_INVALID_PARAM,
"buf is NULL");
486 SET_ERRNO(ERROR_INVALID_PARAM,
"len is 0");
// Any stored family other than AF_INET is handed to the formatter as AF_INET6.
491 int addr_family = (ctx->addr.ss_family == AF_INET) ? AF_INET : AF_INET6;
// The result is compared against literal 0 here - presumably ASCIICHAT_OK == 0; TODO confirm.
494 if (
format_ip_address(addr_family, (
struct sockaddr *)&ctx->addr, buf, len) != 0) {
/*
 * Extract the peer port from a client context (function header not visible; the
 * parameter used is ctx). The port is stored in network byte order and converted
 * with ntohs() before being returned.
 *
 * A NULL ctx records ERROR_INVALID_PARAM and an unrecognised address family records
 * ERROR_INVALID_STATE; the values returned on those two paths are not visible here.
 */
503 SET_ERRNO(ERROR_INVALID_PARAM,
"ctx is NULL");
// The generic storage is reinterpreted according to the family recorded in ss_family.
508 if (ctx->addr.ss_family == AF_INET) {
509 struct sockaddr_in *addr_in = (
struct sockaddr_in *)&ctx->addr;
510 return ntohs(addr_in->sin_port);
511 }
else if (ctx->addr.ss_family == AF_INET6) {
512 struct sockaddr_in6 *addr_in6 = (
struct sockaddr_in6 *)&ctx->addr;
513 return ntohs(addr_in6->sin6_port);
// Neither IPv4 nor IPv6.
516 SET_ERRNO(ERROR_INVALID_STATE,
"Unknown address family: %d", ctx->addr.ss_family);
/*
 * Refuse a connection (function header not visible; parameters used are socket and
 * reason). An invalid socket records ERROR_INVALID_PARAM; otherwise the reason is
 * logged at warn level ("unknown reason" when reason is NULL) and the socket is closed.
 */
521 if (socket == INVALID_SOCKET_VALUE) {
522 SET_ERRNO(ERROR_INVALID_PARAM,
"socket is INVALID_SOCKET_VALUE");
526 log_warn(
"Rejecting client connection: %s", reason ? reason :
"unknown reason");
527 socket_close(socket);
535 void *thread_arg,
int stop_id,
const char *thread_name) {
/*
 * Start a worker thread on behalf of a client (the beginning of the signature is
 * outside the visible lines; the body also uses server, client_socket and thread_func).
 *
 * Two paths are visible:
 *   - client_socket == INVALID_SOCKET_VALUE: a standalone thread that is not tied to a
 *     registry entry (the log text describes this as the WebRTC case);
 *   - otherwise: the client's registry entry is looked up and the thread is spawned in
 *     that entry's thread pool with the given stop_id and thread_name.
 *
 * Errors reported through SET_ERRNO in the visible code:
 *   ERROR_INVALID_PARAM - server or thread_func is NULL
 *   ERROR_NOT_FOUND     - no registry entry for client_socket
 */
536 if (!server || !thread_func) {
537 return SET_ERRNO(ERROR_INVALID_PARAM,
"server or thread_func is NULL");
// Socket-less path.
542 if (client_socket == INVALID_SOCKET_VALUE) {
546 log_debug(
"Spawning standalone thread '%s' (no socket, WebRTC client)", thread_name ? thread_name :
"unnamed");
// NOTE(review): the creation call that fills temp_thread and `result` is not visible here;
// confirm the standalone thread handle is detached or otherwise tracked.
552 asciichat_thread_t temp_thread;
554 if (result != ASCIICHAT_OK) {
// Registry path: find the client's entry under the mutex.
565 mutex_lock(&server->clients_mutex);
566 tcp_client_entry_t *entry = NULL;
567 HASH_FIND(hh, server->clients, &client_socket,
sizeof(
socket_t), entry);
// Not-found path (its guard is not visible here).
570 mutex_unlock(&server->clients_mutex);
571 return SET_ERRNO(ERROR_NOT_FOUND,
"Client socket=%d not in registry", client_socket);
// The spawn happens while clients_mutex is still held.
575 asciichat_error_t result =
thread_pool_spawn(entry->threads, thread_func, thread_arg, stop_id, thread_name);
577 mutex_unlock(&server->clients_mutex);
579 if (result != ASCIICHAT_OK) {
// NOTE(review): thread_count is computed on a line not visible here; if it reads
// entry->threads after the unlock above, it can race with removal of the entry - confirm.
584 log_debug(
"Spawned thread '%s' (stop_id=%d) for client socket=%d (total_threads=%zu)",
585 thread_name ? thread_name :
"unnamed", stop_id, client_socket, thread_count);
/*
 * Stop the worker threads of one client. The function header and the NULL guard for
 * the first return are outside the visible lines; the final log identifies the purpose.
 *
 * Errors reported through SET_ERRNO in the visible code:
 *   ERROR_INVALID_PARAM - server is NULL, or client_socket is INVALID_SOCKET_VALUE
 *   ERROR_NOT_FOUND     - no registry entry for client_socket
 */
592 return SET_ERRNO(ERROR_INVALID_PARAM,
"server is NULL");
595 if (client_socket == INVALID_SOCKET_VALUE) {
596 return SET_ERRNO(ERROR_INVALID_PARAM,
"client_socket is invalid");
600 mutex_lock(&server->clients_mutex);
601 tcp_client_entry_t *entry = NULL;
602 HASH_FIND(hh, server->clients, &client_socket,
sizeof(
socket_t), entry);
// Not-found path (its guard is not visible here).
605 mutex_unlock(&server->clients_mutex);
606 return SET_ERRNO(ERROR_NOT_FOUND,
"Client socket=%d not in registry", client_socket);
// `result` stays ASCIICHAT_OK when the entry has no thread pool. The call made inside the
// branch is not visible here - presumably thread_pool_stop_all(); TODO confirm. Whatever it
// is, it executes before the unlock below, i.e. with clients_mutex held.
610 asciichat_error_t result = ASCIICHAT_OK;
611 if (entry->threads) {
615 mutex_unlock(&server->clients_mutex);
617 log_debug(
"All threads stopped for client socket=%d", client_socket);
/*
 * Report how many threads a client's pool holds (function header not visible;
 * parameters used are server, client_socket and count).
 *
 * Errors reported through SET_ERRNO in the visible code:
 *   ERROR_INVALID_PARAM - server or count is NULL, or client_socket is INVALID_SOCKET_VALUE
 *   ERROR_NOT_FOUND     - no registry entry for client_socket
 */
622 if (!server || !count) {
623 return SET_ERRNO(ERROR_INVALID_PARAM,
"server or count is NULL");
626 if (client_socket == INVALID_SOCKET_VALUE) {
627 return SET_ERRNO(ERROR_INVALID_PARAM,
"client_socket is invalid");
633 mutex_lock(&server->clients_mutex);
634 tcp_client_entry_t *entry = NULL;
635 HASH_FIND(hh, server->clients, &client_socket,
sizeof(
socket_t), entry);
// Not-found path (its guard is not visible here).
638 mutex_unlock(&server->clients_mutex);
639 return SET_ERRNO(ERROR_NOT_FOUND,
"Client socket=%d not in registry", client_socket);
// The statement that stores into *count is not visible here - presumably it queries the
// pool via thread_pool_get_count(); TODO confirm. It executes before the unlock below.
643 if (entry->threads) {
647 mutex_unlock(&server->clients_mutex);
asciichat_error_t format_ip_address(int family, const struct sockaddr *addr, char *output, size_t output_size)
void tcp_server_destroy(tcp_server_t *server)
void tcp_server_set_cleanup_callback(tcp_server_t *server, tcp_client_cleanup_fn cleanup_fn)
void tcp_server_reject_client(socket_t socket, const char *reason)
asciichat_error_t tcp_server_spawn_thread(tcp_server_t *server, socket_t client_socket, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
asciichat_error_t tcp_server_remove_client(tcp_server_t *server, socket_t socket)
void tcp_server_foreach_client(tcp_server_t *server, tcp_client_foreach_fn callback, void *user_arg)
asciichat_error_t tcp_server_stop_client_threads(tcp_server_t *server, socket_t client_socket)
asciichat_error_t tcp_server_init(tcp_server_t *server, const tcp_server_config_t *config)
asciichat_error_t tcp_server_get_client(tcp_server_t *server, socket_t socket, void **out_data)
int tcp_client_context_get_port(const tcp_client_context_t *ctx)
asciichat_error_t tcp_server_run(tcp_server_t *server)
asciichat_error_t tcp_server_get_thread_count(tcp_server_t *server, socket_t client_socket, size_t *count)
const char * tcp_client_context_get_ip(const tcp_client_context_t *ctx, char *buf, size_t len)
size_t tcp_server_get_client_count(tcp_server_t *server)
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
void thread_pool_destroy(thread_pool_t *pool)
thread_pool_t * thread_pool_create(const char *pool_name)
size_t thread_pool_get_count(const thread_pool_t *pool)
asciichat_error_t thread_pool_spawn(thread_pool_t *pool, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
asciichat_error_t thread_pool_stop_all(thread_pool_t *pool)
int mutex_init(mutex_t *mutex)
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
int mutex_destroy(mutex_t *mutex)