ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
src/discovery-service/server.c
Go to the documentation of this file.
1
16#include <ascii-chat/discovery/database.h>
17#include <ascii-chat/discovery/session.h>
18#include <ascii-chat/discovery/identity.h>
19#include <ascii-chat/crypto/handshake/server.h>
21#include <ascii-chat/network/acip/acds.h>
22#include <ascii-chat/network/acip/acds_handlers.h>
23#include <ascii-chat/network/acip/acds_client.h>
24#include <ascii-chat/network/acip/send.h>
25#include <ascii-chat/network/acip/transport.h>
26#include <ascii-chat/network/rate_limit/rate_limit.h>
27#include <ascii-chat/network/rate_limit/sqlite.h>
28#include <ascii-chat/network/errors.h>
29#include <ascii-chat/log/logging.h>
30#include <ascii-chat/platform/socket.h>
31#include <ascii-chat/network/network.h>
32#include <ascii-chat/buffer_pool.h>
33#include <ascii-chat/network/tcp/server.h>
34#include <ascii-chat/util/ip.h>
35#include <ascii-chat/util/time.h>
36#include <string.h>
37
55static uint64_t get_current_time_ns(void) {
56 return time_get_ns();
57}
58
59static migration_context_t *find_or_create_migration(acds_server_t *server, const uint8_t session_id[16], bool create) {
60 if (!server || !session_id) {
61 return NULL;
62 }
63
64 // Search for existing migration
65 for (size_t i = 0; i < server->num_active_migrations; i++) {
66 if (memcmp(server->active_migrations[i].session_id, session_id, 16) == 0) {
67 return &server->active_migrations[i];
68 }
69 }
70
71 // Not found
72 if (!create) {
73 return NULL;
74 }
75
76 // Create new entry if space available
77 if (server->num_active_migrations >= 32) {
78 log_warn("Too many active migrations (max 32)");
79 return NULL;
80 }
81
83 memcpy(ctx->session_id, session_id, 16);
84 ctx->migration_start_ns = get_current_time_ns();
85 server->num_active_migrations++;
86
87 return ctx;
88}
89
105static void monitor_host_migrations(acds_server_t *server, uint64_t migration_timeout_ms) {
106 if (!server || !server->db || server->num_active_migrations == 0) {
107 return;
108 }
109
110 uint64_t now = get_current_time_ns();
111 uint64_t migration_timeout_ns = migration_timeout_ms * NS_PER_MS_INT; // Convert ms to ns
112
113 size_t i = 0;
114 while (i < server->num_active_migrations) {
115 migration_context_t *ctx = &server->active_migrations[i];
116
117 // Check if migration has timed out (stuck for >migration_timeout_ns)
118 uint64_t elapsed_ns = now - ctx->migration_start_ns;
119 if (elapsed_ns < migration_timeout_ns) {
120 i++;
121 continue;
122 }
123
124 uint64_t elapsed_ms = elapsed_ns / NS_PER_MS_INT; // Convert ns to ms for logging
125 log_warn("Host migration timeout for session %02x%02x... (elapsed %llu ms)", ctx->session_id[0], ctx->session_id[1],
126 (unsigned long long)elapsed_ms);
127
128 // Migration timed out - mark session as failed and clear migration state
129 asciichat_error_t result = database_session_clear_host(server->db, ctx->session_id);
130 if (result != ASCIICHAT_OK) {
131 log_warn("Failed to clear host for timed-out migration: %s", asciichat_error_string(result));
132 }
133
134 // Remove this migration from tracking (shift remaining entries down)
135 if (i < server->num_active_migrations - 1) {
136 memmove(&server->active_migrations[i], &server->active_migrations[i + 1],
137 (server->num_active_migrations - i - 1) * sizeof(migration_context_t));
138 }
139 server->num_active_migrations--;
140 }
141}
142
150static void *cleanup_thread_func(void *arg) {
151 acds_server_t *server = (acds_server_t *)arg;
152 if (!server) {
153 return NULL;
154 }
155
156 log_info("Cleanup thread started (rate limits + expired sessions)");
157
158 while (!atomic_load(&server->shutdown)) {
159 // Sleep for 5 minutes (or until shutdown)
160 // Use 100ms sleep intervals for responsive shutdown on timeout
161 for (int i = 0; i < 3000 && !atomic_load(&server->shutdown); i++) {
162 platform_sleep_ms(100); // Sleep 100ms at a time for responsive shutdown
163 }
164
165 if (atomic_load(&server->shutdown)) {
166 break;
167 }
168
169 // Run rate limit cleanup (delete events older than 1 hour)
170 log_debug("Running rate limit cleanup...");
171 asciichat_error_t result = rate_limiter_prune(server->rate_limiter, 3600);
172 if (result != ASCIICHAT_OK) {
173 log_warn("Rate limit cleanup failed");
174 }
175
176 // Run expired session cleanup
177 log_debug("Running expired session cleanup...");
179
180 // Monitor host migrations (check for completed windows)
181 log_debug("Checking for completed host migrations...");
182 monitor_host_migrations(server, 5000); // 5-second collection window for testing
183 }
184
185 log_info("Cleanup thread exiting");
186 return NULL;
187}
188
189asciichat_error_t acds_server_init(acds_server_t *server, const acds_config_t *config) {
190 if (!server || !config) {
191 return SET_ERRNO(ERROR_INVALID_PARAM, "server or config is NULL");
192 }
193
194 memset(server, 0, sizeof(*server));
195 memcpy(&server->config, config, sizeof(acds_config_t));
196
197 // Open database (SQLite as single source of truth)
198 asciichat_error_t result = database_init(config->database_path, &server->db);
199 if (result != ASCIICHAT_OK) {
200 return result;
201 }
202
203 // Initialize rate limiter with SQLite backend
204 server->rate_limiter = rate_limiter_create_sqlite(NULL); // NULL = externally managed DB
205 if (!server->rate_limiter) {
206 database_close(server->db);
207 return SET_ERRNO(ERROR_MEMORY, "Failed to create rate limiter");
208 }
209
210 // Set the database handle for the rate limiter
212
213 // Configure TCP server
214 tcp_server_config_t tcp_config = {
215 .port = config->port,
216 .ipv4_address = (config->address[0] != '\0') ? config->address : NULL,
217 .ipv6_address = (config->address6[0] != '\0') ? config->address6 : NULL,
218 .bind_ipv4 = (config->address[0] != '\0') || (config->address[0] == '\0' && config->address6[0] == '\0'),
219 .bind_ipv6 = (config->address6[0] != '\0') || (config->address[0] == '\0' && config->address6[0] == '\0'),
220 .accept_timeout_sec = 1,
221 .client_handler = acds_client_handler,
222 .user_data = server,
223 };
224
225 // Initialize TCP server
226 result = tcp_server_init(&server->tcp_server, &tcp_config);
227 if (result != ASCIICHAT_OK) {
229 database_close(server->db);
230 return result;
231 }
232
233 // Initialize background worker thread pool
234 atomic_store(&server->shutdown, false);
235 server->worker_pool = thread_pool_create("acds_workers");
236 if (!server->worker_pool) {
237 log_warn("Failed to create worker thread pool");
240 database_close(server->db);
241 return SET_ERRNO(ERROR_MEMORY, "Failed to create worker thread pool");
242 }
243
244 // Spawn cleanup thread in worker pool
245 if (thread_pool_spawn(server->worker_pool, cleanup_thread_func, server, 0, "cleanup") != ASCIICHAT_OK) {
246 log_warn("Failed to spawn cleanup thread (continuing without cleanup)");
247 }
248
249 log_info("Discovery server initialized successfully");
250 return ASCIICHAT_OK;
251}
252
253asciichat_error_t acds_server_run(acds_server_t *server) {
254 if (!server) {
255 return SET_ERRNO(ERROR_INVALID_PARAM, "server is NULL");
256 }
257
258 log_info("Discovery server accepting connections on port %d", server->config.port);
259
260 // Delegate to TCP server abstraction
261 return tcp_server_run(&server->tcp_server);
262}
263
265 if (!server) {
266 return;
267 }
268
269 // Signal shutdown to worker threads
270 atomic_store(&server->shutdown, true);
271
272 // Shutdown TCP server (closes listen sockets, stops accept loop)
274
275 // Wait for all client handler threads to exit
276 size_t remaining_clients;
277 int shutdown_attempts = 0;
278 const int max_shutdown_attempts = 100; // 10 seconds (100 * 100ms)
279
280 while ((remaining_clients = tcp_server_get_client_count(&server->tcp_server)) > 0 &&
281 shutdown_attempts < max_shutdown_attempts) {
282 log_debug("Waiting for %zu client handler threads to exit (attempt %d/%d)", remaining_clients,
283 shutdown_attempts + 1, max_shutdown_attempts);
285 shutdown_attempts++;
286 }
287
288 if (remaining_clients > 0) {
289 log_warn("Server shutdown: %zu client handler threads still running after 10 seconds", remaining_clients);
290 } else if (shutdown_attempts > 0) {
291 log_debug("All client handler threads exited gracefully");
292 }
293
294 // Stop and destroy worker thread pool (cleanup thread, etc.)
295 if (server->worker_pool) {
297 server->worker_pool = NULL;
298 log_debug("Worker thread pool stopped");
299 }
300
301 // Destroy rate limiter
302 if (server->rate_limiter) {
304 server->rate_limiter = NULL;
305 }
306
307 // Close database
308 if (server->db) {
309 database_close(server->db);
310 server->db = NULL;
311 }
312
313 log_info("Server shutdown complete");
314}
315
316// =============================================================================
317// ACIP Transport Helper Macros for ACDS
318// =============================================================================
319// ACDS uses plain TCP without encryption (discovery service)
320// These macros simplify creating temporary transports for responses
321
322#define ACDS_CREATE_TRANSPORT(socket, transport_var) \
323 acip_transport_t *transport_var = acip_tcp_transport_create(socket, NULL); \
324 if (!transport_var) { \
325 log_error("Failed to create ACDS transport"); \
326 return; \
327 }
328
329#define ACDS_DESTROY_TRANSPORT(transport_var) acip_transport_destroy(transport_var)
330
331// =============================================================================
332// ACIP Callback Wrappers for ACDS
333// =============================================================================
334// These callbacks are invoked by acip_handle_acds_packet() via O(1) array dispatch.
335// Each callback implements: Rate Limit → Crypto Verify → Business Logic → DB Save
336
337static void acds_on_session_create(const acip_session_create_t *req, int client_socket, const char *client_ip,
338 void *app_ctx) {
339 acds_server_t *server = (acds_server_t *)app_ctx;
340
341 log_debug("SESSION_CREATE packet from %s", client_ip);
342
343 // Create ACIP transport for responses
344 ACDS_CREATE_TRANSPORT(client_socket, transport);
345
346 // Get client data for multi-key session state tracking
347 void *client_data_ptr = NULL;
348 if (tcp_server_get_client(&server->tcp_server, client_socket, &client_data_ptr) != ASCIICHAT_OK || !client_data_ptr) {
349 acip_send_error(transport, ERROR_INVALID_PARAM, "Client data not found");
350 ACDS_DESTROY_TRANSPORT(transport);
351 return;
352 }
353 acds_client_data_t *client_data = (acds_client_data_t *)client_data_ptr;
354
355 // Check if identity_pubkey is all zeros (finalize sentinel)
356 bool is_zero_key = true;
357 for (size_t i = 0; i < 32; i++) {
358 if (req->identity_pubkey[i] != 0) {
359 is_zero_key = false;
360 break;
361 }
362 }
363
364 // === MULTI-KEY PROTOCOL: Finalize session ===
365 if (is_zero_key) {
366 if (!client_data->in_multikey_session_create) {
367 // Special case: If identity verification is not required, allow single zero-key session creation
368 if (!server->config.require_server_identity) {
369 log_debug(
370 "SESSION_CREATE with zero key from %s: identity verification not required, treating as anonymous session",
371 client_ip);
372 // Store the zero key as first key and proceed to finalize immediately
373 memcpy(&client_data->pending_session, req, sizeof(acip_session_create_t));
374 memcpy(client_data->pending_session_keys[0], req->identity_pubkey, 32);
375 client_data->num_pending_keys = 1;
376 client_data->in_multikey_session_create = true;
377 // Fall through to finalize logic below
378 } else {
379 acip_send_error(transport, ERROR_INVALID_PARAM, "Zero key received but not in multi-key session creation mode");
380 ACDS_DESTROY_TRANSPORT(transport);
381 return;
382 }
383 }
384
385 log_info("SESSION_CREATE finalize from %s: %zu identity key(s)", client_ip, client_data->num_pending_keys);
386
387 // Auto-detect server's public IP if not provided (empty address)
388 if (client_data->pending_session.server_address[0] == '\0') {
389 log_info("ACDS: Auto-detecting server public IP from connection source: %s", client_ip);
390 SAFE_STRNCPY(client_data->pending_session.server_address, client_ip,
391 sizeof(client_data->pending_session.server_address));
392 log_info("ACDS: Auto-detected server_address='%s'", client_data->pending_session.server_address);
393 }
394
395 // Create session in database with all collected keys
396 // For now, database_session_create only supports single key, so use first key
397 // TODO: Extend database schema to support multiple keys per session
398 acip_session_created_t resp;
399 memset(&resp, 0, sizeof(resp));
400
401 asciichat_error_t create_result =
402 database_session_create(server->db, &client_data->pending_session, &server->config, &resp);
403 if (create_result == ASCIICHAT_OK) {
404 // Build complete payload: fixed response + variable STUN/TURN servers
405 size_t stun_size = (size_t)resp.stun_count * sizeof(stun_server_t);
406 size_t turn_size = (size_t)resp.turn_count * sizeof(turn_server_t);
407 size_t total_size = sizeof(resp) + stun_size + turn_size;
408
409 uint8_t *payload = SAFE_MALLOC(total_size, uint8_t *);
410 if (!payload) {
411 acip_send_error(transport, ERROR_MEMORY, "Out of memory building response");
412 client_data->in_multikey_session_create = false;
413 client_data->num_pending_keys = 0;
414 ACDS_DESTROY_TRANSPORT(transport);
415 return;
416 }
417
418 // Copy fixed response
419 memcpy(payload, &resp, sizeof(resp));
420
421 // Append STUN servers
422 if (resp.stun_count > 0) {
423 memcpy(payload + sizeof(resp), server->config.stun_servers, stun_size);
424 }
425
426 // Append TURN servers
427 if (resp.turn_count > 0) {
428 memcpy(payload + sizeof(resp) + stun_size, server->config.turn_servers, turn_size);
429 }
430
431 // Send complete response with variable-length data
432 packet_send_via_transport(transport, PACKET_TYPE_ACIP_SESSION_CREATED, payload, total_size, 0);
433 SAFE_FREE(payload);
434
435 log_info("Session created: %.*s (UUID: %02x%02x..., %zu keys, %d STUN, %d TURN servers)", resp.session_string_len,
436 resp.session_string, resp.session_id[0], resp.session_id[1], client_data->num_pending_keys,
437 resp.stun_count, resp.turn_count);
438 } else {
439 // Error - send error response using proper error code
440 acip_send_error(transport, create_result, "Failed to create session");
441 log_warn("Session creation failed for %s: %s", client_ip, asciichat_error_string(create_result));
442 }
443
444 // Clear multi-key state
445 client_data->in_multikey_session_create = false;
446 client_data->num_pending_keys = 0;
447
448 ACDS_DESTROY_TRANSPORT(transport);
449 return;
450 }
451
452 // === MULTI-KEY PROTOCOL: Add key to pending session ===
453 if (client_data->in_multikey_session_create) {
454 // Subsequent SESSION_CREATE with non-zero key - add to pending keys
455
456 // Validate we haven't exceeded max keys
457 if (client_data->num_pending_keys >= MAX_IDENTITY_KEYS) {
458 acip_send_error(transport, ERROR_INVALID_PARAM, "Maximum identity keys exceeded");
459 ACDS_DESTROY_TRANSPORT(transport);
460 return;
461 }
462
463 // Validate key is different from all existing keys
464 for (size_t i = 0; i < client_data->num_pending_keys; i++) {
465 if (memcmp(client_data->pending_session_keys[i], req->identity_pubkey, 32) == 0) {
466 acip_send_error(transport, ERROR_INVALID_PARAM, "Duplicate identity key");
467 ACDS_DESTROY_TRANSPORT(transport);
468 return;
469 }
470 }
471
472 // Add key to pending array
473 memcpy(client_data->pending_session_keys[client_data->num_pending_keys], req->identity_pubkey, 32);
474 client_data->num_pending_keys++;
475
476 log_debug("SESSION_CREATE key #%zu from %s (pubkey: %02x%02x...)", client_data->num_pending_keys, client_ip,
477 req->identity_pubkey[0], req->identity_pubkey[1]);
478
479 // Don't send response yet - client will send more keys or zero-key to finalize
480 ACDS_DESTROY_TRANSPORT(transport);
481 return;
482 }
483
484 // === MULTI-KEY PROTOCOL: Start new session ===
485 // First SESSION_CREATE with non-zero key - start multi-key mode
486
487 // Rate limiting check (only on first SESSION_CREATE)
488 if (!check_and_record_rate_limit(server->rate_limiter, client_ip, RATE_EVENT_SESSION_CREATE, client_socket,
489 "SESSION_CREATE")) {
490 ACDS_DESTROY_TRANSPORT(transport);
491 return;
492 }
493
494 // Cryptographic identity verification (if required)
495 if (server->config.require_server_identity) {
496 // Validate timestamp (5 minute window)
497 if (!acds_validate_timestamp(req->timestamp, 300)) {
498 log_warn("SESSION_CREATE rejected from %s: invalid timestamp (replay attack protection)", client_ip);
499 acip_send_error(transport, ERROR_CRYPTO_VERIFICATION, "Timestamp validation failed - too old or in the future");
500 ACDS_DESTROY_TRANSPORT(transport);
501 return;
502 }
503
504 // Verify Ed25519 signature
505 asciichat_error_t verify_result = acds_verify_session_create(
506 req->identity_pubkey, req->timestamp, req->capabilities, req->max_participants, req->signature);
507
508 if (verify_result != ASCIICHAT_OK) {
509 log_warn("SESSION_CREATE rejected from %s: invalid signature (identity verification failed)", client_ip);
510 acip_send_error(transport, ERROR_CRYPTO_VERIFICATION, "Identity signature verification failed");
511 ACDS_DESTROY_TRANSPORT(transport);
512 return;
513 }
514
515 log_debug("SESSION_CREATE signature verified from %s (pubkey: %02x%02x...)", client_ip, req->identity_pubkey[0],
516 req->identity_pubkey[1]);
517 }
518
519 // Reachability verification for Direct TCP sessions
520 if (req->session_type == SESSION_TYPE_DIRECT_TCP) {
521 // Auto-detect public IP if server_address is empty
522 if (req->server_address[0] == '\0') {
523 SAFE_STRNCPY(req->server_address, client_ip, sizeof(req->server_address));
524 log_info("SESSION_CREATE from %s: auto-detected server address (bind was 0.0.0.0)", client_ip);
525 }
526
527 // Verify server_address matches connection source
528 if (strcmp(req->server_address, client_ip) != 0) {
529 log_warn("SESSION_CREATE rejected from %s: server_address '%s' does not match actual connection IP", client_ip,
530 req->server_address);
531 acip_send_error(transport, ERROR_INVALID_PARAM,
532 "Direct TCP sessions require server_address to match your actual IP");
533 ACDS_DESTROY_TRANSPORT(transport);
534 return;
535 }
536 log_debug("SESSION_CREATE reachability verified: %s matches connection source", req->server_address);
537 }
538
539 // Store pending session data and first key
540 memcpy(&client_data->pending_session, req, sizeof(acip_session_create_t));
541 memcpy(client_data->pending_session_keys[0], req->identity_pubkey, 32);
542 client_data->num_pending_keys = 1;
543 client_data->in_multikey_session_create = true;
544
545 log_info("SESSION_CREATE started from %s: multi-key mode (key #1 stored, waiting for more or zero-key finalize)",
546 client_ip);
547
548 // Don't send response yet - wait for more keys or zero-key finalize
549 ACDS_DESTROY_TRANSPORT(transport);
550}
551
552static void acds_on_session_lookup(const acip_session_lookup_t *req, int client_socket, const char *client_ip,
553 void *app_ctx) {
554 acds_server_t *server = (acds_server_t *)app_ctx;
555
556 log_debug("SESSION_LOOKUP packet from %s", client_ip);
557
558 // Create ACIP transport for responses
559 ACDS_CREATE_TRANSPORT(client_socket, transport);
560
561 // Rate limiting check
562 if (!check_and_record_rate_limit(server->rate_limiter, client_ip, RATE_EVENT_SESSION_LOOKUP, client_socket,
563 "SESSION_LOOKUP")) {
564 return;
565 }
566
567 acip_session_info_t resp;
568 memset(&resp, 0, sizeof(resp));
569
570 // Null-terminate session string for lookup
571 char session_string[49] = {0};
572 size_t copy_len =
573 (req->session_string_len < sizeof(session_string) - 1) ? req->session_string_len : sizeof(session_string) - 1;
574 memcpy(session_string, req->session_string, copy_len);
575
576 asciichat_error_t lookup_result = database_session_lookup(server->db, session_string, &server->config, &resp);
577 if (lookup_result == ASCIICHAT_OK) {
578 acip_send_session_info(transport, &resp);
579 log_info("Session lookup for '%s' from %s: %s", session_string, client_ip, resp.found ? "found" : "not found");
580 } else {
581 acip_send_error(transport, lookup_result, "Session lookup failed");
582 log_warn("Session lookup failed for %s: %s", client_ip, asciichat_error_string(lookup_result));
583 }
584}
585
586static void acds_on_session_join(const acip_session_join_t *req, int client_socket, const char *client_ip,
587 void *app_ctx) {
588 acds_server_t *server = (acds_server_t *)app_ctx;
589
590 log_debug("SESSION_JOIN packet from %s", client_ip);
591
592 // Create ACIP transport for responses
593 ACDS_CREATE_TRANSPORT(client_socket, transport);
594
595 // Rate limiting check
596 if (!check_and_record_rate_limit(server->rate_limiter, client_ip, RATE_EVENT_SESSION_JOIN, client_socket,
597 "SESSION_JOIN")) {
598 return;
599 }
600
601 // Cryptographic identity verification (if required)
602 if (server->config.require_client_identity) {
603 // Validate timestamp (5 minute window)
604 if (!acds_validate_timestamp(req->timestamp, 300)) {
605 log_warn("SESSION_JOIN rejected from %s: invalid timestamp (replay attack protection)", client_ip);
606 acip_session_joined_t error_resp;
607 memset(&error_resp, 0, sizeof(error_resp));
608 error_resp.success = 0;
609 error_resp.error_code = ERROR_CRYPTO_VERIFICATION;
610 SAFE_STRNCPY(error_resp.error_message, "Timestamp validation failed", sizeof(error_resp.error_message));
611 acip_send_session_joined(transport, &error_resp);
612 return;
613 }
614
615 // Verify Ed25519 signature
616 asciichat_error_t verify_result =
617 acds_verify_session_join(req->identity_pubkey, req->timestamp, req->session_string, req->signature);
618
619 if (verify_result != ASCIICHAT_OK) {
620 log_warn("SESSION_JOIN rejected from %s: invalid signature (identity verification failed)", client_ip);
621 acip_session_joined_t error_resp;
622 memset(&error_resp, 0, sizeof(error_resp));
623 error_resp.success = 0;
624 error_resp.error_code = ERROR_CRYPTO_VERIFICATION;
625 SAFE_STRNCPY(error_resp.error_message, "Identity signature verification failed",
626 sizeof(error_resp.error_message));
627 acip_send_session_joined(transport, &error_resp);
628 return;
629 }
630
631 log_debug("SESSION_JOIN signature verified from %s (pubkey: %02x%02x...)", client_ip, req->identity_pubkey[0],
632 req->identity_pubkey[1]);
633 }
634
635 acip_session_joined_t resp;
636 memset(&resp, 0, sizeof(resp));
637
638 asciichat_error_t join_result = database_session_join(server->db, req, &server->config, &resp);
639 if (join_result == ASCIICHAT_OK && resp.success) {
640 acip_send_session_joined(transport, &resp);
641
642 // Update client data in registry (update in place)
643 void *retrieved_data = NULL;
644 if (tcp_server_get_client(&server->tcp_server, client_socket, &retrieved_data) == ASCIICHAT_OK && retrieved_data) {
645 acds_client_data_t *client_data = (acds_client_data_t *)retrieved_data;
646 memcpy(client_data->session_id, resp.session_id, 16);
647 memcpy(client_data->participant_id, resp.participant_id, 16);
648 client_data->joined_session = true;
649 }
650
651 log_info("Client %s joined session (participant %02x%02x...)", client_ip, resp.participant_id[0],
652 resp.participant_id[1]);
653 } else {
654 acip_send_session_joined(transport, &resp);
655 log_warn("Session join failed for %s: %s", client_ip, resp.error_message);
656 }
657}
658
659static void acds_on_session_leave(const acip_session_leave_t *req, int client_socket, const char *client_ip,
660 void *app_ctx) {
661 acds_server_t *server = (acds_server_t *)app_ctx;
662
663 log_debug("SESSION_LEAVE packet from %s", client_ip);
664
665 // Create ACIP transport for responses
666 ACDS_CREATE_TRANSPORT(client_socket, transport);
667
668 asciichat_error_t leave_result = database_session_leave(server->db, req->session_id, req->participant_id);
669 if (leave_result == ASCIICHAT_OK) {
670 log_info("Client %s left session", client_ip);
671
672 // Update client data to mark as not joined
673 void *retrieved_data = NULL;
674 if (tcp_server_get_client(&server->tcp_server, client_socket, &retrieved_data) == ASCIICHAT_OK && retrieved_data) {
675 acds_client_data_t *client_data = (acds_client_data_t *)retrieved_data;
676 client_data->joined_session = false;
677 }
678 } else {
679 acip_send_error(transport, leave_result, asciichat_error_string(leave_result));
680 log_warn("Session leave failed for %s: %s", client_ip, asciichat_error_string(leave_result));
681 }
682}
683
684static void acds_on_webrtc_sdp(const acip_webrtc_sdp_t *sdp, size_t payload_len, int client_socket,
685 const char *client_ip, void *app_ctx) {
686 acds_server_t *server = (acds_server_t *)app_ctx;
687
688 log_debug("WEBRTC_SDP packet from %s", client_ip);
689
690 // Create ACIP transport for responses
691 ACDS_CREATE_TRANSPORT(client_socket, transport);
692
693 // Validation is done in the library handler (lib/network/acip/acds_handlers.c)
694 asciichat_error_t relay_result = signaling_relay_sdp(server->db, &server->tcp_server, sdp, payload_len);
695 if (relay_result != ASCIICHAT_OK) {
696 acip_send_error(transport, relay_result, "SDP relay failed");
697 log_warn("SDP relay failed from %s: %s", client_ip, asciichat_error_string(relay_result));
698 }
699}
700
701static void acds_on_webrtc_ice(const acip_webrtc_ice_t *ice, size_t payload_len, int client_socket,
702 const char *client_ip, void *app_ctx) {
703 acds_server_t *server = (acds_server_t *)app_ctx;
704
705 log_debug("WEBRTC_ICE packet from %s", client_ip);
706
707 // Create ACIP transport for responses
708 ACDS_CREATE_TRANSPORT(client_socket, transport);
709
710 // Validation is done in the library handler (lib/network/acip/acds_handlers.c)
711 asciichat_error_t relay_result = signaling_relay_ice(server->db, &server->tcp_server, ice, payload_len);
712 if (relay_result != ASCIICHAT_OK) {
713 acip_send_error(transport, relay_result, "ICE relay failed");
714 log_warn("ICE relay failed from %s: %s", client_ip, asciichat_error_string(relay_result));
715 }
716}
717
718static void acds_on_discovery_ping(const void *payload, size_t payload_len, int client_socket, const char *client_ip,
719 void *app_ctx) {
720 (void)payload;
721 (void)payload_len;
722 (void)app_ctx;
723
724 // Create ACIP transport for PONG response
725 ACDS_CREATE_TRANSPORT(client_socket, transport);
726
727 // Send PONG response
728 log_debug("PING from %s, sending PONG", client_ip);
729 acip_send_pong(transport);
730
731 ACDS_DESTROY_TRANSPORT(transport);
732}
733
734static void acds_on_host_announcement(const acip_host_announcement_t *announcement, int client_socket,
735 const char *client_ip, void *app_ctx) {
736 acds_server_t *server = (acds_server_t *)app_ctx;
737
738 log_info("HOST_ANNOUNCEMENT from %s: host_id=%02x%02x..., address=%s:%u, conn_type=%d", client_ip,
739 announcement->host_id[0], announcement->host_id[1], announcement->host_address, announcement->host_port,
740 announcement->connection_type);
741
742 // Create ACIP transport for responses
743 ACDS_CREATE_TRANSPORT(client_socket, transport);
744
745 // Update session host in database
746 asciichat_error_t result =
747 database_session_update_host(server->db, announcement->session_id, announcement->host_id,
748 announcement->host_address, announcement->host_port, announcement->connection_type);
749
750 if (result != ASCIICHAT_OK) {
751 acip_send_error(transport, result, "Failed to update session host");
752 log_warn("HOST_ANNOUNCEMENT failed from %s: %s", client_ip, asciichat_error_string(result));
753 ACDS_DESTROY_TRANSPORT(transport);
754 return;
755 }
756
757 // TODO: Broadcast HOST_DESIGNATED to all participants in the session
758 // This requires iterating connected clients and sending to those in the same session
759 // For now, new participants will get host info when they join
760
761 log_info("Session host updated via HOST_ANNOUNCEMENT from %s", client_ip);
762 ACDS_DESTROY_TRANSPORT(transport);
763}
764
765static void acds_on_host_lost(const acip_host_lost_t *host_lost, int client_socket, const char *client_ip,
766 void *app_ctx) {
767 acds_server_t *server = (acds_server_t *)app_ctx;
768
769 log_info("HOST_LOST from %s: session=%02x%02x..., participant=%02x%02x..., last_host=%02x%02x..., reason=%u",
770 client_ip, host_lost->session_id[0], host_lost->session_id[1], host_lost->participant_id[0],
771 host_lost->participant_id[1], host_lost->last_host_id[0], host_lost->last_host_id[1],
772 host_lost->disconnect_reason);
773
774 // Create ACIP transport for responses
775 ACDS_CREATE_TRANSPORT(client_socket, transport);
776
777 // Start host migration (set in_migration flag in database for bookkeeping)
778 asciichat_error_t result = database_session_start_migration(server->db, host_lost->session_id);
779 if (result != ASCIICHAT_OK) {
780 acip_send_error(transport, result, "Failed to start host migration");
781 log_warn("HOST_LOST failed from %s: %s", client_ip, asciichat_error_string(result));
782 ACDS_DESTROY_TRANSPORT(transport);
783 return;
784 }
785
786 // Track migration for timeout detection
787 // (Host already elected future host 5 minutes ago; participants will failover to pre-elected host)
788 migration_context_t *migration = find_or_create_migration(server, host_lost->session_id, true);
789 if (!migration) {
790 acip_send_error(transport, ERROR_MEMORY, "Failed to track migration");
791 log_warn("HOST_LOST: Failed to create migration context from %s", client_ip);
792 ACDS_DESTROY_TRANSPORT(transport);
793 return;
794 }
795
796 log_info("Migration tracking started for session %02x%02x... (participant %02x%02x...)", host_lost->session_id[0],
797 host_lost->session_id[1], host_lost->participant_id[0], host_lost->participant_id[1]);
798
799 // NOTE: No candidate collection needed - future host was pre-elected 5 minutes ago.
800 // Participants already know who the new host is from the last FUTURE_HOST_ELECTED broadcast.
801 // They will instantly failover to the pre-elected host.
802 // ACDS just tracks migration timeout for cleanup.
803
804 ACDS_DESTROY_TRANSPORT(transport);
805}
806
807// Global ACIP callback structure for ACDS
808static const acip_acds_callbacks_t g_acds_callbacks = {
809 .on_session_create = acds_on_session_create,
810 .on_session_lookup = acds_on_session_lookup,
811 .on_session_join = acds_on_session_join,
812 .on_session_leave = acds_on_session_leave,
813 .on_webrtc_sdp = acds_on_webrtc_sdp,
814 .on_webrtc_ice = acds_on_webrtc_ice,
815 .on_discovery_ping = acds_on_discovery_ping,
816 .on_host_announcement = acds_on_host_announcement,
817 .on_host_lost = acds_on_host_lost,
818 .app_ctx = NULL // Set dynamically to server instance
819};
820
821void *acds_client_handler(void *arg) {
822 tcp_client_context_t *ctx = (tcp_client_context_t *)arg;
823 if (!ctx) {
824 log_error("Client handler: NULL context");
825 return NULL;
826 }
827
828 acds_server_t *server = (acds_server_t *)ctx->user_data;
829 socket_t client_socket = ctx->client_socket;
830
831 // Get client IP for logging
832 char client_ip[INET6_ADDRSTRLEN] = {0};
833 tcp_client_context_get_ip(ctx, client_ip, sizeof(client_ip));
834
835 log_info("Client handler started for %s", client_ip);
836
837 // Register client in TCP server registry with allocated client data
838 acds_client_data_t *client_data = SAFE_MALLOC(sizeof(acds_client_data_t), acds_client_data_t *);
839 if (!client_data) {
840 tcp_server_reject_client(client_socket, "Failed to allocate client data");
841 SAFE_FREE(ctx);
842 return NULL;
843 }
844 memset(client_data, 0, sizeof(*client_data));
845 client_data->joined_session = false;
846 client_data->handshake_complete = false;
847
848 // Initialize crypto handshake context
849 asciichat_error_t handshake_result = crypto_handshake_init(&client_data->handshake_ctx, true);
850 if (handshake_result != ASCIICHAT_OK) {
851 log_error("Failed to initialize crypto handshake for client %s", client_ip);
852 SAFE_FREE(client_data);
853 tcp_server_reject_client(client_socket, "Failed to initialize crypto handshake");
854 SAFE_FREE(ctx);
855 return NULL;
856 }
857
858 // Set server identity keys for handshake
859 client_data->handshake_ctx.server_public_key.type = KEY_TYPE_ED25519;
860 client_data->handshake_ctx.server_private_key.type = KEY_TYPE_ED25519;
861 memcpy(client_data->handshake_ctx.server_public_key.key, server->identity_public, 32);
862 memcpy(client_data->handshake_ctx.server_private_key.key.ed25519, server->identity_secret, 64);
863
864 if (tcp_server_add_client(&server->tcp_server, client_socket, client_data) != ASCIICHAT_OK) {
865 SAFE_FREE(client_data);
866 tcp_server_reject_client(client_socket, "Failed to register client in registry");
867 SAFE_FREE(ctx);
868 return NULL;
869 }
870
871 log_debug("Client %s registered (socket=%d, total=%zu)", client_ip, client_socket,
873
874 // Perform crypto handshake (three-step process)
875 log_debug("Performing crypto handshake with client %s", client_ip);
876
877 // Step 1: Start handshake (send server key, receive client key)
878 handshake_result = crypto_handshake_server_start_socket(&client_data->handshake_ctx, client_socket);
879 if (handshake_result != ASCIICHAT_OK) {
880 log_warn("Crypto handshake start failed for client %s", client_ip);
881 tcp_server_remove_client(&server->tcp_server, client_socket);
882 SAFE_FREE(ctx);
883 return NULL;
884 }
885
886 // Step 2: Authentication challenge (if required)
887 handshake_result = crypto_handshake_server_auth_challenge_socket(&client_data->handshake_ctx, client_socket);
888 if (handshake_result != ASCIICHAT_OK) {
889 log_warn("Crypto handshake auth challenge failed for client %s", client_ip);
890 tcp_server_remove_client(&server->tcp_server, client_socket);
891 SAFE_FREE(ctx);
892 return NULL;
893 }
894
895 // Step 3: Complete handshake (verify and finalize)
896 handshake_result = crypto_handshake_server_complete_socket(&client_data->handshake_ctx, client_socket);
897 if (handshake_result != ASCIICHAT_OK) {
898 log_warn("Crypto handshake complete failed for client %s", client_ip);
899 tcp_server_remove_client(&server->tcp_server, client_socket);
900 SAFE_FREE(ctx);
901 return NULL;
902 }
903
904 client_data->handshake_complete = true;
905 log_info("Crypto handshake complete for client %s", client_ip);
906
907 // Main packet processing loop
908 while (atomic_load(&server->tcp_server.running)) {
909 packet_type_t packet_type;
910 void *payload = NULL;
911 size_t payload_size = 0;
912
913 // Receive packet (blocking with system timeout)
914 int result = receive_packet(client_socket, &packet_type, &payload, &payload_size);
915 if (result < 0) {
916 // Check error context to distinguish timeout from actual disconnect
917 asciichat_error_context_t err_ctx;
918 bool has_context = HAS_ERRNO(&err_ctx);
919
920 // Check if this is a timeout (non-fatal) or actual disconnect (fatal)
921 asciichat_error_t error = GET_ERRNO();
922 if (error == ERROR_NETWORK_TIMEOUT ||
923 (error == ERROR_NETWORK && has_context && strstr(err_ctx.context_message, "timed out") != NULL)) {
924 // Timeout waiting for next packet - this is normal, continue waiting
925 log_debug("Client %s: receive timeout, continuing to wait for packets", client_ip);
926 if (payload) {
927 buffer_pool_free(NULL, payload, payload_size);
928 }
929 continue;
930 }
931
932 // Actual disconnect or fatal error
933 log_info("Client %s disconnected", client_ip);
934 if (payload) {
935 buffer_pool_free(NULL, payload, payload_size);
936 }
937 break;
938 }
939
940 log_debug("Received packet type 0x%02X from %s, length=%zu", packet_type, client_ip, payload_size);
941
942 // Multi-key session creation protocol: block non-PING/PONG/SESSION_CREATE messages
943 if (client_data->in_multikey_session_create) {
944 bool allowed =
945 (packet_type == PACKET_TYPE_ACIP_SESSION_CREATE || packet_type == PACKET_TYPE_ACIP_DISCOVERY_PING ||
946 packet_type == PACKET_TYPE_PING || packet_type == PACKET_TYPE_PONG);
947
948 if (!allowed) {
949 log_warn("Client %s sent packet type 0x%02X during multi-key session creation - only SESSION_CREATE/PING/PONG "
950 "allowed",
951 client_ip, packet_type);
952
953 // Send error response
954 acip_transport_t *error_transport = acip_tcp_transport_create(client_socket, NULL);
955 if (error_transport) {
956 acip_send_error(error_transport, ERROR_INVALID_PARAM,
957 "Only SESSION_CREATE/PING/PONG allowed during multi-key session creation");
958 ACDS_DESTROY_TRANSPORT(error_transport);
959 }
960
961 // Free payload and continue
962 if (payload) {
963 buffer_pool_free(NULL, payload, payload_size);
964 }
965 continue;
966 }
967 }
968
969 // O(1) ACIP array-based dispatch
970 // Set server context for callbacks
971 acip_acds_callbacks_t callbacks = g_acds_callbacks;
972 callbacks.app_ctx = server;
973
974 asciichat_error_t dispatch_result =
975 acip_handle_acds_packet(NULL, packet_type, payload, payload_size, client_socket, client_ip, &callbacks);
976
977 if (dispatch_result != ASCIICHAT_OK) {
978 log_warn("ACIP handler failed for packet type 0x%02X from %s: %s", packet_type, client_ip,
979 asciichat_error_string(dispatch_result));
980 }
981
982 // Free payload (allocated by receive_packet via buffer_pool_alloc)
983 if (payload) {
984 buffer_pool_free(NULL, payload, payload_size);
985 }
986 }
987
988 // Cleanup
989 tcp_server_remove_client(&server->tcp_server, client_socket);
990 log_debug("Client %s unregistered (total=%zu)", client_ip, tcp_server_get_client_count(&server->tcp_server));
991
992 socket_close(client_socket);
993 SAFE_FREE(ctx);
994
995 log_info("Client handler finished for %s", client_ip);
996 return NULL;
997}
asciichat_error_t acds_verify_session_create(const uint8_t identity_pubkey[32], uint64_t timestamp, uint8_t capabilities, uint8_t max_participants, const uint8_t signature[64])
asciichat_error_t acds_verify_session_join(const uint8_t identity_pubkey[32], uint64_t timestamp, const char *session_string, const uint8_t signature[64])
bool acds_validate_timestamp(uint64_t timestamp_ms, uint32_t window_seconds)
asciichat_error_t acip_handle_acds_packet(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, int client_socket, const char *client_ip, const acip_acds_callbacks_t *callbacks)
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
asciichat_error_t crypto_handshake_init(crypto_handshake_context_t *ctx, bool is_server)
asciichat_error_t database_session_update_host(sqlite3 *db, const uint8_t session_id[16], const uint8_t host_participant_id[16], const char *host_address, uint16_t host_port, uint8_t connection_type)
asciichat_error_t database_session_clear_host(sqlite3 *db, const uint8_t session_id[16])
asciichat_error_t database_session_lookup(sqlite3 *db, const char *session_string, const acds_config_t *config, acip_session_info_t *resp)
asciichat_error_t database_session_join(sqlite3 *db, const acip_session_join_t *req, const acds_config_t *config, acip_session_joined_t *resp)
asciichat_error_t database_session_start_migration(sqlite3 *db, const uint8_t session_id[16])
asciichat_error_t database_session_leave(sqlite3 *db, const uint8_t session_id[16], const uint8_t participant_id[16])
void database_close(sqlite3 *db)
asciichat_error_t database_session_create(sqlite3 *db, const acip_session_create_t *req, const acds_config_t *config, acip_session_created_t *resp)
asciichat_error_t database_init(const char *db_path, sqlite3 **db)
void database_session_cleanup_expired(sqlite3 *db)
bool check_and_record_rate_limit(rate_limiter_t *rate_limiter, const char *client_ip, rate_event_type_t event_type, socket_t client_socket, const char *operation_name)
Definition errors.c:38
int socket_t
asciichat_error_t crypto_handshake_server_auth_challenge_socket(crypto_handshake_context_t *ctx, socket_t client_socket)
Legacy wrapper: Auth challenge using socket (TCP clients only)
asciichat_error_t crypto_handshake_server_start_socket(crypto_handshake_context_t *ctx, socket_t client_socket)
Legacy wrapper: Start handshake using socket (TCP clients only)
asciichat_error_t crypto_handshake_server_complete_socket(crypto_handshake_context_t *ctx, socket_t client_socket)
Legacy wrapper: Complete handshake using socket (TCP clients only)
void tcp_server_destroy(tcp_server_t *server)
void tcp_server_reject_client(socket_t socket, const char *reason)
asciichat_error_t tcp_server_remove_client(tcp_server_t *server, socket_t socket)
asciichat_error_t tcp_server_init(tcp_server_t *server, const tcp_server_config_t *config)
asciichat_error_t tcp_server_get_client(tcp_server_t *server, socket_t socket, void **out_data)
asciichat_error_t tcp_server_run(tcp_server_t *server)
const char * tcp_client_context_get_ip(const tcp_client_context_t *ctx, char *buf, size_t len)
size_t tcp_server_get_client_count(tcp_server_t *server)
asciichat_error_t tcp_server_add_client(tcp_server_t *server, socket_t socket, void *client_data)
int receive_packet(socket_t sockfd, packet_type_t *type, void **data, size_t *len)
Receive a basic packet without encryption.
Definition packet.c:766
void platform_sleep_ms(unsigned int ms)
void rate_limiter_destroy(rate_limiter_t *limiter)
Definition rate_limit.c:116
void rate_limiter_set_sqlite_db(rate_limiter_t *limiter, void *db)
Definition rate_limit.c:107
rate_limiter_t * rate_limiter_create_sqlite(const char *db_path)
Definition rate_limit.c:90
asciichat_error_t rate_limiter_prune(rate_limiter_t *limiter, uint32_t max_age_secs)
Definition rate_limit.c:161
asciichat_error_t acip_send_session_joined(acip_transport_t *transport, const acip_session_joined_t *response)
Definition send.c:321
asciichat_error_t acip_send_error(acip_transport_t *transport, uint32_t error_code, const char *message)
Definition send.c:233
asciichat_error_t acip_send_pong(acip_transport_t *transport)
Definition send.c:212
asciichat_error_t acip_send_session_info(acip_transport_t *transport, const acip_session_info_t *info)
Definition send.c:313
asciichat_error_t packet_send_via_transport(acip_transport_t *transport, packet_type_t type, const void *payload, size_t payload_len, uint32_t client_id)
Send packet via transport with proper header (exported for generic wrappers)
Definition send.c:41
asciichat_error_t signaling_relay_ice(sqlite3 *db, tcp_server_t *tcp_server, const acip_webrtc_ice_t *ice, size_t total_packet_len)
Relay ICE candidate to recipient.
Definition signaling.c:174
asciichat_error_t signaling_relay_sdp(sqlite3 *db, tcp_server_t *tcp_server, const acip_webrtc_sdp_t *sdp, size_t total_packet_len)
Relay SDP offer/answer to recipient.
Definition signaling.c:127
uint8_t session_id[16]
void * acds_client_handler(void *arg)
Per-client connection handler (thread entry point)
#define ACDS_DESTROY_TRANSPORT(transport_var)
void acds_server_shutdown(acds_server_t *server)
Shutdown discovery server.
asciichat_error_t acds_server_init(acds_server_t *server, const acds_config_t *config)
Initialize discovery server.
#define ACDS_CREATE_TRANSPORT(socket, transport_var)
asciichat_error_t acds_server_run(acds_server_t *server)
Run discovery server main loop.
Per-client connection data.
acip_session_create_t pending_session
Pending session data (from first SESSION_CREATE)
bool handshake_complete
Whether crypto handshake has completed.
uint8_t session_id[16]
Session UUID (valid if joined_session)
uint8_t participant_id[16]
Participant UUID (valid if joined_session)
crypto_handshake_context_t handshake_ctx
Handshake context for encrypted communication.
bool joined_session
Whether client has successfully joined a session.
size_t num_pending_keys
Number of keys received so far.
uint8_t pending_session_keys[MAX_IDENTITY_KEYS][32]
Array of identity public keys.
bool in_multikey_session_create
True during multi-key SESSION_CREATE sequence.
Discovery server configuration.
char address[256]
IPv4 bind address (empty = all interfaces)
turn_server_t turn_servers[4]
TURN server configurations.
char database_path[512]
SQLite database path.
bool require_server_identity
Require servers to provide signed identity when creating sessions.
int port
TCP listen port (default 27225)
char address6[256]
IPv6 bind address (empty = all interfaces)
bool require_client_identity
Require clients to provide signed identity when joining sessions.
stun_server_t stun_servers[4]
STUN server configurations.
Discovery server state.
acds_config_t config
Runtime configuration.
uint8_t identity_public[32]
Ed25519 public key.
size_t num_active_migrations
Number of active migrations.
atomic_bool shutdown
Shutdown flag for worker threads.
tcp_server_t tcp_server
TCP server abstraction.
uint8_t identity_secret[64]
Ed25519 secret key.
sqlite3 * db
SQLite database handle.
struct rate_limiter_s * rate_limiter
SQLite-backed rate limiter.
migration_context_t active_migrations[32]
Slots for up to 32 concurrent migrations.
thread_pool_t * worker_pool
Thread pool for background workers.
In-memory host migration context.
uint64_t migration_start_ns
When migration started (nanoseconds since sokol_time setup)
uint8_t session_id[16]
Session UUID.
acip_transport_t * acip_tcp_transport_create(socket_t sockfd, crypto_context_t *crypto_ctx)
void thread_pool_destroy(thread_pool_t *pool)
Definition thread_pool.c:48
thread_pool_t * thread_pool_create(const char *pool_name)
Definition thread_pool.c:17
asciichat_error_t thread_pool_spawn(thread_pool_t *pool, void *(*thread_func)(void *), void *thread_arg, int stop_id, const char *thread_name)
Definition thread_pool.c:70
uint64_t time_get_ns(void)
Definition util/time.c:48