39#include <urcu/rculfhash.h>
50static void *cleanup_thread_func(
void *arg) {
58 rcu_register_thread();
59 log_info(
"Rate limit cleanup thread started (RCU registered)");
61 while (!atomic_load(&server->
shutdown)) {
63 for (
int i = 0; i < 300 && !atomic_load(&server->
shutdown); i++) {
67 if (atomic_load(&server->
shutdown)) {
72 log_debug(
"Running rate limit cleanup...");
75 log_warn(
"Rate limit cleanup failed");
81 rcu_unregister_thread();
82 log_info(
"Rate limit cleanup thread exiting (RCU unregistered)");
87 if (!server || !config) {
91 memset(server, 0,
sizeof(*server));
117 log_warn(
"Failed to load sessions from database (continuing anyway)");
135 .ipv4_address = (config->
address[0] !=
'\0') ? config->
address : NULL,
139 .accept_timeout_sec = 1,
155 atomic_store(&server->
shutdown,
false);
158 log_warn(
"Failed to create worker thread pool");
169 log_warn(
"Failed to spawn rate limit cleanup thread (continuing without cleanup)");
172 log_info(
"Discovery server initialized successfully");
193 atomic_store(&server->
shutdown,
true);
200 size_t remaining_clients;
201 int shutdown_attempts = 0;
202 const int max_shutdown_attempts = 100;
205 shutdown_attempts < max_shutdown_attempts) {
206 log_debug(
"Waiting for %zu client handler threads to exit (attempt %d/%d)", remaining_clients,
207 shutdown_attempts + 1, max_shutdown_attempts);
212 if (remaining_clients > 0) {
213 log_warn(
"Server shutdown: %zu client handler threads still running after 10 seconds", remaining_clients);
214 }
else if (shutdown_attempts > 0) {
215 log_debug(
"All client handler threads exited gracefully");
243 log_info(
"Server shutdown complete");
252#define ACDS_CREATE_TRANSPORT(socket, transport_var) \
253 acip_transport_t *transport_var = acip_tcp_transport_create(socket, NULL); \
254 if (!transport_var) { \
255 log_error("Failed to create ACDS transport"); \
259#define ACDS_DESTROY_TRANSPORT(transport_var) acip_transport_destroy(transport_var)
267static void acds_on_session_create(
const acip_session_create_t *req,
int client_socket,
const char *client_ip,
271 log_debug(
"SESSION_CREATE packet from %s", client_ip);
286 log_warn(
"SESSION_CREATE rejected from %s: invalid timestamp (replay attack protection)", client_ip);
295 req->identity_pubkey, req->timestamp, req->capabilities, req->max_participants, req->signature);
298 log_warn(
"SESSION_CREATE rejected from %s: invalid signature (identity verification failed)", client_ip);
304 log_debug(
"SESSION_CREATE signature verified from %s (pubkey: %02x%02x...)", client_ip, req->identity_pubkey[0],
305 req->identity_pubkey[1]);
312 if (req->server_address[0] ==
'\0') {
314 SAFE_STRNCPY(req->server_address, client_ip,
sizeof(req->server_address));
315 log_info(
"SESSION_CREATE from %s: auto-detected server address (bind was 0.0.0.0)", client_ip);
320 if (strcmp(req->server_address, client_ip) != 0) {
321 log_warn(
"SESSION_CREATE rejected from %s: server_address '%s' does not match actual connection IP", client_ip,
322 req->server_address);
324 "Direct TCP sessions require server_address to match your actual IP");
328 log_debug(
"SESSION_CREATE reachability verified: %s matches connection source", req->server_address);
332 memset(&resp, 0,
sizeof(resp));
337 size_t stun_size = (size_t)resp.stun_count *
sizeof(
stun_server_t);
338 size_t turn_size = (size_t)resp.turn_count *
sizeof(
turn_server_t);
339 size_t total_size =
sizeof(resp) + stun_size + turn_size;
349 memcpy(payload, &resp,
sizeof(resp));
352 if (resp.stun_count > 0) {
357 if (resp.turn_count > 0) {
365 log_info(
"Session created: %.*s (UUID: %02x%02x..., %d STUN, %d TURN servers)", resp.session_string_len,
366 resp.session_string, resp.session_id[0], resp.session_id[1], resp.stun_count, resp.turn_count);
372 struct cds_lfht_iter iter_ctx;
375 unsigned long hash = 5381;
378 while ((c = (
unsigned char)*str++)) {
379 hash = ((hash << 5) + hash) + c;
382 cds_lfht_lookup(server->
sessions->
sessions, hash, NULL, resp.session_string, &iter_ctx);
383 struct cds_lfht_node *node = cds_lfht_iter_get_node(&iter_ctx);
395 log_warn(
"Session creation failed for %s: %s", client_ip, asciichat_error_string(create_result));
401static void acds_on_session_lookup(
const acip_session_lookup_t *req,
int client_socket,
const char *client_ip,
405 log_debug(
"SESSION_LOOKUP packet from %s", client_ip);
417 memset(&resp, 0,
sizeof(resp));
420 char session_string[49] = {0};
422 (req->session_string_len <
sizeof(session_string) - 1) ? req->session_string_len :
sizeof(session_string) - 1;
423 memcpy(session_string, req->session_string, copy_len);
428 log_info(
"Session lookup for '%s' from %s: %s", session_string, client_ip, resp.found ?
"found" :
"not found");
431 log_warn(
"Session lookup failed for %s: %s", client_ip, asciichat_error_string(lookup_result));
435static void acds_on_session_join(
const acip_session_join_t *req,
int client_socket,
const char *client_ip,
439 log_debug(
"SESSION_JOIN packet from %s", client_ip);
454 log_warn(
"SESSION_JOIN rejected from %s: invalid timestamp (replay attack protection)", client_ip);
456 memset(&error_resp, 0,
sizeof(error_resp));
457 error_resp.success = 0;
459 SAFE_STRNCPY(error_resp.error_message,
"Timestamp validation failed",
sizeof(error_resp.error_message));
469 log_warn(
"SESSION_JOIN rejected from %s: invalid signature (identity verification failed)", client_ip);
471 memset(&error_resp, 0,
sizeof(error_resp));
472 error_resp.success = 0;
474 SAFE_STRNCPY(error_resp.error_message,
"Identity signature verification failed",
475 sizeof(error_resp.error_message));
480 log_debug(
"SESSION_JOIN signature verified from %s (pubkey: %02x%02x...)", client_ip, req->identity_pubkey[0],
481 req->identity_pubkey[1]);
485 memset(&resp, 0,
sizeof(resp));
492 void *retrieved_data = NULL;
495 memcpy(client_data->
session_id, resp.session_id, 16);
500 log_info(
"Client %s joined session (participant %02x%02x...)", client_ip, resp.participant_id[0],
501 resp.participant_id[1]);
508 struct cds_lfht_iter iter_ctx;
511 cds_lfht_for_each_entry(server->
sessions->
sessions, &iter_ctx, session_iter, hash_node) {
512 if (memcmp(session_iter->
session_id, resp.session_id, 16) == 0) {
513 joined_session = session_iter;
518 if (joined_session) {
525 log_warn(
"Session join failed for %s: %s", client_ip, resp.error_message);
529static void acds_on_session_leave(
const acip_session_leave_t *req,
int client_socket,
const char *client_ip,
533 log_debug(
"SESSION_LEAVE packet from %s", client_ip);
540 log_info(
"Client %s left session", client_ip);
543 void *retrieved_data = NULL;
554 struct cds_lfht_iter iter_ctx;
557 cds_lfht_for_each_entry(server->
sessions->
sessions, &iter_ctx, session_iter, hash_node) {
558 if (memcmp(session_iter->
session_id, req->session_id, 16) == 0) {
559 left_session = session_iter;
570 acip_send_error(transport, leave_result, asciichat_error_string(leave_result));
571 log_warn(
"Session leave failed for %s: %s", client_ip, asciichat_error_string(leave_result));
575static void acds_on_webrtc_sdp(
const acip_webrtc_sdp_t *sdp,
int client_socket,
const char *client_ip,
void *app_ctx) {
578 log_debug(
"WEBRTC_SDP packet from %s", client_ip);
589 log_warn(
"SDP relay failed from %s: %s", client_ip, asciichat_error_string(relay_result));
593static void acds_on_webrtc_ice(
const acip_webrtc_ice_t *ice,
int client_socket,
const char *client_ip,
void *app_ctx) {
596 log_debug(
"WEBRTC_ICE packet from %s", client_ip);
607 log_warn(
"ICE relay failed from %s: %s", client_ip, asciichat_error_string(relay_result));
611static void acds_on_discovery_ping(
const void *payload,
size_t payload_len,
int client_socket,
const char *client_ip,
621 log_debug(
"PING from %s, sending PONG", client_ip);
630 .on_session_lookup = acds_on_session_lookup,
631 .on_session_join = acds_on_session_join,
632 .on_session_leave = acds_on_session_leave,
633 .on_webrtc_sdp = acds_on_webrtc_sdp,
634 .on_webrtc_ice = acds_on_webrtc_ice,
635 .on_discovery_ping = acds_on_discovery_ping,
642 log_error(
"Client handler: NULL context");
650 char client_ip[INET6_ADDRSTRLEN] = {0};
656 rcu_register_thread();
657 log_info(
"Client handler started for %s (RCU registered)", client_ip);
664 rcu_unregister_thread();
667 memset(client_data, 0,
sizeof(*client_data));
674 rcu_unregister_thread();
678 log_debug(
"Client %s registered (socket=%d, total=%zu)", client_ip, client_socket,
687 void *payload = NULL;
688 size_t payload_size = 0;
691 int result =
receive_packet(client_socket, &packet_type, &payload, &payload_size);
694 log_info(
"Client %s disconnected", client_ip);
701 log_debug(
"Received packet type 0x%02X from %s, length=%zu", packet_type, client_ip, payload_size);
712 log_warn(
"ACIP handler failed for packet type 0x%02X from %s: %s", packet_type, client_ip,
713 asciichat_error_string(dispatch_result));
729 rcu_unregister_thread();
730 log_info(
"Client handler finished for %s (RCU unregistered)", client_ip);
asciichat_error_t acds_verify_session_create(const uint8_t identity_pubkey[32], uint64_t timestamp, uint8_t capabilities, uint8_t max_participants, const uint8_t signature[64])
Verify SESSION_CREATE signature.
asciichat_error_t acds_verify_session_join(const uint8_t identity_pubkey[32], uint64_t timestamp, const char *session_string, const uint8_t signature[64])
Verify SESSION_JOIN signature.
bool acds_validate_timestamp(uint64_t timestamp_ms, uint32_t window_seconds)
Check if timestamp is within acceptable window.
asciichat_error_t acip_handle_acds_packet(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, int client_socket, const char *client_ip, const acip_acds_callbacks_t *callbacks)
Handle incoming ACDS packet with O(1) dispatch.
ACIP Discovery Server (ACDS) packet handlers.
🗃️ Lock-Free Unified Memory Buffer Pool with Lazy Allocation
asciichat_error_t database_load_sessions(sqlite3 *db, session_registry_t *registry)
Load active sessions from database into registry.
void database_close(sqlite3 *db)
Close database.
asciichat_error_t database_save_session(sqlite3 *db, const session_entry_t *session)
Save session to database.
asciichat_error_t database_init(const char *db_path, sqlite3 **db)
Initialize database and create schema.
💾 SQLite persistence for discovery service
bool check_and_record_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, rate_event_type_t event_type, socket_t client_socket, const char *operation_name)
Check rate limit and send error if exceeded.
Network error handling utilities.
@ SESSION_TYPE_DIRECT_TCP
Direct TCP connection to server IP:port (default)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free)
#define SAFE_STRNCPY(dst, src, size)
#define SAFE_MALLOC(size, cast)
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
asciichat_error_t
Error and exit codes - unified status values (0-255)
@ ERROR_CRYPTO_VERIFICATION
#define log_warn(...)
Log a WARN message.
#define log_error(...)
Log an ERROR message.
#define log_info(...)
Log an INFO message.
#define log_debug(...)
Log a DEBUG message.
int receive_packet(socket_t sockfd, packet_type_t *type, void **data, size_t *len)
Receive a basic packet without encryption.
packet_type_t
Network protocol packet type enumeration.
@ PACKET_TYPE_ACIP_SESSION_CREATED
Session created response (Discovery Server -> Client)
🌍 IP Address Parsing and Formatting Utilities
void tcp_server_reject_client(socket_t socket, const char *reason)
Reject client connection with reason.
asciichat_error_t tcp_server_remove_client(tcp_server_t *server, socket_t socket)
Remove client from registry.
asciichat_error_t tcp_server_init(tcp_server_t *server, const tcp_server_config_t *config)
Initialize TCP server.
asciichat_error_t tcp_server_get_client(tcp_server_t *server, socket_t socket, void **out_data)
Get client data.
asciichat_error_t tcp_server_run(tcp_server_t *server)
Run TCP server accept loop.
void tcp_server_shutdown(tcp_server_t *server)
Shutdown TCP server.
const char * tcp_client_context_get_ip(const tcp_client_context_t *ctx, char *buf, size_t len)
Get formatted IP address from client context.
size_t tcp_server_get_client_count(tcp_server_t *server)
Get client count.
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
Add client to registry.
📝 Logging API with multiple log levels and terminal output control
ASCII-Chat Discovery Service (ACDS) Protocol Message Formats.
🌐 Core network I/O operations with timeout support
asciichat_error_t rate_limiter_cleanup(rate_limiter_t *limiter, uint32_t max_age_secs)
Clean up old rate limit events.
void rate_limiter_destroy(rate_limiter_t *limiter)
Destroy rate limiter and free resources.
void rate_limiter_set_sqlite_db(rate_limiter_t *limiter, void *db)
Set SQLite database handle for rate limiter.
rate_limiter_t * rate_limiter_create_sqlite(const char *db_path)
Create SQLite-backed rate limiter.
🚦 Rate limiting API with pluggable backends
@ RATE_EVENT_SESSION_LOOKUP
Session lookup.
@ RATE_EVENT_SESSION_JOIN
Session join.
@ RATE_EVENT_SESSION_CREATE
Session creation.
asciichat_error_t packet_send_via_transport(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len)
Send packet via transport with proper header (exported for generic wrappers)
asciichat_error_t acip_send_session_joined(acip_transport_t *transport, const acip_session_joined_t *response)
Send SESSION_JOINED response packet.
asciichat_error_t acip_send_error(acip_transport_t *transport, uint32_t error_code, const char *message)
Send error message packet.
asciichat_error_t acip_send_pong(acip_transport_t *transport)
Send pong packet.
asciichat_error_t acip_send_session_info(acip_transport_t *transport, const acip_session_info_t *info)
Send SESSION_INFO response packet.
ACIP shared/bidirectional packet sending functions.
void session_registry_destroy(session_registry_t *registry)
Destroy session registry.
asciichat_error_t session_leave(session_registry_t *registry, const uint8_t session_id[16], const uint8_t participant_id[16])
Leave session.
asciichat_error_t session_registry_init(session_registry_t *registry)
Initialize session registry.
asciichat_error_t session_create(session_registry_t *registry, const acip_session_create_t *req, const acds_config_t *config, acip_session_created_t *resp)
Create new session.
asciichat_error_t session_join(session_registry_t *registry, const acip_session_join_t *req, const acds_config_t *config, acip_session_joined_t *resp)
Join existing session.
asciichat_error_t session_lookup(session_registry_t *registry, const char *session_string, const acds_config_t *config, acip_session_info_t *resp)
Lookup session by string.
🎯 Session registry for discovery service (lock-free RCU implementation)
asciichat_error_t signaling_relay_sdp(session_registry_t *registry, tcp_server_t *tcp_server, const acip_webrtc_sdp_t *sdp, size_t total_packet_len)
Relay SDP offer/answer to recipient.
asciichat_error_t signaling_relay_ice(session_registry_t *registry, tcp_server_t *tcp_server, const acip_webrtc_ice_t *ice, size_t total_packet_len)
Relay ICE candidate to recipient.
🎬 WebRTC SDP/ICE signaling relay
Cross-platform socket interface for ascii-chat.
💾 SQLite rate limiting backend interface
void * acds_client_handler(void *arg)
Per-client connection handler (thread entry point)
#define ACDS_DESTROY_TRANSPORT(transport_var)
void acds_server_shutdown(acds_server_t *server)
Shutdown discovery server.
asciichat_error_t acds_server_init(acds_server_t *server, const acds_config_t *config)
Initialize discovery server.
#define ACDS_CREATE_TRANSPORT(socket, transport_var)
asciichat_error_t acds_server_run(acds_server_t *server)
Run discovery server main loop.
🌐 Discovery server TCP connection manager
Per-client connection data.
uint8_t session_id[16]
Session UUID (valid if joined_session)
uint8_t participant_id[16]
Participant UUID (valid if joined_session)
bool joined_session
Whether client has successfully joined a session.
Discovery server configuration.
char address[256]
IPv4 bind address (empty = all interfaces)
turn_server_t turn_servers[4]
TURN server configurations.
char database_path[512]
SQLite database path.
bool require_server_identity
Require servers to provide signed identity when creating sessions.
int port
TCP listen port (default 27225)
char address6[256]
IPv6 bind address (empty = all interfaces)
bool require_client_identity
Require clients to provide signed identity when joining sessions.
stun_server_t stun_servers[4]
STUN server configurations.
acds_config_t config
Runtime configuration.
atomic_bool shutdown
Shutdown flag for worker threads.
tcp_server_t tcp_server
TCP server abstraction.
sqlite3 * db
SQLite database handle.
session_registry_t * sessions
In-memory session registry.
struct rate_limiter_s * rate_limiter
SQLite-backed rate limiter.
thread_pool_t * worker_pool
Thread pool for background workers.
ACDS packet handler callbacks.
void * app_ctx
Application context (passed to all callbacks)
void(* on_session_create)(const acip_session_create_t *req, int client_socket, const char *client_ip, void *app_ctx)
Called when client requests session creation.
Session entry (RCU hash table node)
uint8_t session_id[16]
UUID.
char session_string[48]
e.g., "swift-river-mountain" (lookup key)
Session registry (lock-free RCU)
struct cds_lfht * sessions
RCU lock-free hash table.
Per-client connection context.
socket_t client_socket
Client connection socket.
void * user_data
User-provided data from config.
TCP server configuration.
atomic_bool running
Server running flag (set false to shutdown)
void thread_pool_destroy(thread_pool_t *pool)
Destroy a thread pool.
thread_pool_t * thread_pool_create(const char *pool_name)
Create a new thread pool.
asciichat_error_t thread_pool_spawn(thread_pool_t *pool, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
Spawn a worker thread in the pool.
⏱️ High-precision timing utilities using sokol_time.h and uthash
Transport abstraction layer for ACIP protocol.