ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
src/acds/server.c
Go to the documentation of this file.
1
16#include "acds/server.h"
17#include "acds/database.h"
18#include "acds/session.h"
19#include "acds/signaling.h"
20#include "network/acip/acds.h"
23#include "network/acip/send.h"
27#include "network/errors.h"
28#include "log/logging.h"
29#include "platform/socket.h"
30#include "network/network.h"
31#include "buffer_pool.h"
32#include "network/tcp/server.h"
33#include "util/ip.h"
34#include <string.h>
35#include <time.h>
36
37/* RCU library includes for lock-free session registry access in worker threads */
38#include <urcu.h>
39#include <urcu/rculfhash.h>
40
50static void *cleanup_thread_func(void *arg) {
51 acds_server_t *server = (acds_server_t *)arg;
52 if (!server) {
53 return NULL;
54 }
55
56 /* Register this worker thread with RCU library before any RCU operations.
57 This ensures the thread is tracked in the RCU synchronization scheme. */
58 rcu_register_thread();
59 log_info("Rate limit cleanup thread started (RCU registered)");
60
61 while (!atomic_load(&server->shutdown)) {
62 // Sleep for 5 minutes (or until shutdown)
63 for (int i = 0; i < 300 && !atomic_load(&server->shutdown); i++) {
64 platform_sleep_ms(1000); // Sleep 1 second at a time for responsive shutdown
65 }
66
67 if (atomic_load(&server->shutdown)) {
68 break;
69 }
70
71 // Run cleanup (delete events older than 1 hour)
72 log_debug("Running rate limit cleanup...");
74 if (result != ASCIICHAT_OK) {
75 log_warn("Rate limit cleanup failed");
76 }
77 }
78
79 /* Unregister from RCU library before exiting thread.
80 This finalizes any pending RCU grace periods for this thread. */
81 rcu_unregister_thread();
82 log_info("Rate limit cleanup thread exiting (RCU unregistered)");
83 return NULL;
84}
85
87 if (!server || !config) {
88 return SET_ERRNO(ERROR_INVALID_PARAM, "server or config is NULL");
89 }
90
91 memset(server, 0, sizeof(*server));
92 memcpy(&server->config, config, sizeof(acds_config_t));
93
94 // Initialize session registry
96 if (!server->sessions) {
97 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate session registry");
98 }
99
101 if (result != ASCIICHAT_OK) {
102 SAFE_FREE(server->sessions);
103 return result;
104 }
105
106 // Open database
107 result = database_init(config->database_path, &server->db);
108 if (result != ASCIICHAT_OK) {
110 SAFE_FREE(server->sessions);
111 return result;
112 }
113
114 // Load sessions from database
115 result = database_load_sessions(server->db, server->sessions);
116 if (result != ASCIICHAT_OK) {
117 log_warn("Failed to load sessions from database (continuing anyway)");
118 }
119
120 // Initialize rate limiter with SQLite backend
121 server->rate_limiter = rate_limiter_create_sqlite(NULL); // NULL = externally managed DB
122 if (!server->rate_limiter) {
123 database_close(server->db);
125 SAFE_FREE(server->sessions);
126 return SET_ERRNO(ERROR_MEMORY, "Failed to create rate limiter");
127 }
128
129 // Set the database handle for the rate limiter
131
132 // Configure TCP server
133 tcp_server_config_t tcp_config = {
134 .port = config->port,
135 .ipv4_address = (config->address[0] != '\0') ? config->address : NULL,
136 .ipv6_address = (config->address6[0] != '\0') ? config->address6 : NULL,
137 .bind_ipv4 = (config->address[0] != '\0') || (config->address[0] == '\0' && config->address6[0] == '\0'),
138 .bind_ipv6 = (config->address6[0] != '\0') || (config->address[0] == '\0' && config->address6[0] == '\0'),
139 .accept_timeout_sec = 1,
140 .client_handler = acds_client_handler,
141 .user_data = server,
142 };
143
144 // Initialize TCP server
145 result = tcp_server_init(&server->tcp_server, &tcp_config);
146 if (result != ASCIICHAT_OK) {
148 database_close(server->db);
150 SAFE_FREE(server->sessions);
151 return result;
152 }
153
154 // Initialize background worker thread pool
155 atomic_store(&server->shutdown, false);
156 server->worker_pool = thread_pool_create("acds_workers");
157 if (!server->worker_pool) {
158 log_warn("Failed to create worker thread pool");
161 database_close(server->db);
163 SAFE_FREE(server->sessions);
164 return SET_ERRNO(ERROR_MEMORY, "Failed to create worker thread pool");
165 }
166
167 // Spawn rate limit cleanup thread in worker pool
168 if (thread_pool_spawn(server->worker_pool, cleanup_thread_func, server, 0, "rate_limit_cleanup") != ASCIICHAT_OK) {
169 log_warn("Failed to spawn rate limit cleanup thread (continuing without cleanup)");
170 }
171
172 log_info("Discovery server initialized successfully");
173 return ASCIICHAT_OK;
174}
175
177 if (!server) {
178 return SET_ERRNO(ERROR_INVALID_PARAM, "server is NULL");
179 }
180
181 log_info("Discovery server accepting connections on port %d", server->config.port);
182
183 // Delegate to TCP server abstraction
184 return tcp_server_run(&server->tcp_server);
185}
186
188 if (!server) {
189 return;
190 }
191
192 // Signal shutdown to worker threads
193 atomic_store(&server->shutdown, true);
194
195 // Shutdown TCP server (closes listen sockets, stops accept loop)
197
198 // Wait for all client handler threads to exit
199 // We need to stop all active client connections gracefully
200 size_t remaining_clients;
201 int shutdown_attempts = 0;
202 const int max_shutdown_attempts = 100; // 10 seconds (100 * 100ms)
203
204 while ((remaining_clients = tcp_server_get_client_count(&server->tcp_server)) > 0 &&
205 shutdown_attempts < max_shutdown_attempts) {
206 log_debug("Waiting for %zu client handler threads to exit (attempt %d/%d)", remaining_clients,
207 shutdown_attempts + 1, max_shutdown_attempts);
209 shutdown_attempts++;
210 }
211
212 if (remaining_clients > 0) {
213 log_warn("Server shutdown: %zu client handler threads still running after 10 seconds", remaining_clients);
214 } else if (shutdown_attempts > 0) {
215 log_debug("All client handler threads exited gracefully");
216 }
217
218 // Stop and destroy worker thread pool (cleanup thread, etc.)
219 if (server->worker_pool) {
221 server->worker_pool = NULL;
222 log_debug("Worker thread pool stopped");
223 }
224
225 // Destroy rate limiter
226 if (server->rate_limiter) {
228 server->rate_limiter = NULL;
229 }
230
231 // Close database
232 if (server->db) {
233 database_close(server->db);
234 server->db = NULL;
235 }
236
237 // Destroy session registry
238 if (server->sessions) {
240 SAFE_FREE(server->sessions);
241 }
242
243 log_info("Server shutdown complete");
244}
245
246// =============================================================================
247// ACIP Transport Helper Macros for ACDS
248// =============================================================================
249// ACDS uses plain TCP without encryption (discovery service)
250// These macros simplify creating temporary transports for responses
251
252#define ACDS_CREATE_TRANSPORT(socket, transport_var) \
253 acip_transport_t *transport_var = acip_tcp_transport_create(socket, NULL); \
254 if (!transport_var) { \
255 log_error("Failed to create ACDS transport"); \
256 return; \
257 }
258
259#define ACDS_DESTROY_TRANSPORT(transport_var) acip_transport_destroy(transport_var)
260
261// =============================================================================
262// ACIP Callback Wrappers for ACDS
263// =============================================================================
264// These callbacks are invoked by acip_handle_acds_packet() via O(1) array dispatch.
265// Each callback implements: Rate Limit → Crypto Verify → Business Logic → DB Save
266
267static void acds_on_session_create(const acip_session_create_t *req, int client_socket, const char *client_ip,
268 void *app_ctx) {
269 acds_server_t *server = (acds_server_t *)app_ctx;
270
271 log_debug("SESSION_CREATE packet from %s", client_ip);
272
273 // Create ACIP transport for responses
274 ACDS_CREATE_TRANSPORT(client_socket, transport);
275
276 // Rate limiting check
277 if (!check_and_record_rate_limit(server->rate_limiter, client_ip, RATE_EVENT_SESSION_CREATE, client_socket,
278 "SESSION_CREATE")) {
279 return;
280 }
281
282 // Cryptographic identity verification (if required)
283 if (server->config.require_server_identity) {
284 // Validate timestamp (5 minute window)
285 if (!acds_validate_timestamp(req->timestamp, 300)) {
286 log_warn("SESSION_CREATE rejected from %s: invalid timestamp (replay attack protection)", client_ip);
287 acip_send_error(transport, ERROR_CRYPTO_VERIFICATION, "Timestamp validation failed - too old or in the future");
288 ACDS_DESTROY_TRANSPORT(transport);
289
290 return;
291 }
292
293 // Verify Ed25519 signature
295 req->identity_pubkey, req->timestamp, req->capabilities, req->max_participants, req->signature);
296
297 if (verify_result != ASCIICHAT_OK) {
298 log_warn("SESSION_CREATE rejected from %s: invalid signature (identity verification failed)", client_ip);
299 acip_send_error(transport, ERROR_CRYPTO_VERIFICATION, "Identity signature verification failed");
300 ACDS_DESTROY_TRANSPORT(transport);
301 return;
302 }
303
304 log_debug("SESSION_CREATE signature verified from %s (pubkey: %02x%02x...)", client_ip, req->identity_pubkey[0],
305 req->identity_pubkey[1]);
306 }
307
308 // Reachability verification for Direct TCP sessions
309 // WebRTC sessions don't need this since they use P2P mesh with STUN/TURN
310 if (req->session_type == SESSION_TYPE_DIRECT_TCP) {
311 // Auto-detect public IP if server_address is empty
312 if (req->server_address[0] == '\0') {
313 // Server bound to 0.0.0.0 and wants ACDS to auto-detect public IP
314 SAFE_STRNCPY(req->server_address, client_ip, sizeof(req->server_address));
315 log_info("SESSION_CREATE from %s: auto-detected server address (bind was 0.0.0.0)", client_ip);
316 }
317
318 // Verify that the server is actually reachable at the IP it claims
319 // Compare the claimed server address with the actual connection source
320 if (strcmp(req->server_address, client_ip) != 0) {
321 log_warn("SESSION_CREATE rejected from %s: server_address '%s' does not match actual connection IP", client_ip,
322 req->server_address);
324 "Direct TCP sessions require server_address to match your actual IP");
325 ACDS_DESTROY_TRANSPORT(transport);
326 return;
327 }
328 log_debug("SESSION_CREATE reachability verified: %s matches connection source", req->server_address);
329 }
330
332 memset(&resp, 0, sizeof(resp));
333
334 asciichat_error_t create_result = session_create(server->sessions, req, &server->config, &resp);
335 if (create_result == ASCIICHAT_OK) {
336 // Build complete payload: fixed response + variable STUN/TURN servers
337 size_t stun_size = (size_t)resp.stun_count * sizeof(stun_server_t);
338 size_t turn_size = (size_t)resp.turn_count * sizeof(turn_server_t);
339 size_t total_size = sizeof(resp) + stun_size + turn_size;
340
341 uint8_t *payload = SAFE_MALLOC(total_size, uint8_t *);
342 if (!payload) {
343 acip_send_error(transport, ERROR_MEMORY, "Out of memory building response");
344 ACDS_DESTROY_TRANSPORT(transport);
345 return;
346 }
347
348 // Copy fixed response
349 memcpy(payload, &resp, sizeof(resp));
350
351 // Append STUN servers
352 if (resp.stun_count > 0) {
353 memcpy(payload + sizeof(resp), server->config.stun_servers, stun_size);
354 }
355
356 // Append TURN servers
357 if (resp.turn_count > 0) {
358 memcpy(payload + sizeof(resp) + stun_size, server->config.turn_servers, turn_size);
359 }
360
361 // Send complete response with variable-length data
362 packet_send_via_transport(transport, PACKET_TYPE_ACIP_SESSION_CREATED, payload, total_size);
363 SAFE_FREE(payload);
364
365 log_info("Session created: %.*s (UUID: %02x%02x..., %d STUN, %d TURN servers)", resp.session_string_len,
366 resp.session_string, resp.session_id[0], resp.session_id[1], resp.stun_count, resp.turn_count);
367
368 /* Save to database - look up session entry first using RCU lock-free access */
369 rcu_read_lock();
370
371 session_entry_t *session = NULL;
372 struct cds_lfht_iter iter_ctx;
373
374 /* Find session by session_string (primary key in hash table) */
375 unsigned long hash = 5381; // DJB2 hash
376 const char *str = resp.session_string;
377 int c;
378 while ((c = (unsigned char)*str++)) {
379 hash = ((hash << 5) + hash) + c;
380 }
381
382 cds_lfht_lookup(server->sessions->sessions, hash, NULL, resp.session_string, &iter_ctx);
383 struct cds_lfht_node *node = cds_lfht_iter_get_node(&iter_ctx);
384 if (node) {
385 session = caa_container_of(node, session_entry_t, hash_node);
386 if (session) {
387 database_save_session(server->db, session);
388 }
389 }
390
391 rcu_read_unlock();
392 } else {
393 // Error - send error response using proper error code
394 acip_send_error(transport, create_result, "Failed to create session");
395 log_warn("Session creation failed for %s: %s", client_ip, asciichat_error_string(create_result));
396 }
397
398 ACDS_DESTROY_TRANSPORT(transport);
399}
400
401static void acds_on_session_lookup(const acip_session_lookup_t *req, int client_socket, const char *client_ip,
402 void *app_ctx) {
403 acds_server_t *server = (acds_server_t *)app_ctx;
404
405 log_debug("SESSION_LOOKUP packet from %s", client_ip);
406
407 // Create ACIP transport for responses
408 ACDS_CREATE_TRANSPORT(client_socket, transport);
409
410 // Rate limiting check
411 if (!check_and_record_rate_limit(server->rate_limiter, client_ip, RATE_EVENT_SESSION_LOOKUP, client_socket,
412 "SESSION_LOOKUP")) {
413 return;
414 }
415
417 memset(&resp, 0, sizeof(resp));
418
419 // Null-terminate session string for lookup
420 char session_string[49] = {0};
421 size_t copy_len =
422 (req->session_string_len < sizeof(session_string) - 1) ? req->session_string_len : sizeof(session_string) - 1;
423 memcpy(session_string, req->session_string, copy_len);
424
425 asciichat_error_t lookup_result = session_lookup(server->sessions, session_string, &server->config, &resp);
426 if (lookup_result == ASCIICHAT_OK) {
427 acip_send_session_info(transport, &resp);
428 log_info("Session lookup for '%s' from %s: %s", session_string, client_ip, resp.found ? "found" : "not found");
429 } else {
430 acip_send_error(transport, lookup_result, "Session lookup failed");
431 log_warn("Session lookup failed for %s: %s", client_ip, asciichat_error_string(lookup_result));
432 }
433}
434
435static void acds_on_session_join(const acip_session_join_t *req, int client_socket, const char *client_ip,
436 void *app_ctx) {
437 acds_server_t *server = (acds_server_t *)app_ctx;
438
439 log_debug("SESSION_JOIN packet from %s", client_ip);
440
441 // Create ACIP transport for responses
442 ACDS_CREATE_TRANSPORT(client_socket, transport);
443
444 // Rate limiting check
445 if (!check_and_record_rate_limit(server->rate_limiter, client_ip, RATE_EVENT_SESSION_JOIN, client_socket,
446 "SESSION_JOIN")) {
447 return;
448 }
449
450 // Cryptographic identity verification (if required)
451 if (server->config.require_client_identity) {
452 // Validate timestamp (5 minute window)
453 if (!acds_validate_timestamp(req->timestamp, 300)) {
454 log_warn("SESSION_JOIN rejected from %s: invalid timestamp (replay attack protection)", client_ip);
455 acip_session_joined_t error_resp;
456 memset(&error_resp, 0, sizeof(error_resp));
457 error_resp.success = 0;
458 error_resp.error_code = ERROR_CRYPTO_VERIFICATION;
459 SAFE_STRNCPY(error_resp.error_message, "Timestamp validation failed", sizeof(error_resp.error_message));
460 acip_send_session_joined(transport, &error_resp);
461 return;
462 }
463
464 // Verify Ed25519 signature
465 asciichat_error_t verify_result =
466 acds_verify_session_join(req->identity_pubkey, req->timestamp, req->session_string, req->signature);
467
468 if (verify_result != ASCIICHAT_OK) {
469 log_warn("SESSION_JOIN rejected from %s: invalid signature (identity verification failed)", client_ip);
470 acip_session_joined_t error_resp;
471 memset(&error_resp, 0, sizeof(error_resp));
472 error_resp.success = 0;
473 error_resp.error_code = ERROR_CRYPTO_VERIFICATION;
474 SAFE_STRNCPY(error_resp.error_message, "Identity signature verification failed",
475 sizeof(error_resp.error_message));
476 acip_send_session_joined(transport, &error_resp);
477 return;
478 }
479
480 log_debug("SESSION_JOIN signature verified from %s (pubkey: %02x%02x...)", client_ip, req->identity_pubkey[0],
481 req->identity_pubkey[1]);
482 }
483
485 memset(&resp, 0, sizeof(resp));
486
487 asciichat_error_t join_result = session_join(server->sessions, req, &server->config, &resp);
488 if (join_result == ASCIICHAT_OK && resp.success) {
489 acip_send_session_joined(transport, &resp);
490
491 // Update client data in registry (update in place)
492 void *retrieved_data = NULL;
493 if (tcp_server_get_client(&server->tcp_server, client_socket, &retrieved_data) == ASCIICHAT_OK && retrieved_data) {
494 acds_client_data_t *client_data = (acds_client_data_t *)retrieved_data;
495 memcpy(client_data->session_id, resp.session_id, 16);
496 memcpy(client_data->participant_id, resp.participant_id, 16);
497 client_data->joined_session = true;
498 }
499
500 log_info("Client %s joined session (participant %02x%02x...)", client_ip, resp.participant_id[0],
501 resp.participant_id[1]);
502
503 // Save to database - look up session entry first (RCU lock-free iteration)
504 rcu_read_lock();
505
506 session_entry_t *session_iter;
507 session_entry_t *joined_session = NULL;
508 struct cds_lfht_iter iter_ctx;
509
510 /* Iterate through hash table using RCU-safe iterator */
511 cds_lfht_for_each_entry(server->sessions->sessions, &iter_ctx, session_iter, hash_node) {
512 if (memcmp(session_iter->session_id, resp.session_id, 16) == 0) {
513 joined_session = session_iter;
514 break;
515 }
516 }
517
518 if (joined_session) {
519 database_save_session(server->db, joined_session);
520 }
521
522 rcu_read_unlock();
523 } else {
524 acip_send_session_joined(transport, &resp);
525 log_warn("Session join failed for %s: %s", client_ip, resp.error_message);
526 }
527}
528
529static void acds_on_session_leave(const acip_session_leave_t *req, int client_socket, const char *client_ip,
530 void *app_ctx) {
531 acds_server_t *server = (acds_server_t *)app_ctx;
532
533 log_debug("SESSION_LEAVE packet from %s", client_ip);
534
535 // Create ACIP transport for responses
536 ACDS_CREATE_TRANSPORT(client_socket, transport);
537
538 asciichat_error_t leave_result = session_leave(server->sessions, req->session_id, req->participant_id);
539 if (leave_result == ASCIICHAT_OK) {
540 log_info("Client %s left session", client_ip);
541
542 // Update client data to mark as not joined
543 void *retrieved_data = NULL;
544 if (tcp_server_get_client(&server->tcp_server, client_socket, &retrieved_data) == ASCIICHAT_OK && retrieved_data) {
545 acds_client_data_t *client_data = (acds_client_data_t *)retrieved_data;
546 client_data->joined_session = false;
547 }
548
549 // Save updated session to database - look up session entry first (RCU lock-free iteration)
550 rcu_read_lock();
551
552 session_entry_t *session_iter;
553 session_entry_t *left_session = NULL;
554 struct cds_lfht_iter iter_ctx;
555
556 /* Iterate through hash table using RCU-safe iterator */
557 cds_lfht_for_each_entry(server->sessions->sessions, &iter_ctx, session_iter, hash_node) {
558 if (memcmp(session_iter->session_id, req->session_id, 16) == 0) {
559 left_session = session_iter;
560 break;
561 }
562 }
563
564 if (left_session) {
565 database_save_session(server->db, left_session);
566 }
567
568 rcu_read_unlock();
569 } else {
570 acip_send_error(transport, leave_result, asciichat_error_string(leave_result));
571 log_warn("Session leave failed for %s: %s", client_ip, asciichat_error_string(leave_result));
572 }
573}
574
575static void acds_on_webrtc_sdp(const acip_webrtc_sdp_t *sdp, int client_socket, const char *client_ip, void *app_ctx) {
576 acds_server_t *server = (acds_server_t *)app_ctx;
577
578 log_debug("WEBRTC_SDP packet from %s", client_ip);
579
580 // Create ACIP transport for responses
581 ACDS_CREATE_TRANSPORT(client_socket, transport);
582
583 // Calculate payload size (header + SDP string)
584 size_t payload_size = sizeof(acip_webrtc_sdp_t) + sdp->sdp_len;
585
586 asciichat_error_t relay_result = signaling_relay_sdp(server->sessions, &server->tcp_server, sdp, payload_size);
587 if (relay_result != ASCIICHAT_OK) {
588 acip_send_error(transport, relay_result, "SDP relay failed");
589 log_warn("SDP relay failed from %s: %s", client_ip, asciichat_error_string(relay_result));
590 }
591}
592
593static void acds_on_webrtc_ice(const acip_webrtc_ice_t *ice, int client_socket, const char *client_ip, void *app_ctx) {
594 acds_server_t *server = (acds_server_t *)app_ctx;
595
596 log_debug("WEBRTC_ICE packet from %s", client_ip);
597
598 // Create ACIP transport for responses
599 ACDS_CREATE_TRANSPORT(client_socket, transport);
600
601 // Calculate payload size (header + candidate string)
602 size_t payload_size = sizeof(acip_webrtc_ice_t) + ice->candidate_len;
603
604 asciichat_error_t relay_result = signaling_relay_ice(server->sessions, &server->tcp_server, ice, payload_size);
605 if (relay_result != ASCIICHAT_OK) {
606 acip_send_error(transport, relay_result, "ICE relay failed");
607 log_warn("ICE relay failed from %s: %s", client_ip, asciichat_error_string(relay_result));
608 }
609}
610
611static void acds_on_discovery_ping(const void *payload, size_t payload_len, int client_socket, const char *client_ip,
612 void *app_ctx) {
613 (void)payload;
614 (void)payload_len;
615 (void)app_ctx;
616
617 // Create ACIP transport for PONG response
618 ACDS_CREATE_TRANSPORT(client_socket, transport);
619
620 // Send PONG response
621 log_debug("PING from %s, sending PONG", client_ip);
622 acip_send_pong(transport);
623
624 ACDS_DESTROY_TRANSPORT(transport);
625}
626
627// Global ACIP callback structure for ACDS
628static const acip_acds_callbacks_t g_acds_callbacks = {
629 .on_session_create = acds_on_session_create,
630 .on_session_lookup = acds_on_session_lookup,
631 .on_session_join = acds_on_session_join,
632 .on_session_leave = acds_on_session_leave,
633 .on_webrtc_sdp = acds_on_webrtc_sdp,
634 .on_webrtc_ice = acds_on_webrtc_ice,
635 .on_discovery_ping = acds_on_discovery_ping,
636 .app_ctx = NULL // Set dynamically to server instance
637};
638
639void *acds_client_handler(void *arg) {
641 if (!ctx) {
642 log_error("Client handler: NULL context");
643 return NULL;
644 }
645
646 acds_server_t *server = (acds_server_t *)ctx->user_data;
647 socket_t client_socket = ctx->client_socket;
648
649 // Get client IP for logging
650 char client_ip[INET6_ADDRSTRLEN] = {0};
651 tcp_client_context_get_ip(ctx, client_ip, sizeof(client_ip));
652
653 /* Register this client handler thread with RCU library before any RCU operations.
654 This ensures the thread is tracked in the RCU synchronization scheme for
655 lock-free hash table access. */
656 rcu_register_thread();
657 log_info("Client handler started for %s (RCU registered)", client_ip);
658
659 // Register client in TCP server registry with allocated client data
661 if (!client_data) {
662 tcp_server_reject_client(client_socket, "Failed to allocate client data");
663 SAFE_FREE(ctx);
664 rcu_unregister_thread();
665 return NULL;
666 }
667 memset(client_data, 0, sizeof(*client_data));
668 client_data->joined_session = false;
669
670 if (tcp_server_add_client(&server->tcp_server, client_socket, client_data) != ASCIICHAT_OK) {
671 SAFE_FREE(client_data);
672 tcp_server_reject_client(client_socket, "Failed to register client in registry");
673 SAFE_FREE(ctx);
674 rcu_unregister_thread();
675 return NULL;
676 }
677
678 log_debug("Client %s registered (socket=%d, total=%zu)", client_ip, client_socket,
680
681 // TODO: Crypto handshake (when crypto support is added)
682 // For now, accept plain connections
683
684 // Main packet processing loop
685 while (atomic_load(&server->tcp_server.running)) {
686 packet_type_t packet_type;
687 void *payload = NULL;
688 size_t payload_size = 0;
689
690 // Receive packet (blocking with system timeout)
691 int result = receive_packet(client_socket, &packet_type, &payload, &payload_size);
692 if (result < 0) {
693 // Connection error - client disconnected
694 log_info("Client %s disconnected", client_ip);
695 if (payload) {
696 buffer_pool_free(NULL, payload, payload_size);
697 }
698 break;
699 }
700
701 log_debug("Received packet type 0x%02X from %s, length=%zu", packet_type, client_ip, payload_size);
702
703 // O(1) ACIP array-based dispatch
704 // Set server context for callbacks
705 acip_acds_callbacks_t callbacks = g_acds_callbacks;
706 callbacks.app_ctx = server;
707
708 asciichat_error_t dispatch_result =
709 acip_handle_acds_packet(NULL, packet_type, payload, payload_size, client_socket, client_ip, &callbacks);
710
711 if (dispatch_result != ASCIICHAT_OK) {
712 log_warn("ACIP handler failed for packet type 0x%02X from %s: %s", packet_type, client_ip,
713 asciichat_error_string(dispatch_result));
714 }
715
716 // Free payload (allocated by receive_packet via buffer_pool_alloc)
717 if (payload) {
718 buffer_pool_free(NULL, payload, payload_size);
719 }
720 }
721
722 // Cleanup
723 tcp_server_remove_client(&server->tcp_server, client_socket);
724 log_debug("Client %s unregistered (total=%zu)", client_ip, tcp_server_get_client_count(&server->tcp_server));
725
726 socket_close(client_socket);
727 SAFE_FREE(ctx);
728
729 rcu_unregister_thread();
730 log_info("Client handler finished for %s (RCU unregistered)", client_ip);
731 return NULL;
732}
asciichat_error_t acds_verify_session_create(const uint8_t identity_pubkey[32], uint64_t timestamp, uint8_t capabilities, uint8_t max_participants, const uint8_t signature[64])
Verify SESSION_CREATE signature.
asciichat_error_t acds_verify_session_join(const uint8_t identity_pubkey[32], uint64_t timestamp, const char *session_string, const uint8_t signature[64])
Verify SESSION_JOIN signature.
bool acds_validate_timestamp(uint64_t timestamp_ms, uint32_t window_seconds)
Check if timestamp is within acceptable window.
asciichat_error_t acip_handle_acds_packet(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, int client_socket, const char *client_ip, const acip_acds_callbacks_t *callbacks)
Handle incoming ACDS packet with O(1) dispatch.
ACIP Discovery Server (ACDS) packet handlers.
🗃️ Lock-Free Unified Memory Buffer Pool with Lazy Allocation
asciichat_error_t database_load_sessions(sqlite3 *db, session_registry_t *registry)
Load active sessions from database into registry.
Definition database.c:106
void database_close(sqlite3 *db)
Close database.
Definition database.c:339
asciichat_error_t database_save_session(sqlite3 *db, const session_entry_t *session)
Save session to database.
Definition database.c:226
asciichat_error_t database_init(const char *db_path, sqlite3 **db)
Initialize database and create schema.
Definition database.c:58
💾 SQLite persistence for discovery service
bool check_and_record_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, rate_event_type_t event_type, socket_t client_socket, const char *operation_name)
Check rate limit and send error if exceeded.
Definition errors.c:38
Network error handling utilities.
acip_session_join_t
acip_session_joined_t
acip_session_lookup_t
acip_webrtc_ice_t
acip_session_created_t
acip_webrtc_sdp_t
acip_session_create_t
acip_session_info_t
acip_session_leave_t
@ SESSION_TYPE_DIRECT_TCP
Direct TCP connection to server IP:port (default)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
Free a buffer back to the pool (lock-free)
#define SAFE_STRNCPY(dst, src, size)
Definition common.h:358
#define SAFE_FREE(ptr)
Definition common.h:320
#define SAFE_MALLOC(size, cast)
Definition common.h:208
unsigned char uint8_t
Definition common.h:56
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
asciichat_error_t
Error and exit codes - unified status values (0-255)
Definition error_codes.h:46
@ ERROR_CRYPTO_VERIFICATION
Definition error_codes.h:92
@ ERROR_MEMORY
Definition error_codes.h:53
@ ASCIICHAT_OK
Definition error_codes.h:48
@ ERROR_INVALID_PARAM
#define log_warn(...)
Log a WARN message.
#define log_error(...)
Log an ERROR message.
#define log_info(...)
Log an INFO message.
#define log_debug(...)
Log a DEBUG message.
int receive_packet(socket_t sockfd, packet_type_t *type, void **data, size_t *len)
Receive a basic packet without encryption.
Definition packet.c:767
packet_type_t
Network protocol packet type enumeration.
Definition packet.h:281
@ PACKET_TYPE_ACIP_SESSION_CREATED
Session created response (Discovery Server -> Client)
Definition packet.h:371
int socket_t
Socket handle type (POSIX: int)
Definition socket.h:50
int socket_close(socket_t sock)
Close a socket.
void platform_sleep_ms(unsigned int ms)
Sleep for a specified number of milliseconds.
stun_server_t
Definition stun.h:75
turn_server_t
Definition turn.h:101
🌍 IP Address Parsing and Formatting Utilities
void tcp_server_reject_client(socket_t socket, const char *reason)
Reject client connection with reason.
asciichat_error_t tcp_server_remove_client(tcp_server_t *server, socket_t socket)
Remove client from registry.
asciichat_error_t tcp_server_init(tcp_server_t *server, const tcp_server_config_t *config)
Initialize TCP server.
asciichat_error_t tcp_server_get_client(tcp_server_t *server, socket_t socket, void **out_data)
Get client data.
asciichat_error_t tcp_server_run(tcp_server_t *server)
Run TCP server accept loop.
void tcp_server_shutdown(tcp_server_t *server)
Shutdown TCP server.
const char * tcp_client_context_get_ip(const tcp_client_context_t *ctx, char *buf, size_t len)
Get formatted IP address from client context.
size_t tcp_server_get_client_count(tcp_server_t *server)
Get client count.
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
Add client to registry.
📝 Logging API with multiple log levels and terminal output control
ASCII-Chat Discovery Service (ACDS) Protocol Message Formats.
🌐 Core network I/O operations with timeout support
asciichat_error_t rate_limiter_cleanup(rate_limiter_t *limiter, uint32_t max_age_secs)
Clean up old rate limit events.
Definition rate_limit.c:160
void rate_limiter_destroy(rate_limiter_t *limiter)
Destroy rate limiter and free resources.
Definition rate_limit.c:115
void rate_limiter_set_sqlite_db(rate_limiter_t *limiter, void *db)
Set SQLite database handle for rate limiter.
Definition rate_limit.c:106
rate_limiter_t * rate_limiter_create_sqlite(const char *db_path)
Create SQLite-backed rate limiter.
Definition rate_limit.c:89
🚦 Rate limiting API with pluggable backends
@ RATE_EVENT_SESSION_LOOKUP
Session lookup.
Definition rate_limit.h:49
@ RATE_EVENT_SESSION_JOIN
Session join.
Definition rate_limit.h:50
@ RATE_EVENT_SESSION_CREATE
Session creation.
Definition rate_limit.h:48
asciichat_error_t packet_send_via_transport(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len)
Send packet via transport with proper header (exported for generic wrappers)
Definition send.c:40
asciichat_error_t acip_send_session_joined(acip_transport_t *transport, const acip_session_joined_t *response)
Send SESSION_JOINED response packet.
Definition send.c:293
asciichat_error_t acip_send_error(acip_transport_t *transport, uint32_t error_code, const char *message)
Send error message packet.
Definition send.c:205
asciichat_error_t acip_send_pong(acip_transport_t *transport)
Send pong packet.
Definition send.c:184
asciichat_error_t acip_send_session_info(acip_transport_t *transport, const acip_session_info_t *info)
Send SESSION_INFO response packet.
Definition send.c:285
ACIP shared/bidirectional packet sending functions.
void session_registry_destroy(session_registry_t *registry)
Destroy session registry.
Definition session.c:190
asciichat_error_t session_leave(session_registry_t *registry, const uint8_t session_id[16], const uint8_t participant_id[16])
Leave session.
Definition session.c:559
asciichat_error_t session_registry_init(session_registry_t *registry)
Initialize session registry.
Definition session.c:169
asciichat_error_t session_create(session_registry_t *registry, const acip_session_create_t *req, const acds_config_t *config, acip_session_created_t *resp)
Create new session.
Definition session.c:241
asciichat_error_t session_join(session_registry_t *registry, const acip_session_join_t *req, const acds_config_t *config, acip_session_joined_t *resp)
Join existing session.
Definition session.c:407
asciichat_error_t session_lookup(session_registry_t *registry, const char *session_string, const acds_config_t *config, acip_session_info_t *resp)
Lookup session by string.
Definition session.c:349
🎯 Session registry for discovery service (lock-free RCU implementation)
asciichat_error_t signaling_relay_sdp(session_registry_t *registry, tcp_server_t *tcp_server, const acip_webrtc_sdp_t *sdp, size_t total_packet_len)
Relay SDP offer/answer to recipient.
Definition signaling.c:113
asciichat_error_t signaling_relay_ice(session_registry_t *registry, tcp_server_t *tcp_server, const acip_webrtc_ice_t *ice, size_t total_packet_len)
Relay ICE candidate to recipient.
Definition signaling.c:170
🎬 WebRTC SDP/ICE signaling relay
Cross-platform socket interface for ascii-chat.
💾 SQLite rate limiting backend interface
void * acds_client_handler(void *arg)
Per-client connection handler (thread entry point)
#define ACDS_DESTROY_TRANSPORT(transport_var)
void acds_server_shutdown(acds_server_t *server)
Shutdown discovery server.
asciichat_error_t acds_server_init(acds_server_t *server, const acds_config_t *config)
Initialize discovery server.
#define ACDS_CREATE_TRANSPORT(socket, transport_var)
asciichat_error_t acds_server_run(acds_server_t *server)
Run discovery server main loop.
🌐 Discovery server TCP connection manager
Per-client connection data.
uint8_t session_id[16]
Session UUID (valid if joined_session)
uint8_t participant_id[16]
Participant UUID (valid if joined_session)
bool joined_session
Whether client has successfully joined a session.
Discovery server configuration.
Definition acds/main.h:71
char address[256]
IPv4 bind address (empty = all interfaces)
Definition acds/main.h:73
turn_server_t turn_servers[4]
TURN server configurations.
Definition acds/main.h:88
char database_path[512]
SQLite database path.
Definition acds/main.h:75
bool require_server_identity
Require servers to provide signed identity when creating sessions.
Definition acds/main.h:79
int port
TCP listen port (default 27225)
Definition acds/main.h:72
char address6[256]
IPv6 bind address (empty = all interfaces)
Definition acds/main.h:74
bool require_client_identity
Require clients to provide signed identity when joining sessions.
Definition acds/main.h:80
stun_server_t stun_servers[4]
STUN server configurations.
Definition acds/main.h:86
Discovery server state.
acds_config_t config
Runtime configuration.
atomic_bool shutdown
Shutdown flag for worker threads.
tcp_server_t tcp_server
TCP server abstraction.
sqlite3 * db
SQLite database handle.
session_registry_t * sessions
In-memory session registry.
struct rate_limiter_s * rate_limiter
SQLite-backed rate limiter.
thread_pool_t * worker_pool
Thread pool for background workers.
ACDS packet handler callbacks.
void * app_ctx
Application context (passed to all callbacks)
void(* on_session_create)(const acip_session_create_t *req, int client_socket, const char *client_ip, void *app_ctx)
Called when client requests session creation.
Session entry (RCU hash table node)
Definition session.h:55
uint8_t session_id[16]
UUID.
Definition session.h:57
char session_string[48]
e.g., "swift-river-mountain" (lookup key)
Definition session.h:56
Session registry (lock-free RCU)
Definition session.h:92
struct cds_lfht * sessions
RCU lock-free hash table.
Definition session.h:93
Per-client connection context.
socket_t client_socket
Client connection socket.
void * user_data
User-provided data from config.
TCP server configuration.
atomic_bool running
Server running flag (set false to shutdown)
void thread_pool_destroy(thread_pool_t *pool)
Destroy a thread pool.
Definition thread_pool.c:43
thread_pool_t * thread_pool_create(const char *pool_name)
Create a new thread pool.
Definition thread_pool.c:12
asciichat_error_t thread_pool_spawn(thread_pool_t *pool, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
Spawn a worker thread in the pool.
Definition thread_pool.c:65
⏱️ High-precision timing utilities using sokol_time.h and uthash
Transport abstraction layer for ACIP protocol.