ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
lib/network/websocket/server.c
Go to the documentation of this file.
1
10#include <ascii-chat/network/websocket/server.h>
11#include <ascii-chat/network/acip/transport.h>
12#include <ascii-chat/log/logging.h>
13#include <ascii-chat/common.h>
14#include <ascii-chat/platform/abstraction.h>
15#include <ascii-chat/ringbuffer.h>
16#include <ascii-chat/buffer_pool.h>
17#include <ascii-chat/platform/mutex.h>
18#include <ascii-chat/platform/cond.h>
19#include <ascii-chat/util/time.h>
20#include <libwebsockets.h>
21#include <string.h>
22#include <errno.h>
23#ifndef _WIN32
24#include <sys/socket.h>
25#include <netinet/tcp.h>
26#include <netinet/in.h>
27#endif
28
29// Shared internal types (websocket_recv_msg_t, websocket_transport_data_t)
30#include <ascii-chat/network/websocket/internal.h>
31
// Per-connection state for a single WebSocket session. LWS allocates one block per session
// (per_session_data_size in the protocol table) and passes it to the server callback as `user`.
// NOTE(review): the callback also reads/writes handler_started, cleaning_up, pending_send_data,
// pending_send_len, pending_send_offset and has_pending_send — confirm they are declared here.
37typedef struct {
38 websocket_server_t *server; // Owning server, taken from protocol->user when the connection is set up.
39 acip_transport_t *transport; // ACIP transport for this connection; NULLed by the close handling.
40 asciichat_thread_t handler_thread; // Thread running server->handler for this client.
43
44 // Pending message send state (for LWS_CALLBACK_SERVER_WRITEABLE)
50
// Diagnostic counters for how RECEIVE and WRITEABLE callbacks interleave while a
// fragmented message is being assembled. They are process-wide (shared by every
// connection) and start at zero; the RECEIVE path resets them when the first
// fragment of a new message arrives.
// TODO: Make these per-connection instead of global for proper multi-client support
static _Atomic(uint64_t) g_receive_callback_count = 0;
static _Atomic(uint64_t) g_writeable_callback_count = 0;
56
61static void websocket_lws_log_callback(int level, const char *line) {
62 // Convert LWS log level to our log level
63 if (level & LLL_ERR) {
64 log_error("[LWS] %s", line);
65 } else if (level & LLL_WARN) {
66 log_warn("[LWS] %s", line);
67 } else if (level & LLL_NOTICE) {
68 log_info("[LWS] %s", line);
69 } else if (level & LLL_INFO) {
70 log_info("[LWS] %s", line);
71 } else if (level & LLL_DEBUG) {
72 log_debug("[LWS] %s", line);
73 }
74}
75
81static int websocket_server_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in,
82 size_t len) {
84 const char *proto_name = lws_get_protocol(wsi) ? lws_get_protocol(wsi)->name : "NULL";
85
86 // LOG EVERY SINGLE CALLBACK WITH PROTOCOL NAME
87 log_dev("๐Ÿ”ด CALLBACK: reason=%d, proto=%s, wsi=%p, len=%zu", reason, proto_name, (void *)wsi, len);
88
89 switch (reason) {
90 case LWS_CALLBACK_ESTABLISHED: {
91 // New WebSocket connection established
92 uint64_t established_ns = time_get_ns();
93 log_info("๐Ÿ”ด๐Ÿ”ด๐Ÿ”ด LWS_CALLBACK_ESTABLISHED FIRED! wsi=%p", (void *)wsi);
94 log_info("[LWS_CALLBACK_ESTABLISHED] WebSocket client connection established at timestamp=%llu",
95 (unsigned long long)established_ns);
96 log_info("WebSocket client connected");
97
98 // Get server instance from protocol user data
99 const struct lws_protocols *protocol = lws_get_protocol(wsi);
100 log_debug("[LWS_CALLBACK_ESTABLISHED] Got protocol: %p", (void *)protocol);
101 if (!protocol || !protocol->user) {
102 log_error("[LWS_CALLBACK_ESTABLISHED] FAILED: Missing protocol user data (protocol=%p, user=%p)",
103 (void *)protocol, protocol ? protocol->user : NULL);
104 return -1;
105 }
106 websocket_server_t *server = (websocket_server_t *)protocol->user;
107 log_debug("[LWS_CALLBACK_ESTABLISHED] Got server: %p, handler: %p", (void *)server, (void *)server->handler);
108
109 // Initialize connection data
110 conn_data->server = server;
111 conn_data->transport = NULL;
112 conn_data->handler_started = false;
113 conn_data->pending_send_data = NULL;
114 conn_data->pending_send_len = 0;
115 conn_data->pending_send_offset = 0;
116 conn_data->has_pending_send = false;
117
118 // Get client address
119 char client_name[128];
120 char client_ip[64];
121 lws_get_peer_simple(wsi, client_name, sizeof(client_name));
122 lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), client_name, sizeof(client_name), client_ip, sizeof(client_ip));
123
124 log_info("WebSocket client connected from %s", client_ip);
125 log_debug("[LWS_CALLBACK_ESTABLISHED] Client IP: %s", client_ip);
126
127 // Optimize TCP for high-throughput large message transfer.
128 // TCP delayed ACK (default ~40ms) causes the sender to stall waiting for ACKs,
129 // creating ~30ms gaps between 128KB chunk deliveries for large messages.
130 // TCP_QUICKACK forces immediate ACKs so the sender can push data continuously (Linux only).
131 // TCP_NODELAY disables Nagle's algorithm for the send path.
132#ifdef __linux__
133 {
134 int fd = lws_get_socket_fd(wsi);
135 if (fd >= 0) {
136 int quickack = 1;
137 if (setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack)) < 0) {
138 log_warn("Failed to set TCP_QUICKACK: %s", SAFE_STRERROR(errno));
139 }
140 int nodelay = 1;
141 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)) < 0) {
142 log_warn("Failed to set TCP_NODELAY: %s", SAFE_STRERROR(errno));
143 }
144 // Do NOT set SO_RCVBUF/SO_SNDBUF manually.
145 // Setting SO_RCVBUF disables TCP autotuning on Linux, which locks the receive buffer
146 // at the rmem_default (212KB). Without manual override, the kernel can autotune up to
147 // tcp_rmem max (typically 6MB), allowing the entire 921KB video frame to fit in one
148 // TCP window without flow control stalls.
149 int actual_rcv = 0, actual_snd = 0;
150 socklen_t optlen = sizeof(int);
151 getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &actual_rcv, &optlen);
152 getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &actual_snd, &optlen);
153 log_info("[WS_SOCKET] TCP_QUICKACK=1, TCP_NODELAY=1, SO_RCVBUF=%d (autotuned), SO_SNDBUF=%d", actual_rcv,
154 actual_snd);
155 }
156 }
157#endif
158
159 // Create ACIP WebSocket server transport for this connection
160 // Note: We pass NULL for crypto_ctx here - crypto handshake happens at ACIP level
161 log_debug("[LWS_CALLBACK_ESTABLISHED] Creating ACIP WebSocket transport...");
163 if (!conn_data->transport) {
164 log_error("[LWS_CALLBACK_ESTABLISHED] FAILED: acip_websocket_server_transport_create returned NULL");
165 return -1;
166 }
167 log_debug("[LWS_CALLBACK_ESTABLISHED] Transport created: %p", (void *)conn_data->transport);
168
169 // Create client context for handler
170 websocket_client_context_t *client_ctx =
171 SAFE_CALLOC(1, sizeof(websocket_client_context_t), websocket_client_context_t *);
172 if (!client_ctx) {
173 log_error("Failed to allocate client context");
175 conn_data->transport = NULL;
176 return -1;
177 }
178
179 client_ctx->transport = conn_data->transport;
180 SAFE_STRNCPY(client_ctx->client_ip, client_ip, sizeof(client_ctx->client_ip));
181 client_ctx->client_port = 0; // WebSocket doesn't expose client port easily
182 client_ctx->user_data = server->user_data;
183
184 // Spawn handler thread - this calls websocket_client_handler which creates client_info_t
185 log_info("๐Ÿ”ด ABOUT TO SPAWN HANDLER THREAD: handler=%p, ctx=%p", (void *)server->handler, (void *)client_ctx);
186 log_debug("[LWS_CALLBACK_ESTABLISHED] Spawning handler thread (handler=%p, ctx=%p)...", (void *)server->handler,
187 (void *)client_ctx);
188 int handler_create_result = asciichat_thread_create(&conn_data->handler_thread, server->handler, client_ctx);
189 log_info("๐Ÿ”ด asciichat_thread_create returned: %d", handler_create_result);
190 if (handler_create_result != 0) {
191 log_error("[LWS_CALLBACK_ESTABLISHED] FAILED: asciichat_thread_create returned error");
192 SAFE_FREE(client_ctx);
194 conn_data->transport = NULL;
195 return -1;
196 }
197
198 conn_data->handler_started = true;
199 log_debug("[LWS_CALLBACK_ESTABLISHED] Handler thread spawned successfully");
200 log_info("โ˜…โ˜…โ˜… ESTABLISHED CALLBACK SUCCESS - handler thread spawned! โ˜…โ˜…โ˜…");
201 break;
202 }
203
204 case LWS_CALLBACK_CLOSED: {
205 // Connection closed
206 uint64_t close_callback_start_ns = time_get_ns();
207 log_info("โ˜…โ˜…โ˜… LWS_CALLBACK_CLOSED FIRED - WHY IS CONNECTION CLOSING? โ˜…โ˜…โ˜…");
208 log_info("[LWS_CALLBACK_CLOSED] WebSocket client disconnected, wsi=%p, handler_started=%d, timestamp=%llu",
209 (void *)wsi, conn_data ? conn_data->handler_started : -1, (unsigned long long)close_callback_start_ns);
210
211 // Try to extract close code from 'in' parameter if available
212 uint16_t close_code = 0;
213 if (in && len >= 2) {
214 close_code = (uint16_t)((uint8_t *)in)[0] << 8 | ((uint8_t *)in)[1];
215 log_info("[LWS_CALLBACK_CLOSED] Close frame received with code=%u (1000=normal, 1001=going away, 1006=abnormal, "
216 "1009=message too big)",
217 close_code);
218 } else {
219 log_info(
220 "[LWS_CALLBACK_CLOSED] No close frame (in=%p, len=%zu) - connection closed without WebSocket close handshake",
221 in, len);
222 }
223
224 // Mark cleanup in progress to prevent race conditions with other threads accessing transport
225 if (conn_data) {
226 conn_data->cleaning_up = true;
227 }
228
229 // Clean up pending send data only if not already freed
230 if (conn_data && conn_data->has_pending_send && conn_data->pending_send_data) {
231 SAFE_FREE(conn_data->pending_send_data);
232 conn_data->pending_send_data = NULL;
233 conn_data->has_pending_send = false;
234 }
235
236 // Close the transport to mark it as disconnected.
237 // This signals the receive thread to exit, allowing clean shutdown.
238 // Guard against use-after-free: check both conn_data and transport are valid
239 acip_transport_t *transport_snapshot = NULL;
240 if (conn_data && conn_data->transport) {
241 transport_snapshot = conn_data->transport;
242 conn_data->transport = NULL; // NULL out immediately to prevent race condition
243
244 log_debug("[LWS_CALLBACK_CLOSED] Closing transport=%p for wsi=%p", (void *)transport_snapshot, (void *)wsi);
245 // Now safely close the transport (methods pointer is stable)
246 if (transport_snapshot && transport_snapshot->methods) {
247 transport_snapshot->methods->close(transport_snapshot);
248 }
249 }
250
251 if (conn_data && conn_data->handler_started) {
252 // Wait for handler thread to complete
253 uint64_t join_start_ns = time_get_ns();
254 log_debug("[LWS_CALLBACK_CLOSED] Waiting for handler thread to complete...");
255 asciichat_thread_join(&conn_data->handler_thread, NULL);
256 uint64_t join_end_ns = time_get_ns();
257 char join_duration_str[32];
258 format_duration_ns((double)(join_end_ns - join_start_ns), join_duration_str, sizeof(join_duration_str));
259 conn_data->handler_started = false;
260 log_info("[LWS_CALLBACK_CLOSED] Handler thread completed (join took %s)", join_duration_str);
261 }
262
263 // Destroy the transport and free all its resources (send_queue, recv_queue, etc)
264 if (transport_snapshot) {
265 log_debug("[LWS_CALLBACK_CLOSED] Destroying transport=%p", (void *)transport_snapshot);
266 acip_transport_destroy(transport_snapshot);
267 }
268
269 // Ensure transport pointer is NULL
270 if (conn_data) {
271 conn_data->transport = NULL;
272 }
273
274 uint64_t close_callback_end_ns = time_get_ns();
275 char total_duration_str[32];
276 format_duration_ns((double)(close_callback_end_ns - close_callback_start_ns), total_duration_str,
277 sizeof(total_duration_str));
278 log_info("[LWS_CALLBACK_CLOSED] Complete cleanup took %s", total_duration_str);
279 break;
280 }
281
282 case LWS_CALLBACK_SERVER_WRITEABLE: {
283 uint64_t writeable_callback_start_ns = time_get_ns();
284 atomic_fetch_add(&g_writeable_callback_count, 1);
285 log_dev_every(4500 * US_PER_MS_INT, "=== LWS_CALLBACK_SERVER_WRITEABLE FIRED === wsi=%p, timestamp=%llu",
286 (void *)wsi, (unsigned long long)writeable_callback_start_ns);
287
288 // Dequeue and send pending data
289 if (!conn_data) {
290 log_dev_every(4500 * US_PER_MS_INT, "SERVER_WRITEABLE: No conn_data");
291 break;
292 }
293
294 // Check if cleanup is in progress to avoid race condition with remove_client
295 if (conn_data->cleaning_up) {
296 log_dev_every(4500 * US_PER_MS_INT, "SERVER_WRITEABLE: Cleanup in progress, skipping");
297 break;
298 }
299
300 // Snapshot the transport pointer to avoid race condition with cleanup thread
301 acip_transport_t *transport_snapshot = conn_data->transport;
302 if (!transport_snapshot) {
303 log_dev_every(4500 * US_PER_MS_INT, "SERVER_WRITEABLE: No transport");
304 break;
305 }
306
307 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport_snapshot->impl_data;
308 if (!ws_data || !ws_data->send_queue) {
309 log_dev_every(4500 * US_PER_MS_INT, "SERVER_WRITEABLE: No ws_data or send_queue");
310 break;
311 }
312
313 // Fragment size must match rx_buffer_size per LWS performance guidelines.
314 // Sending fragments larger than rx_buffer_size causes lws_write() to internally
315 // buffer data, leading to performance degradation. The recommended approach is to
316 // send chunks equal to (or slightly smaller than) the rx_buffer_size.
317 // See: https://github.com/warmcat/libwebsockets/issues/464
318 // We configured rx_buffer_size=524288 (512KB) above in websocket_protocols.
319 // This reduces 1.2MB frames from 300x4KB fragments to 2-3x512KB fragments,
320 // dramatically improving throughput and FPS.
321 const size_t FRAGMENT_SIZE = 262144; // 256KB - balance between throughput and stability
322
323 // Check if we have a message in progress
324 if (conn_data->has_pending_send) {
325 size_t chunk_size = (conn_data->pending_send_len - conn_data->pending_send_offset > FRAGMENT_SIZE)
326 ? FRAGMENT_SIZE
327 : (conn_data->pending_send_len - conn_data->pending_send_offset);
328 int is_start = (conn_data->pending_send_offset == 0);
329 int is_end = (conn_data->pending_send_offset + chunk_size >= conn_data->pending_send_len);
330
331 enum lws_write_protocol flags = lws_write_ws_flags(LWS_WRITE_BINARY, is_start, is_end);
332
333 // Ensure send buffer is large enough
334 size_t required_size = LWS_PRE + chunk_size;
335 if (ws_data->send_buffer_capacity < required_size) {
336 SAFE_FREE(ws_data->send_buffer);
337 ws_data->send_buffer = SAFE_MALLOC(required_size, uint8_t *);
338 if (!ws_data->send_buffer) {
339 log_error("Failed to allocate send buffer");
340 SAFE_FREE(conn_data->pending_send_data);
341 conn_data->pending_send_data = NULL;
342 conn_data->has_pending_send = false;
343 break;
344 }
345 ws_data->send_buffer_capacity = required_size;
346 }
347
348 memcpy(ws_data->send_buffer + LWS_PRE, conn_data->pending_send_data + conn_data->pending_send_offset, chunk_size);
349
350 uint64_t write_start_ns = time_get_ns();
351 int written = lws_write(wsi, ws_data->send_buffer + LWS_PRE, chunk_size, flags);
352 uint64_t write_end_ns = time_get_ns();
353 char write_duration_str[32];
354 format_duration_ns((double)(write_end_ns - write_start_ns), write_duration_str, sizeof(write_duration_str));
355 log_dev_every(4500 * US_PER_MS_INT, "lws_write returned %d bytes in %s (chunk_size=%zu)", written,
356 write_duration_str, chunk_size);
357
358 if (written < 0) {
359 log_error("Server WebSocket write error: %d at offset %zu/%zu", written, conn_data->pending_send_offset,
360 conn_data->pending_send_len);
361 SAFE_FREE(conn_data->pending_send_data);
362 conn_data->pending_send_data = NULL;
363 conn_data->has_pending_send = false;
364 break;
365 }
366
367 if ((size_t)written != chunk_size) {
368 log_warn("Server WebSocket partial write: %d/%zu bytes at offset %zu/%zu", written, chunk_size,
369 conn_data->pending_send_offset, conn_data->pending_send_len);
370 // Don't fail on partial write - request another callback to continue
371 conn_data->pending_send_offset += written;
372 lws_callback_on_writable(wsi);
373 break;
374 }
375
376 conn_data->pending_send_offset += chunk_size;
377
378 if (is_end) {
379 // Message fully sent
380 log_dev_every(4500 * US_PER_MS_INT, "SERVER_WRITEABLE: Message fully sent (%zu bytes)",
381 conn_data->pending_send_len);
382 SAFE_FREE(conn_data->pending_send_data);
383 conn_data->pending_send_data = NULL;
384 conn_data->has_pending_send = false;
385 } else {
386 // More fragments to send
387 log_dev_every(4500 * US_PER_MS_INT,
388 "SERVER_WRITEABLE: Sent fragment %zu/%zu bytes, requesting another callback",
389 conn_data->pending_send_offset, conn_data->pending_send_len);
390 lws_callback_on_writable(wsi);
391 break;
392 }
393 }
394
395 // Try to dequeue next message if current one is done
396 mutex_lock(&ws_data->send_mutex);
397 bool has_messages = !ringbuffer_is_empty(ws_data->send_queue);
398 mutex_unlock(&ws_data->send_mutex);
399
400 if (has_messages) {
401 mutex_lock(&ws_data->send_mutex);
402 websocket_recv_msg_t msg;
403 bool success = ringbuffer_read(ws_data->send_queue, &msg);
404 mutex_unlock(&ws_data->send_mutex);
405
406 if (success && msg.data) {
407 // Start sending this message
408 conn_data->pending_send_data = msg.data;
409 conn_data->pending_send_len = msg.len;
410 conn_data->pending_send_offset = 0;
411 conn_data->has_pending_send = true;
412
413 log_dev_every(4500 * US_PER_MS_INT, ">>> SERVER_WRITEABLE: Dequeued message %zu bytes, sending first fragment",
414 msg.len);
415
416 // Send first fragment
417 size_t chunk_size = (msg.len > FRAGMENT_SIZE) ? FRAGMENT_SIZE : msg.len;
418 int is_start = 1;
419 int is_end = (chunk_size >= msg.len);
420
421 enum lws_write_protocol flags = lws_write_ws_flags(LWS_WRITE_BINARY, is_start, is_end);
422
423 size_t required_size = LWS_PRE + chunk_size;
424 if (ws_data->send_buffer_capacity < required_size) {
425 SAFE_FREE(ws_data->send_buffer);
426 ws_data->send_buffer = SAFE_MALLOC(required_size, uint8_t *);
427 if (!ws_data->send_buffer) {
428 log_error("Failed to allocate send buffer");
429 SAFE_FREE(msg.data);
430 conn_data->has_pending_send = false;
431 break;
432 }
433 ws_data->send_buffer_capacity = required_size;
434 }
435
436 memcpy(ws_data->send_buffer + LWS_PRE, msg.data, chunk_size);
437
438 int written = lws_write(wsi, ws_data->send_buffer + LWS_PRE, chunk_size, flags);
439 if (written < 0) {
440 log_error("Server WebSocket write error on first fragment: %d", written);
441 SAFE_FREE(msg.data);
442 conn_data->has_pending_send = false;
443 break;
444 }
445
446 if ((size_t)written != chunk_size) {
447 log_warn("Server WebSocket partial write on first fragment: %d/%zu", written, chunk_size);
448 conn_data->pending_send_offset = written;
449 } else {
450 conn_data->pending_send_offset = chunk_size;
451 }
452
453 if (!is_end) {
454 log_dev_every(4500 * US_PER_MS_INT,
455 ">>> SERVER_WRITEABLE: First fragment sent, requesting callback for next fragment");
456 lws_callback_on_writable(wsi);
457 } else {
458 log_dev_every(4500 * US_PER_MS_INT, "SERVER_WRITEABLE: Message fully sent in first fragment (%zu bytes)",
459 chunk_size);
460 SAFE_FREE(msg.data);
461 conn_data->has_pending_send = false;
462
463 // Check if there are more messages
464 mutex_lock(&ws_data->send_mutex);
465 if (!ringbuffer_is_empty(ws_data->send_queue)) {
466 lws_callback_on_writable(wsi);
467 }
468 mutex_unlock(&ws_data->send_mutex);
469 }
470 }
471 }
472
473 break;
474 }
475
476 case LWS_CALLBACK_RECEIVE: {
477 // Received data from client - may be fragmented for large messages
478 log_dev_every(4500 * US_PER_MS_INT, "LWS_CALLBACK_RECEIVE: conn_data=%p, transport=%p, len=%zu", (void *)conn_data,
479 conn_data ? (void *)conn_data->transport : NULL, len);
480
481 if (!conn_data) {
482 log_error("LWS_CALLBACK_RECEIVE: conn_data is NULL! Need to initialize from ESTABLISHED or handle here");
483 break;
484 }
485
486 // Check if cleanup is in progress to avoid race condition with remove_client
487 if (conn_data->cleaning_up) {
488 log_debug("LWS_CALLBACK_RECEIVE: Cleanup in progress, discarding received data");
489 break;
490 }
491
492 // Snapshot the transport pointer to avoid race condition with cleanup thread
493 acip_transport_t *transport_snapshot = conn_data->transport;
494 log_info("๐Ÿ”ด [WS_RECEIVE] conn_data=%p transport_snapshot=%p handler_started=%d", (void *)conn_data,
495 (void *)transport_snapshot, conn_data ? conn_data->handler_started : -1);
496 if (!transport_snapshot) {
497 log_error("LWS_CALLBACK_RECEIVE: transport is NULL! ESTABLISHED never called?");
498 // Try to initialize the transport here as a fallback
499 const struct lws_protocols *protocol = lws_get_protocol(wsi);
500 if (!protocol || !protocol->user) {
501 log_error("LWS_CALLBACK_RECEIVE: Cannot get protocol or user data for fallback initialization");
502 break;
503 }
504
505 websocket_server_t *server = (websocket_server_t *)protocol->user;
506 char client_ip[64];
507 lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), NULL, 0, client_ip, sizeof(client_ip));
508
509 log_info("LWS_CALLBACK_RECEIVE: Initializing transport as fallback (client_ip=%s)", client_ip);
510 conn_data->server = server;
511 conn_data->transport = acip_websocket_server_transport_create(wsi, NULL);
512 conn_data->handler_started = false;
513 conn_data->pending_send_data = NULL;
514 conn_data->pending_send_len = 0;
515 conn_data->pending_send_offset = 0;
516 conn_data->has_pending_send = false;
517
518 if (!conn_data->transport) {
519 log_error("LWS_CALLBACK_RECEIVE: Failed to create transport in fallback");
520 break;
521 }
522
523 log_debug("LWS_CALLBACK_RECEIVE: Spawning handler thread in fallback");
524 websocket_client_context_t *client_ctx =
525 SAFE_CALLOC(1, sizeof(websocket_client_context_t), websocket_client_context_t *);
526 if (!client_ctx) {
527 log_error("Failed to allocate client context");
528 acip_transport_destroy(conn_data->transport);
529 conn_data->transport = NULL;
530 break;
531 }
532
533 client_ctx->transport = conn_data->transport;
534 SAFE_STRNCPY(client_ctx->client_ip, client_ip, sizeof(client_ctx->client_ip));
535 client_ctx->client_port = 0;
536 client_ctx->user_data = server->user_data;
537
538 if (asciichat_thread_create(&conn_data->handler_thread, server->handler, client_ctx) != 0) {
539 log_error("LWS_CALLBACK_RECEIVE: Failed to spawn handler thread");
540 SAFE_FREE(client_ctx);
541 acip_transport_destroy(conn_data->transport);
542 conn_data->transport = NULL;
543 break;
544 }
545
546 conn_data->handler_started = true;
547 log_info("LWS_CALLBACK_RECEIVE: Handler thread spawned in fallback");
548
549 // Update snapshot after creating transport
550 transport_snapshot = conn_data->transport;
551 }
552
553 if (!in || len == 0) {
554 break;
555 }
556
557 // Check if cleanup is already in progress to avoid race condition with LWS_CALLBACK_CLOSED
558 if (conn_data && conn_data->cleaning_up) {
559 log_debug("RECEIVE: Cleanup in progress, ignoring fragment");
560 break;
561 }
562
563 // Handler thread was spawned in LWS_CALLBACK_ESTABLISHED (or RECEIVE fallback)
564 // (It fires for server-side connections and properly initializes client_info_t)
565
566 // Get transport data using snapshotted pointer
567 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport_snapshot->impl_data;
568 if (!ws_data || !ws_data->recv_queue) {
569 log_error("WebSocket transport has no implementation data or recv_queue (ws_data=%p, recv_queue=%p)",
570 (void *)ws_data, ws_data ? (void *)ws_data->recv_queue : NULL);
571 break;
572 }
573
574 uint64_t callback_enter_ns = time_get_ns(); // Capture entry time for duration measurement
575 log_debug("[WS_TIMING] callback_enter_ns captured: %llu", (unsigned long long)callback_enter_ns);
576
577 bool is_first = lws_is_first_fragment(wsi);
578 bool is_final = lws_is_final_fragment(wsi);
579 log_debug("[WS_TIMING] is_first=%d is_final=%d, about to increment callback count", is_first, is_final);
580 log_info("[WS_FRAG_DEBUG] === RECEIVE CALLBACK: is_first=%d is_final=%d len=%zu ===", is_first, is_final, len);
581
582 atomic_fetch_add(&g_receive_callback_count, 1);
583 log_debug("[WS_TIMING] incremented callback count");
584
585 // Re-enable TCP_QUICKACK on EVERY fragment delivery (Linux only).
586 // Linux resets TCP_QUICKACK after each ACK, reverting to delayed ACK mode (~40ms).
587 // Without this, only the first fragment batch benefits from quick ACKs, and subsequent
588 // batches see ~30ms gaps as the sender waits for delayed ACKs before sending more data.
589#ifdef __linux__
590 {
591 int fd = lws_get_socket_fd(wsi);
592 if (fd >= 0) {
593 int quickack = 1;
594 setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
595 }
596 }
597#endif
598
599 if (is_first) {
600 // Reset fragment counters at start of new message
601 atomic_store(&g_writeable_callback_count, 0);
602 atomic_store(&g_receive_callback_count, 1);
603 }
604
605 // Log fragment arrival
606 // Note: Timing calculation with global g_receive_first_fragment_ns is broken for multi-fragment
607 // messages and will be fixed with per-connection tracking
608 {
609 uint64_t frag_num = atomic_load(&g_receive_callback_count);
610 log_info("[WS_FRAG] Fragment #%llu: %zu bytes (first=%d final=%d)", (unsigned long long)frag_num, len, is_first,
611 is_final);
612 }
613
614 // Debug: Log raw bytes of incoming fragment
615 log_dev_every(4500 * US_PER_MS_INT, "WebSocket fragment: %zu bytes (first=%d, final=%d)", len, is_first, is_final);
616 if (len > 0 && len <= 256) {
617 const uint8_t *bytes = (const uint8_t *)in;
618 char hex_buf[1024];
619 size_t hex_pos = 0;
620 for (size_t i = 0; i < len && hex_pos < sizeof(hex_buf) - 4; i++) {
621 hex_pos += snprintf(hex_buf + hex_pos, sizeof(hex_buf) - hex_pos, "%02x ", bytes[i]);
622 }
623 log_dev_every(4500 * US_PER_MS_INT, " Raw bytes: %s", hex_buf);
624 }
625
626 // If this is a single-fragment message (first and final), log the packet type for debugging
627 if (is_first && is_final && len >= 18) {
628 const uint8_t *data = (const uint8_t *)in;
629 // ACIP packet header: magic(8) + type(2) + length(4) + crc(4) + client_id(2)
630 uint16_t pkt_type = (data[8] << 8) | data[9];
631 uint32_t pkt_len = (data[10] << 24) | (data[11] << 16) | (data[12] << 8) | data[13];
632 log_dev_every(4500 * US_PER_MS_INT, "Single-fragment ACIP packet: type=%d (0x%x) len=%u total_size=%zu", pkt_type,
633 pkt_type, pkt_len, len);
634 }
635
636 // Queue this fragment immediately with first/final flags.
637 // Per LWS design, each fragment is processed individually by the callback.
638 // We must NOT manually reassemble fragments - that breaks LWS's internal state machine.
639 // Instead, queue each fragment with metadata, and let the receiver decide on reassembly.
640 // This follows the pattern in lws examples (minimal-ws-server-echo, etc).
641
642 websocket_recv_msg_t msg;
643 msg.data = buffer_pool_alloc(NULL, len);
644 if (!msg.data) {
645 log_error("Failed to allocate buffer for fragment (%zu bytes)", len);
646 break;
647 }
648
649 memcpy(msg.data, in, len);
650 msg.len = len;
651 msg.first = is_first;
652 msg.final = is_final;
653
654 mutex_lock(&ws_data->recv_mutex);
655
656 // Re-check recv_queue is still valid after acquiring lock
657 // (LWS_CALLBACK_CLOSED may have cleared it while we were waiting for the lock)
658 if (!ws_data->recv_queue) {
659 log_warn("recv_queue was cleared during lock wait, dropping fragment");
660 mutex_unlock(&ws_data->recv_mutex);
661 buffer_pool_free(NULL, msg.data, msg.len);
662 break;
663 }
664
665 // Snapshot queue status for logging (safe now that we hold the lock)
666 size_t queue_current_size = ringbuffer_size(ws_data->recv_queue);
667 size_t queue_capacity = ws_data->recv_queue->capacity;
668 size_t queue_free = queue_capacity - queue_current_size;
669 log_dev_every(4500 * US_PER_MS_INT, "[WS_FLOW] Queue: free=%zu/%zu (used=%zu)", queue_free, queue_capacity,
670 queue_current_size);
671
672 bool success = ringbuffer_write(ws_data->recv_queue, &msg);
673 if (!success) {
674 // Queue is full - drop fragment (flow control would deadlock the dispatch thread)
675 log_warn("[WS_FLOW] Receive queue FULL - dropping fragment (len=%zu, first=%d, final=%d)", len, is_first,
676 is_final);
677 buffer_pool_free(NULL, msg.data, msg.len);
678
679 mutex_unlock(&ws_data->recv_mutex);
680 break;
681 }
682
683 // Signal waiting recv() call that a fragment is available
684 log_dev("[WS_DEBUG] RECEIVE: About to signal recv_cond (queue size=%zu)", ringbuffer_size(ws_data->recv_queue));
685 cond_signal(&ws_data->recv_cond);
686 log_dev("[WS_DEBUG] RECEIVE: Signaled recv_cond");
687 mutex_unlock(&ws_data->recv_mutex);
688 log_dev("[WS_DEBUG] RECEIVE: Unlocked recv_mutex");
689
690 // Signal LWS to call WRITEABLE callback (matches lws example pattern)
691 // This keeps the event loop active and allows server to send responses
692 lws_callback_on_writable(wsi);
693
694 log_info("[WS_FRAG] Queued fragment: %zu bytes (first=%d final=%d, total_fragments=%llu)", len, is_first, is_final,
695 (unsigned long long)atomic_load(&g_receive_callback_count));
696
697 // Log callback duration
698 {
699 uint64_t callback_exit_ns = time_get_ns();
700 double callback_dur_us = (double)(callback_exit_ns - callback_enter_ns) / 1e3;
701 if (callback_dur_us > 200) {
702 log_warn("[WS_CALLBACK_DURATION] RECEIVE callback took %.1f ยตs (> 200ยตs threshold)", callback_dur_us);
703 }
704 log_debug("[WS_CALLBACK_DURATION] RECEIVE callback completed in %.1f ยตs (fragment: first=%d final=%d len=%zu)",
705 callback_dur_us, is_first, is_final, len);
706 }
707 log_debug("[WS_RECEIVE] ===== RECEIVE CALLBACK COMPLETE, returning 0 to continue =====");
708 log_info("[WS_RECEIVE_RETURN] Returning 0 from RECEIVE callback (success). fragmented=%d (first=%d final=%d)",
709 (!is_final ? 1 : 0), is_first, is_final);
710 break;
711 }
712
713 case LWS_CALLBACK_FILTER_HTTP_CONNECTION: {
714 // WebSocket upgrade handshake - allow all connections
715 log_info("[FILTER_HTTP_CONNECTION] WebSocket upgrade request (allow protocol upgrade)");
716 return 0; // Allow the connection
717 }
718
719 case LWS_CALLBACK_PROTOCOL_INIT: {
720 // Protocol initialization
721 log_info("[PROTOCOL_INIT] Protocol initialized, proto=%s", proto_name);
722 break;
723 }
724
725 case LWS_CALLBACK_EVENT_WAIT_CANCELLED: {
726 // Fired on the service thread when lws_cancel_service() is called from another thread.
727 // This is how we safely convert cross-thread send requests into writable callbacks.
728 // lws_callback_on_writable() is only safe from the service thread context.
729 log_dev_every(4500 * US_PER_MS_INT, "LWS_CALLBACK_EVENT_WAIT_CANCELLED triggered - requesting writable callbacks");
730 const struct lws_protocols *protocol = lws_get_protocol(wsi);
731 if (protocol) {
732 log_dev_every(4500 * US_PER_MS_INT, "EVENT_WAIT_CANCELLED: Calling lws_callback_on_writable_all_protocol");
733 lws_callback_on_writable_all_protocol(lws_get_context(wsi), protocol);
734 } else {
735 log_error("EVENT_WAIT_CANCELLED: No protocol found on wsi");
736 }
737 break;
738 }
739
740 default:
741 break;
742 }
743
744 return 0;
745}
746
750static struct lws_protocols websocket_protocols[] = {
751 {
752 "http", // Default HTTP protocol (required for WebSocket upgrade)
753 websocket_server_callback, // Use same callback for all protocols
754 sizeof(websocket_connection_data_t), // Per-session data size
755 524288, // RX buffer size (512KB for video frames)
756 0, // ID (auto-assigned)
757 NULL, // User pointer (set to server instance)
758 524288 // TX packet size (512KB for video frames)
759 },
760 {
761 "acip", // ACIP WebSocket subprotocol
762 websocket_server_callback, // Callback function
763 sizeof(websocket_connection_data_t), // Per-session data size
764 524288, // RX buffer size (512KB for video frames)
765 0, // ID (auto-assigned)
766 NULL, // User pointer (set to server instance)
767 524288 // TX packet size (512KB for video frames)
768 },
769 {NULL, NULL, 0, 0, 0, NULL, 0} // Terminator
770};
771
783// RX BUFFER UNDERFLOW FIX: Configure permessage-deflate with smaller window bits
784// LWS 4.5.2 underflows when decompressing large fragmented messages with max window (15 bits).
785// Using server_max_window_bits=11 (2KB window) reduces decompression buffer needs and
786// prevents the "rx buffer underflow" error while maintaining reasonable compression.
787static const struct lws_extension websocket_extensions[] = {
788 {"permessage-deflate", lws_extension_callback_pm_deflate, "permessage-deflate; server_max_window_bits=8"},
789 {NULL, NULL, NULL}};
790
791asciichat_error_t websocket_server_init(websocket_server_t *server, const websocket_server_config_t *config) {
792 if (!server || !config || !config->client_handler) {
793 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters");
794 }
795
796 memset(server, 0, sizeof(*server));
797 server->handler = config->client_handler;
798 server->user_data = config->user_data;
799 server->port = config->port;
800 atomic_store(&server->running, true);
801
802 // Store server pointer in protocol user data so callbacks can access it
803 // Both the HTTP and ACIP protocols need access to the server
804 websocket_protocols[0].user = server; // http protocol
805 websocket_protocols[1].user = server; // acip protocol
806
807 // Enable libwebsockets debug logging with custom callback
808 lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, websocket_lws_log_callback);
809
810 // Configure libwebsockets context
811 struct lws_context_creation_info info = {0};
812 info.port = config->port;
813 info.protocols = websocket_protocols;
814 info.gid = (gid_t)-1; // Cast to avoid undefined behavior with unsigned type
815 info.uid = (uid_t)-1; // Cast to avoid undefined behavior with unsigned type
816 info.options = 0; // Don't validate UTF8 - we send binary ACIP packets
817 info.extensions = NULL; // Disable permessage-deflate - causes rx buffer underflow in LWS 4.5.2
818 // TODO: Re-enable compression with proper LWS configuration if needed for bandwidth optimization
819 // Current issue: Even with server_max_window_bits=11, large fragmented messages still trigger
820 // "lws_extension_callback_pm_deflate: rx buffer underflow" errors in LWS 4.5.2
821
822 // Create libwebsockets context
823 server->context = lws_create_context(&info);
824 if (!server->context) {
825 return SET_ERRNO(ERROR_NETWORK_BIND, "Failed to create libwebsockets context");
826 }
827
828 log_info("WebSocket server initialized on port %d with static file serving", config->port);
829 return ASCIICHAT_OK;
830}
831
832asciichat_error_t websocket_server_run(websocket_server_t *server) {
833 if (!server || !server->context) {
834 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid server");
835 }
836
837 log_info("WebSocket server starting event loop on port %d", server->port);
838
839 // Run libwebsockets event loop
840 uint64_t last_service_ns = 0;
841 int service_call_count = 0;
842 while (atomic_load(&server->running)) {
843 // Service libwebsockets with 50ms timeout.
844 // This provides frequent event processing (~20 callback invocations per second) for all
845 // connected WebSocket clients while avoiding excessive CPU usage from polling.
846 // All client connections share this single server context, so a single lws_service() call
847 // services fragments and events for all clients simultaneously.
848 uint64_t service_start_ns = time_get_ns();
849 if (last_service_ns && service_start_ns - last_service_ns > 30 * US_PER_MS_INT) {
850 // > 30ms gap between service calls
851 double gap_ms = (double)(service_start_ns - last_service_ns) / 1e6;
852 log_info_every(1 * US_PER_MS_INT, "[LWS_SERVICE_GAP] %.1fms gap between lws_service calls", gap_ms);
853 }
854 service_call_count++;
855 log_debug_every(500 * US_PER_MS_INT, "[LWS_SERVICE] Call #%d, context=%p", service_call_count,
856 (void *)server->context);
857 int result = lws_service(server->context, 50);
858 if (result < 0) {
859 log_error("libwebsockets service error: %d", result);
860 break;
861 }
862 }
863
864 log_info("WebSocket server event loop exited, destroying context from event loop thread");
865
866 // Destroy context from the event loop thread. When called from a different
867 // thread after the event loop has stopped, lws_context_destroy tries to
868 // gracefully close WebSocket connections but can't process close responses
869 // (no event loop running), so it waits for the close handshake timeout
870 // (5+ seconds). Destroying from the event loop thread avoids this.
871 if (server->context) {
872 lws_context_destroy(server->context);
873 server->context = NULL;
874 }
875
876 log_info("WebSocket server context destroyed");
877 return ASCIICHAT_OK;
878}
879
880void websocket_server_cancel_service(websocket_server_t *server) {
881 if (server && server->context) {
882 lws_cancel_service(server->context);
883 }
884}
885
886void websocket_server_destroy(websocket_server_t *server) {
887 if (!server) {
888 return;
889 }
890
891 atomic_store(&server->running, false);
892
893 // Context is normally destroyed by websocket_server_run (from the event loop
894 // thread) for fast shutdown. This handles the case where run() wasn't called
895 // or didn't complete normally.
896 if (server->context) {
897 log_debug("WebSocket context still alive in destroy, cleaning up");
898 lws_context_destroy(server->context);
899 server->context = NULL;
900 }
901
902 log_debug("WebSocket server destroyed");
903}
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
asciichat_error_t websocket_server_init(websocket_server_t *server, const websocket_server_config_t *config)
asciichat_error_t websocket_server_run(websocket_server_t *server)
void websocket_server_destroy(websocket_server_t *server)
void websocket_server_cancel_service(websocket_server_t *server)
size_t ringbuffer_size(const ringbuffer_t *rb)
Definition ringbuffer.c:122
bool ringbuffer_is_empty(const ringbuffer_t *rb)
Definition ringbuffer.c:126
bool ringbuffer_read(ringbuffer_t *rb, void *data)
Definition ringbuffer.c:83
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
Definition ringbuffer.c:61
asciichat_thread_t handler_thread
Client handler thread.
bool cleaning_up
True if cleanup is in progress (prevents race with remove_client)
bool handler_started
True if handler thread was started.
acip_transport_t * transport
ACIP transport for this connection.
uint8_t * pending_send_data
Current message being sent.
bool has_pending_send
True if there's an in-progress message.
size_t pending_send_len
Total length of message being sent.
websocket_server_t * server
Back-reference to server.
size_t pending_send_offset
Current offset in message (bytes already sent)
void acip_transport_destroy(acip_transport_t *transport)
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
Definition threading.c:42
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Definition threading.c:46
uint64_t time_get_ns(void)
Definition util/time.c:48
int format_duration_ns(double nanoseconds, char *buffer, size_t buffer_size)
Definition util/time.c:275
acip_transport_t * acip_websocket_server_transport_create(struct lws *wsi, crypto_context_t *crypto_ctx)
Create WebSocket server transport from existing connection.