ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
websocket/transport.c
Go to the documentation of this file.
1
31#include <ascii-chat/network/acip/transport.h>
32#include <ascii-chat/network/packet.h>
33#include <ascii-chat/crypto/crypto.h>
34#include <ascii-chat/util/endian.h>
35#include <ascii-chat/network/crc32.h>
36#include <ascii-chat/log/logging.h>
37#include <ascii-chat/ringbuffer.h>
38#include <ascii-chat/buffer_pool.h>
39#include <ascii-chat/platform/mutex.h>
40#include <ascii-chat/platform/cond.h>
41#include <ascii-chat/debug/memory.h>
42#include <libwebsockets.h>
43#include <string.h>
44#include <unistd.h>
45
53#define WEBSOCKET_RECV_QUEUE_SIZE 4096
54
61#define WEBSOCKET_SEND_QUEUE_SIZE 256
62
63// Shared internal types (websocket_recv_msg_t, websocket_transport_data_t)
64#include <ascii-chat/network/websocket/internal.h>
65
66// Forward declaration for libwebsockets callback
67static int websocket_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
68
69// =============================================================================
70// Service Thread (Client-side only)
71// =============================================================================
72
79static void *websocket_service_thread(void *arg) {
80 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)arg;
81
82 log_debug("WebSocket service thread started");
83
84 while (ws_data->service_running) {
85 // Service libwebsockets (processes network events, triggers callbacks)
86 // 50ms timeout allows checking service_running flag regularly
87 int result = lws_service(ws_data->context, 50);
88 if (result < 0) {
89 log_error("lws_service error: %d", result);
90 break;
91 }
92 }
93
94 log_debug("WebSocket service thread exiting");
95 return NULL;
96}
97
98// =============================================================================
99// libwebsockets Callbacks
100// =============================================================================
101
108static int websocket_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) {
109 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)user;
110
111 switch (reason) {
112 case LWS_CALLBACK_CLIENT_ESTABLISHED:
113 log_info("WebSocket connection established");
114 if (ws_data) {
115 mutex_lock(&ws_data->state_mutex);
116 ws_data->is_connected = true;
117 mutex_unlock(&ws_data->state_mutex);
118 }
119 break;
120
121 case LWS_CALLBACK_CLIENT_RECEIVE: {
122 // Received data from server - may be fragmented for large messages
123 if (!ws_data || !in || len == 0) {
124 break;
125 }
126
127 bool is_first = lws_is_first_fragment(wsi);
128 bool is_final = lws_is_final_fragment(wsi);
129
130 log_dev_every(4500000, "WebSocket fragment: %zu bytes (first=%d, final=%d)", len, is_first, is_final);
131
132 // Queue this fragment immediately with first/final flags.
133 // Per LWS design, each fragment is processed individually by the callback.
134 // We must NOT manually reassemble fragments - that breaks LWS's internal state machine.
135 // Instead, queue each fragment with metadata, and let the receiver decide on reassembly.
136
137 websocket_recv_msg_t msg;
138 msg.data = buffer_pool_alloc(NULL, len);
139 if (!msg.data) {
140 log_error("Failed to allocate buffer for fragment (%zu bytes)", len);
141 break;
142 }
143
144 memcpy(msg.data, in, len);
145 msg.len = len;
146 msg.first = is_first;
147 msg.final = is_final;
148
149 mutex_lock(&ws_data->recv_mutex);
150 bool success = ringbuffer_write(ws_data->recv_queue, &msg);
151 if (!success) {
152 // Queue is full - drop the fragment and log warning
153 log_warn("WebSocket receive queue full - dropping fragment (len=%zu, first=%d, final=%d)", len, is_first,
154 is_final);
155 buffer_pool_free(NULL, msg.data, msg.len);
156 mutex_unlock(&ws_data->recv_mutex);
157 break;
158 }
159
160 // Signal waiting recv() call that a fragment is available
161 cond_signal(&ws_data->recv_cond);
162 mutex_unlock(&ws_data->recv_mutex);
163 break;
164 }
165
166 case LWS_CALLBACK_CLIENT_CLOSED:
167 case LWS_CALLBACK_CLOSED:
168 log_info("WebSocket connection closed");
169 if (ws_data) {
170 mutex_lock(&ws_data->state_mutex);
171 ws_data->is_connected = false;
172 mutex_unlock(&ws_data->state_mutex);
173
174 // Wake any blocking recv() calls
175 cond_broadcast(&ws_data->recv_cond);
176 }
177 break;
178
179 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
180 log_error("WebSocket connection error: %s", in ? (const char *)in : "unknown");
181 if (ws_data) {
182 mutex_lock(&ws_data->state_mutex);
183 ws_data->is_connected = false;
184 mutex_unlock(&ws_data->state_mutex);
185
186 // Wake any blocking recv() calls
187 cond_broadcast(&ws_data->recv_cond);
188 }
189 break;
190
191 case LWS_CALLBACK_CLIENT_WRITEABLE:
192 // Socket is writable - we don't need to do anything here
193 // as we handle writes synchronously in websocket_send()
194 break;
195
196 default:
197 break;
198 }
199
200 return 0;
201}
202
203// =============================================================================
204// WebSocket Transport Methods
205// =============================================================================
206
207static asciichat_error_t websocket_send(acip_transport_t *transport, const void *data, size_t len) {
208 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport->impl_data;
209
210 mutex_lock(&ws_data->state_mutex);
211 bool connected = ws_data->is_connected;
212 mutex_unlock(&ws_data->state_mutex);
213
214 log_dev_every(1000000, "websocket_send: is_connected=%d, wsi=%p, send_len=%zu", connected, (void *)ws_data->wsi, len);
215
216 if (!connected) {
217 log_error("WebSocket send called but transport NOT connected! wsi=%p, len=%zu", (void *)ws_data->wsi, len);
218 return SET_ERRNO(ERROR_NETWORK, "WebSocket transport not connected (wsi=%p)", (void *)ws_data->wsi);
219 }
220
221 // Check if encryption is needed (matching tcp_send logic)
222 const void *send_data = data;
223 size_t send_len = len;
224 uint8_t *encrypted_packet = NULL;
225 size_t encrypted_packet_size = 0;
226
227 if (len >= sizeof(packet_header_t) && transport->crypto_ctx && crypto_is_ready(transport->crypto_ctx)) {
228 const packet_header_t *header = (const packet_header_t *)data;
229 uint16_t packet_type = NET_TO_HOST_U16(header->type);
230
231 if (!packet_is_handshake_type((packet_type_t)packet_type)) {
232 // Encrypt the entire packet (header + payload)
233 size_t ciphertext_size = len + CRYPTO_NONCE_SIZE + CRYPTO_MAC_SIZE;
234 // Use SAFE_MALLOC (not buffer pool - encrypted_packet also uses pool and causes overlap)
235 uint8_t *ciphertext = SAFE_MALLOC(ciphertext_size, uint8_t *);
236 if (!ciphertext) {
237 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate ciphertext buffer for WebSocket");
238 }
239
240 size_t ciphertext_len;
241 crypto_result_t result =
242 crypto_encrypt(transport->crypto_ctx, data, len, ciphertext, ciphertext_size, &ciphertext_len);
243 if (result != CRYPTO_OK) {
244 SAFE_FREE(ciphertext);
245 return SET_ERRNO(ERROR_CRYPTO, "Failed to encrypt WebSocket packet: %s", crypto_result_to_string(result));
246 }
247
248 // Build PACKET_TYPE_ENCRYPTED wrapper: header + ciphertext
249 size_t total_encrypted_size = sizeof(packet_header_t) + ciphertext_len;
250 encrypted_packet = buffer_pool_alloc(NULL, total_encrypted_size);
251 if (!encrypted_packet) {
252 SAFE_FREE(ciphertext);
253 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate encrypted packet buffer");
254 }
255
256 packet_header_t encrypted_header;
257 encrypted_header.magic = HOST_TO_NET_U64(PACKET_MAGIC);
258 encrypted_header.type = HOST_TO_NET_U16(PACKET_TYPE_ENCRYPTED);
259 encrypted_header.length = HOST_TO_NET_U32((uint32_t)ciphertext_len);
260 encrypted_header.crc32 = HOST_TO_NET_U32(asciichat_crc32(ciphertext, ciphertext_len));
261 encrypted_header.client_id = 0;
262
263 memcpy(encrypted_packet, &encrypted_header, sizeof(encrypted_header));
264 memcpy(encrypted_packet + sizeof(encrypted_header), ciphertext, ciphertext_len);
265 SAFE_FREE(ciphertext);
266
267 send_data = encrypted_packet;
268 send_len = total_encrypted_size;
269 encrypted_packet_size = total_encrypted_size;
270
271 log_dev_every(1000000, "WebSocket: encrypted packet (original type %d as PACKET_TYPE_ENCRYPTED, %zu bytes)",
272 packet_type, send_len);
273 }
274 }
275
276 // libwebsockets requires LWS_PRE bytes before the payload for protocol headers
277 // Allocate a temporary buffer for this send to avoid thread-safety issues
278 // Each send() call gets its own buffer, preventing race conditions with concurrent sends
279 size_t required_size = LWS_PRE + send_len;
280 uint8_t *send_buffer = SAFE_MALLOC(required_size, uint8_t *);
281 if (!send_buffer) {
282 if (encrypted_packet)
283 buffer_pool_free(NULL, encrypted_packet, encrypted_packet_size);
284 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket send buffer");
285 }
286
287 // Copy data after LWS_PRE offset
288 memcpy(send_buffer + LWS_PRE, send_data, send_len);
289
290 // Server-side transports cannot call lws_write() directly
291 // They must queue data and send from LWS_CALLBACK_SERVER_WRITEABLE
292 if (!ws_data->owns_context) {
293 // Queue the data for server-side sending
294 websocket_recv_msg_t msg;
295 msg.data = SAFE_MALLOC(send_len, uint8_t *);
296 if (!msg.data) {
297 SAFE_FREE(send_buffer);
298 if (encrypted_packet)
299 buffer_pool_free(NULL, encrypted_packet, send_len);
300 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate send queue buffer");
301 }
302 memcpy(msg.data, send_data, send_len);
303 msg.len = send_len;
304 msg.first = 1;
305 msg.final = 1;
306
307 mutex_lock(&ws_data->send_mutex);
308 bool success = ringbuffer_write(ws_data->send_queue, &msg);
309
310 if (!success) {
311 mutex_unlock(&ws_data->send_mutex);
312 log_error("WebSocket server send queue FULL - cannot queue %zu byte message for wsi=%p", send_len,
313 (void *)ws_data->wsi);
314 SAFE_FREE(msg.data);
315 SAFE_FREE(send_buffer);
316 if (encrypted_packet)
317 buffer_pool_free(NULL, encrypted_packet, encrypted_packet_size);
318 return SET_ERRNO(ERROR_NETWORK, "Send queue full (cannot queue %zu bytes)", send_len);
319 }
320 mutex_unlock(&ws_data->send_mutex);
321
322 // Wake the LWS event loop from this non-service thread.
323 // Only lws_cancel_service() is thread-safe from non-service threads.
324 // lws_callback_on_writable() must NOT be called here — it's only safe
325 // from the service thread. LWS_CALLBACK_EVENT_WAIT_CANCELLED (in server.c)
326 // handles calling lws_callback_on_writable_all_protocol() on the service thread.
327 log_dev_every(1000000, ">>> FRAME QUEUED: %zu bytes for wsi=%p (send_len=%zu)", send_len, (void *)ws_data->wsi,
328 send_len);
329
330 struct lws_context *ctx = lws_get_context(ws_data->wsi);
331 lws_cancel_service(ctx);
332
333 log_dev_every(1000000, "Server-side WebSocket: queued %zu bytes, cancel_service sent for wsi=%p", send_len,
334 (void *)ws_data->wsi);
335 SAFE_FREE(send_buffer);
336 if (encrypted_packet)
337 buffer_pool_free(NULL, encrypted_packet, send_len);
338 return ASCIICHAT_OK;
339 }
340
341 // Client-side: send in fragments using LWS_WRITE_BUFLIST
342 // libwebsockets will buffer and send when socket is writable
343 const size_t FRAGMENT_SIZE = 4096;
344 size_t offset = 0;
345
346 while (offset < send_len) {
347 size_t chunk_size = (send_len - offset > FRAGMENT_SIZE) ? FRAGMENT_SIZE : (send_len - offset);
348 int is_start = (offset == 0);
349 int is_end = (offset + chunk_size >= send_len);
350
351 // Get appropriate flags for this fragment
352 enum lws_write_protocol flags = lws_write_ws_flags(LWS_WRITE_BINARY, is_start, is_end);
353
354 // Use BUFLIST to let libwebsockets handle buffering and writable callbacks
355 flags = (enum lws_write_protocol)((int)flags | LWS_WRITE_BUFLIST);
356
357 // Send this fragment
358 int written = lws_write(ws_data->wsi, send_buffer + LWS_PRE + offset, chunk_size, flags);
359
360 if (written < 0) {
361 SAFE_FREE(send_buffer);
362 if (encrypted_packet)
363 buffer_pool_free(NULL, encrypted_packet, send_len);
364 return SET_ERRNO(ERROR_NETWORK, "WebSocket write failed on fragment at offset %zu", offset);
365 }
366
367 if ((size_t)written != chunk_size) {
368 SAFE_FREE(send_buffer);
369 if (encrypted_packet)
370 buffer_pool_free(NULL, encrypted_packet, send_len);
371 return SET_ERRNO(ERROR_NETWORK, "WebSocket partial write: %d/%zu bytes at offset %zu", written, chunk_size,
372 offset);
373 }
374
375 offset += chunk_size;
376 log_dev_every(1000000, "WebSocket sent fragment %zu bytes (offset %zu/%zu)", chunk_size, offset, send_len);
377 }
378
379 log_dev_every(1000000, "WebSocket sent complete message: %zu bytes in fragments", send_len);
380 SAFE_FREE(send_buffer);
381 if (encrypted_packet)
382 buffer_pool_free(NULL, encrypted_packet, encrypted_packet_size);
383 return ASCIICHAT_OK;
384}
385
386static asciichat_error_t websocket_recv(acip_transport_t *transport, void **buffer, size_t *out_len,
387 void **out_allocated_buffer) {
388 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport->impl_data;
389
390 // Check connection first without holding queue lock
391 mutex_lock(&ws_data->state_mutex);
392 bool connected = ws_data->is_connected;
393 mutex_unlock(&ws_data->state_mutex);
394
395 if (!connected) {
396 return SET_ERRNO(ERROR_NETWORK, "Connection not established");
397 }
398
399 mutex_lock(&ws_data->recv_mutex);
400
401 // Reassemble fragmented WebSocket messages with SHORT timeout (100ms)
402 // We queue each fragment from the LWS callback with first/final flags.
403 // Unlike long blocking waits, 100ms is short enough that polling retry
404 // is acceptable. The dispatch thread handles retries.
405
406 uint8_t *assembled_buffer = NULL;
407 size_t assembled_size = 0;
408 size_t assembled_capacity = 0;
409 uint64_t assembly_start_ns = time_get_ns();
410 int fragment_count = 0;
411 const uint64_t MAX_REASSEMBLY_TIME_NS = 100 * 1000000ULL; // 100ms - short timeout for polling-based retry
412
413 while (true) {
414 // Wait for fragment if queue is empty (with short timeout)
415 while (ringbuffer_is_empty(ws_data->recv_queue)) {
416 uint64_t elapsed_ns = time_get_ns() - assembly_start_ns;
417 if (elapsed_ns > MAX_REASSEMBLY_TIME_NS) {
418 // Timeout - return error instead of partial fragments
419 // Dispatch thread will retry, avoiding fragment loss issue
420 if (assembled_buffer) {
421 buffer_pool_free(NULL, assembled_buffer, assembled_capacity);
422 }
423 mutex_unlock(&ws_data->recv_mutex);
424
425 if (assembled_size > 0) {
426 log_dev_every(4500000,
427 "🔄 WEBSOCKET_RECV: Reassembly timeout after %llums (have %zu bytes, expecting final fragment)",
428 (unsigned long long)(elapsed_ns / 1000000ULL), assembled_size);
429 }
430 return SET_ERRNO(ERROR_NETWORK, "Fragment reassembly timeout - no data from network");
431 }
432
433 // Check connection state
434 mutex_lock(&ws_data->state_mutex);
435 bool still_connected = ws_data->is_connected;
436 mutex_unlock(&ws_data->state_mutex);
437
438 if (!still_connected) {
439 if (assembled_buffer) {
440 buffer_pool_free(NULL, assembled_buffer, assembled_capacity);
441 }
442 mutex_unlock(&ws_data->recv_mutex);
443 return SET_ERRNO(ERROR_NETWORK, "Connection closed while reassembling fragments");
444 }
445
446 // Wait for next fragment with 1ms timeout
447 cond_timedwait(&ws_data->recv_cond, &ws_data->recv_mutex, 1 * 1000000ULL);
448 }
449
450 // Read next fragment from queue
451 websocket_recv_msg_t frag;
452 bool success = ringbuffer_read(ws_data->recv_queue, &frag);
453 if (!success) {
454 if (assembled_buffer) {
455 buffer_pool_free(NULL, assembled_buffer, assembled_capacity);
456 }
457 mutex_unlock(&ws_data->recv_mutex);
458 return SET_ERRNO(ERROR_NETWORK, "Failed to read fragment from queue");
459 }
460
461 fragment_count++;
462 log_warn("[WS_REASSEMBLE] Fragment #%d: %zu bytes, first=%d, final=%d, assembled_so_far=%zu", fragment_count,
463 frag.len, frag.first, frag.final, assembled_size);
464
465 // Sanity check: first fragment must have first=1, continuations must have first=0
466 if (assembled_size == 0 && !frag.first) {
467 log_error("[WS_REASSEMBLE] ERROR: Expected first=1 for first fragment, got first=%d", frag.first);
468 buffer_pool_free(NULL, frag.data, frag.len);
469 if (assembled_buffer) {
470 buffer_pool_free(NULL, assembled_buffer, assembled_capacity);
471 }
472 mutex_unlock(&ws_data->recv_mutex);
473 return SET_ERRNO(ERROR_NETWORK, "Protocol error: continuation fragment without first fragment");
474 }
475
476 // Grow assembled buffer if needed
477 size_t required_size = assembled_size + frag.len;
478 if (required_size > assembled_capacity) {
479 size_t new_capacity = (assembled_capacity == 0) ? 8192 : (assembled_capacity * 3 / 2);
480 if (new_capacity < required_size) {
481 new_capacity = required_size;
482 }
483
484 uint8_t *new_buffer = buffer_pool_alloc(NULL, new_capacity);
485 if (!new_buffer) {
486 log_error("[WS_REASSEMBLE] Failed to allocate reassembly buffer (%zu bytes)", new_capacity);
487 buffer_pool_free(NULL, frag.data, frag.len);
488 if (assembled_buffer) {
489 buffer_pool_free(NULL, assembled_buffer, assembled_capacity);
490 }
491 mutex_unlock(&ws_data->recv_mutex);
492 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate fragment reassembly buffer");
493 }
494
495 // Copy existing data to new buffer
496 if (assembled_size > 0) {
497 memcpy(new_buffer, assembled_buffer, assembled_size);
498 }
499
500 // Free old buffer
501 if (assembled_buffer) {
502 buffer_pool_free(NULL, assembled_buffer, assembled_capacity);
503 }
504
505 assembled_buffer = new_buffer;
506 assembled_capacity = new_capacity;
507 }
508
509 // Append fragment data
510 memcpy(assembled_buffer + assembled_size, frag.data, frag.len);
511 assembled_size += frag.len;
512
513 // Free fragment data (we've copied it)
514 buffer_pool_free(NULL, frag.data, frag.len);
515
516 // Check if we have the final fragment
517 if (frag.final) {
518 // Complete message assembled
519 log_info("[WS_REASSEMBLE] Complete message: %zu bytes in %d fragments", assembled_size, fragment_count);
520 *buffer = assembled_buffer;
521 *out_len = assembled_size;
522 *out_allocated_buffer = assembled_buffer;
523 mutex_unlock(&ws_data->recv_mutex);
524 return ASCIICHAT_OK;
525 }
526
527 // More fragments coming, continue reassembling
528 }
529
530 mutex_unlock(&ws_data->recv_mutex);
531
532 // Return reassembled message to caller
533 *buffer = assembled_buffer;
534 *out_len = assembled_size;
535 *out_allocated_buffer = assembled_buffer;
536
537 log_info_every(LOG_RATE_DEFAULT, "[WS_TIMING] websocket_recv dequeued %zu bytes (from %d fragments) at t=%llu",
538 assembled_size, fragment_count, (unsigned long long)time_get_ns());
539 return ASCIICHAT_OK;
540}
541
542static asciichat_error_t websocket_close(acip_transport_t *transport) {
543 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport->impl_data;
544
545 mutex_lock(&ws_data->state_mutex);
546
547 if (!ws_data->is_connected) {
548 mutex_unlock(&ws_data->state_mutex);
549 log_debug("websocket_close: Already closed (is_connected=false), wsi=%p", (void *)ws_data->wsi);
550 return ASCIICHAT_OK; // Already closed
551 }
552
553 log_info("websocket_close: Setting is_connected=false, wsi=%p", (void *)ws_data->wsi);
554 ws_data->is_connected = false;
555 mutex_unlock(&ws_data->state_mutex);
556
557 // Close WebSocket connection
558 if (ws_data->wsi) {
559 log_debug("websocket_close: Calling lws_close_reason for wsi=%p", (void *)ws_data->wsi);
560 lws_close_reason(ws_data->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
561 }
562
563 // Wake any blocking recv() calls
564 cond_broadcast(&ws_data->recv_cond);
565
566 log_debug("WebSocket transport closed");
567 return ASCIICHAT_OK;
568}
569
570static acip_transport_type_t websocket_get_type(acip_transport_t *transport) {
571 (void)transport;
572 return ACIP_TRANSPORT_WEBSOCKET;
573}
574
575static socket_t websocket_get_socket(acip_transport_t *transport) {
576 (void)transport;
577 return INVALID_SOCKET_VALUE; // WebSocket has no underlying socket handle we can expose
578}
579
580static bool websocket_is_connected(acip_transport_t *transport) {
581 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport->impl_data;
582
583 mutex_lock(&ws_data->state_mutex);
584 bool connected = ws_data->is_connected;
585 mutex_unlock(&ws_data->state_mutex);
586
587 return connected;
588}
589
590// =============================================================================
591// WebSocket Transport Destroy Implementation
592// =============================================================================
593
601static void websocket_destroy_impl(acip_transport_t *transport) {
602 if (!transport || !transport->impl_data) {
603 return;
604 }
605
606 websocket_transport_data_t *ws_data = (websocket_transport_data_t *)transport->impl_data;
607
608 // Stop service thread (client-side only)
609 if (ws_data->service_running) {
610 log_debug("Stopping WebSocket service thread");
611 ws_data->service_running = false;
612 asciichat_thread_join(&ws_data->service_thread, NULL);
613 log_debug("WebSocket service thread stopped");
614 }
615
616 // Destroy WebSocket instance (if not already destroyed)
617 if (ws_data->wsi) {
618 // libwebsockets will clean up the wsi when we destroy the context
619 ws_data->wsi = NULL;
620 }
621
622 // Destroy WebSocket context (only if we own it - client transports only)
623 if (ws_data->context && ws_data->owns_context) {
624 lws_context_destroy(ws_data->context);
625 ws_data->context = NULL;
626 }
627
628 // Clear receive queue and free buffered messages
629 if (ws_data->recv_queue) {
630 mutex_lock(&ws_data->recv_mutex);
631
632 websocket_recv_msg_t msg;
633 while (ringbuffer_read(ws_data->recv_queue, &msg)) {
634 if (msg.data) {
635 buffer_pool_free(NULL, msg.data, msg.len);
636 }
637 }
638
639 mutex_unlock(&ws_data->recv_mutex);
640 ringbuffer_destroy(ws_data->recv_queue);
641 ws_data->recv_queue = NULL;
642
643 if (ws_data->send_queue) {
644 // Drain send queue before destroying to free allocated message data
645 websocket_recv_msg_t msg;
646 while (ringbuffer_read(ws_data->send_queue, &msg)) {
647 if (msg.data) {
648 SAFE_FREE(msg.data);
649 }
650 }
651 ringbuffer_destroy(ws_data->send_queue);
652 ws_data->send_queue = NULL;
653 }
654 }
655
656 // Free send buffer
657 if (ws_data->send_buffer) {
658 SAFE_FREE(ws_data->send_buffer);
659 ws_data->send_buffer = NULL;
660 }
661
662 // Destroy synchronization primitives
663 mutex_destroy(&ws_data->state_mutex);
664 cond_destroy(&ws_data->recv_cond);
665 mutex_destroy(&ws_data->recv_mutex);
666 mutex_destroy(&ws_data->send_mutex);
667
668 // Clear impl_data pointer BEFORE freeing to prevent use-after-free in callbacks
669 transport->impl_data = NULL;
670
671 // Free the websocket transport data structure
672 SAFE_FREE(ws_data);
673
674 log_debug("Destroyed WebSocket transport resources");
675}
676
677// =============================================================================
678// WebSocket Transport Method Table
679// =============================================================================
680
681static const acip_transport_methods_t websocket_methods = {
682 .send = websocket_send,
683 .recv = websocket_recv,
684 .close = websocket_close,
685 .get_type = websocket_get_type,
686 .get_socket = websocket_get_socket,
687 .is_connected = websocket_is_connected,
688 .destroy_impl = websocket_destroy_impl,
689};
690
691// =============================================================================
692// WebSocket Transport Creation
693// =============================================================================
694
702acip_transport_t *acip_websocket_client_transport_create(const char *url, crypto_context_t *crypto_ctx) {
703 if (!url) {
704 SET_ERRNO(ERROR_INVALID_PARAM, "url is required");
705 return NULL;
706 }
707
708 // Parse URL to extract host, port, and path
709 // Format: ws://host:port/path or wss://host:port/path
710 const char *protocol_end = strstr(url, "://");
711 if (!protocol_end) {
712 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid WebSocket URL format (missing ://)");
713 return NULL;
714 }
715
716 bool use_ssl = (strncmp(url, "wss://", 6) == 0);
717 const char *host_start = protocol_end + 3;
718
719 // Find port (if specified)
720 const char *port_start = strchr(host_start, ':');
721 const char *path_start = strchr(host_start, '/');
722
723 char host[256] = {0};
724 int port = use_ssl ? 443 : 27226; // Default: wss:// uses 443, ws:// uses 27226 (ascii-chat WebSocket port)
725 char path[256] = "/";
726
727 if (port_start && (!path_start || port_start < path_start)) {
728 // Port is specified
729 size_t host_len = port_start - host_start;
730 if (host_len >= sizeof(host)) {
731 SET_ERRNO(ERROR_INVALID_PARAM, "Host name too long");
732 return NULL;
733 }
734 memcpy(host, host_start, host_len);
735 host[host_len] = '\0';
736
737 // Extract port
738 port = atoi(port_start + 1);
739 if (port <= 0 || port > 65535) {
740 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid port number");
741 return NULL;
742 }
743 } else {
744 // No port specified, use default
745 size_t host_len = path_start ? (size_t)(path_start - host_start) : strlen(host_start);
746 if (host_len >= sizeof(host)) {
747 SET_ERRNO(ERROR_INVALID_PARAM, "Host name too long");
748 return NULL;
749 }
750 memcpy(host, host_start, host_len);
751 host[host_len] = '\0';
752 }
753
754 // Extract path
755 if (path_start) {
756 SAFE_STRNCPY(path, path_start, sizeof(path) - 1);
757 path[sizeof(path) - 1] = '\0';
758 }
759
760 log_info("Connecting to WebSocket: %s (host=%s, port=%d, path=%s, ssl=%d)", url, host, port, path, use_ssl);
761
762 // Allocate transport structure
763 acip_transport_t *transport = SAFE_MALLOC(sizeof(acip_transport_t), acip_transport_t *);
764 if (!transport) {
765 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport");
766 return NULL;
767 }
768
769 // Allocate WebSocket-specific data
770 websocket_transport_data_t *ws_data =
771 SAFE_CALLOC(1, sizeof(websocket_transport_data_t), websocket_transport_data_t *);
772 if (!ws_data) {
773 SAFE_FREE(transport);
774 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport data");
775 return NULL;
776 }
777
778 // Create receive queue
779 ws_data->recv_queue = ringbuffer_create(sizeof(websocket_recv_msg_t), WEBSOCKET_RECV_QUEUE_SIZE);
780 if (!ws_data->recv_queue) {
781 SAFE_FREE(ws_data);
782 SAFE_FREE(transport);
783 SET_ERRNO(ERROR_MEMORY, "Failed to create receive queue");
784 return NULL;
785 }
786
787 // Initialize synchronization primitives
788 if (mutex_init(&ws_data->recv_mutex) != 0) {
789 ringbuffer_destroy(ws_data->recv_queue);
790 SAFE_FREE(ws_data);
791 SAFE_FREE(transport);
792 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize recv mutex");
793 return NULL;
794 }
795
796 if (cond_init(&ws_data->recv_cond) != 0) {
797 mutex_destroy(&ws_data->recv_mutex);
798 ringbuffer_destroy(ws_data->recv_queue);
799 SAFE_FREE(ws_data);
800 SAFE_FREE(transport);
801 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize recv condition variable");
802 return NULL;
803 }
804
805 if (mutex_init(&ws_data->state_mutex) != 0) {
806 cond_destroy(&ws_data->recv_cond);
807 mutex_destroy(&ws_data->recv_mutex);
808 ringbuffer_destroy(ws_data->recv_queue);
809 SAFE_FREE(ws_data);
810 SAFE_FREE(transport);
811 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize state mutex");
812 return NULL;
813 }
814
815 // Allocate initial send buffer
816 ws_data->send_buffer_capacity = LWS_PRE + 8192; // Initial 8KB buffer
817 ws_data->send_buffer = SAFE_MALLOC(ws_data->send_buffer_capacity, uint8_t *);
818 if (!ws_data->send_buffer) {
819 mutex_destroy(&ws_data->state_mutex);
820 cond_destroy(&ws_data->recv_cond);
821 mutex_destroy(&ws_data->recv_mutex);
822 ringbuffer_destroy(ws_data->recv_queue);
823 SAFE_FREE(ws_data);
824 SAFE_FREE(transport);
825 SET_ERRNO(ERROR_MEMORY, "Failed to allocate send buffer");
826 return NULL;
827 }
828
829 // Create libwebsockets context
830 // Protocol array must persist for lifetime of context - use static
831 static struct lws_protocols client_protocols[] = {
832 {"acip", websocket_callback, 0, 4096, 0, NULL, 0}, {NULL, NULL, 0, 0, 0, NULL, 0} // Terminator
833 };
834
835 struct lws_context_creation_info info;
836 memset(&info, 0, sizeof(info));
837 info.port = CONTEXT_PORT_NO_LISTEN; // Client mode - no listening
838 info.protocols = client_protocols;
839 info.gid = (gid_t)-1; // Cast to avoid undefined behavior with unsigned type
840 info.uid = (uid_t)-1; // Cast to avoid undefined behavior with unsigned type
841 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
842
843 ws_data->context = lws_create_context(&info);
844 if (!ws_data->context) {
845 SAFE_FREE(ws_data->send_buffer);
846 mutex_destroy(&ws_data->state_mutex);
847 cond_destroy(&ws_data->recv_cond);
848 mutex_destroy(&ws_data->recv_mutex);
849 ringbuffer_destroy(ws_data->recv_queue);
850 SAFE_FREE(ws_data);
851 SAFE_FREE(transport);
852 SET_ERRNO(ERROR_NETWORK, "Failed to create libwebsockets context");
853 return NULL;
854 }
855
856 // Connect to WebSocket server
857 log_debug("Initiating WebSocket connection to %s:%d%s", host, port, path);
858 struct lws_client_connect_info connect_info;
859 memset(&connect_info, 0, sizeof(connect_info));
860 connect_info.context = ws_data->context;
861 connect_info.address = host;
862 connect_info.port = port;
863 connect_info.path = path;
864 connect_info.host = host;
865 connect_info.origin = host;
866 connect_info.protocol = "acip";
867 connect_info.ssl_connection = use_ssl ? LCCSCF_USE_SSL : 0;
868 connect_info.userdata = ws_data;
869
870 log_debug("Calling lws_client_connect_via_info...");
871 ws_data->wsi = lws_client_connect_via_info(&connect_info);
872 log_debug("lws_client_connect_via_info returned: %p", (void *)ws_data->wsi);
873 if (!ws_data->wsi) {
874 lws_context_destroy(ws_data->context);
875 SAFE_FREE(ws_data->send_buffer);
876 mutex_destroy(&ws_data->state_mutex);
877 cond_destroy(&ws_data->recv_cond);
878 mutex_destroy(&ws_data->recv_mutex);
879 ringbuffer_destroy(ws_data->recv_queue);
880 SAFE_FREE(ws_data);
881 SAFE_FREE(transport);
882 SET_ERRNO(ERROR_NETWORK, "Failed to connect to WebSocket server");
883 return NULL;
884 }
885
886 ws_data->is_connected = false; // Will be set to true in LWS_CALLBACK_CLIENT_ESTABLISHED
887 ws_data->owns_context = true; // Client transport owns the context
888
889 // Initialize transport
890 transport->methods = &websocket_methods;
891 transport->crypto_ctx = crypto_ctx;
892 transport->impl_data = ws_data;
893
894 // Wait for connection to establish (synchronous connection)
895 // Service the libwebsockets event loop until connected or timeout
896 log_debug("Waiting for WebSocket connection to establish...");
897 int timeout_ms = 5000; // 5 second timeout
898 int elapsed_ms = 0;
899 while (!ws_data->is_connected && elapsed_ms < timeout_ms) {
900 // Service libwebsockets (processes network events, triggers callbacks)
901 int result = lws_service(ws_data->context, 50); // 50ms timeout per iteration
902 if (result < 0) {
903 log_error("lws_service error during connection: %d", result);
904 lws_context_destroy(ws_data->context);
905 SAFE_FREE(ws_data->send_buffer);
906 mutex_destroy(&ws_data->state_mutex);
907 cond_destroy(&ws_data->recv_cond);
908 mutex_destroy(&ws_data->recv_mutex);
909 ringbuffer_destroy(ws_data->recv_queue);
910 SAFE_FREE(ws_data);
911 SAFE_FREE(transport);
912 SET_ERRNO(ERROR_NETWORK, "WebSocket connection failed");
913 return NULL;
914 }
915 elapsed_ms += 50;
916 }
917
918 if (!ws_data->is_connected) {
919 log_error("WebSocket connection timeout after %d ms", elapsed_ms);
920 lws_context_destroy(ws_data->context);
921 SAFE_FREE(ws_data->send_buffer);
922 mutex_destroy(&ws_data->state_mutex);
923 cond_destroy(&ws_data->recv_cond);
924 mutex_destroy(&ws_data->recv_mutex);
925 ringbuffer_destroy(ws_data->recv_queue);
926 SAFE_FREE(ws_data);
927 SAFE_FREE(transport);
928 SET_ERRNO(ERROR_NETWORK, "WebSocket connection timeout");
929 return NULL;
930 }
931
932 log_info("WebSocket connection established (crypto: %s)", crypto_ctx ? "enabled" : "disabled");
933
934 // Start service thread to process incoming messages
935 ws_data->service_running = true;
936 if (asciichat_thread_create(&ws_data->service_thread, websocket_service_thread, ws_data) != 0) {
937 log_error("Failed to create WebSocket service thread");
938 ws_data->service_running = false;
939 lws_context_destroy(ws_data->context);
940 SAFE_FREE(ws_data->send_buffer);
941 mutex_destroy(&ws_data->state_mutex);
942 cond_destroy(&ws_data->recv_cond);
943 mutex_destroy(&ws_data->recv_mutex);
944 ringbuffer_destroy(ws_data->recv_queue);
945 SAFE_FREE(ws_data);
946 SAFE_FREE(transport);
947 SET_ERRNO(ERROR_INTERNAL, "Failed to create service thread");
948 return NULL;
949 }
950
951 log_debug("WebSocket service thread started for client transport");
952
953 return transport;
954}
955
966acip_transport_t *acip_websocket_server_transport_create(struct lws *wsi, crypto_context_t *crypto_ctx) {
967 if (!wsi) {
968 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid wsi parameter");
969 return NULL;
970 }
971
972 // Allocate transport structure
973 acip_transport_t *transport = SAFE_CALLOC(1, sizeof(acip_transport_t), acip_transport_t *);
974 if (!transport) {
975 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport");
976 return NULL;
977 }
978
979 // Allocate transport-specific data
980 websocket_transport_data_t *ws_data =
981 SAFE_CALLOC(1, sizeof(websocket_transport_data_t), websocket_transport_data_t *);
982 if (!ws_data) {
983 SAFE_FREE(transport);
984 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport data");
985 return NULL;
986 }
987
988 // Initialize receive queue
989 ws_data->recv_queue = ringbuffer_create(sizeof(websocket_recv_msg_t), WEBSOCKET_RECV_QUEUE_SIZE);
990 if (!ws_data->recv_queue) {
991 SAFE_FREE(ws_data);
992 SAFE_FREE(transport);
993 SET_ERRNO(ERROR_MEMORY, "Failed to create receive queue");
994 return NULL;
995 }
996
997 // Initialize send queue (for server-side transports)
998 ws_data->send_queue = ringbuffer_create(sizeof(websocket_recv_msg_t), WEBSOCKET_SEND_QUEUE_SIZE);
999 if (!ws_data->send_queue) {
1000 ringbuffer_destroy(ws_data->recv_queue);
1001 SAFE_FREE(ws_data);
1002 SAFE_FREE(transport);
1003 SET_ERRNO(ERROR_MEMORY, "Failed to create send queue");
1004 return NULL;
1005 }
1006
1007 // Initialize synchronization primitives
1008 if (mutex_init(&ws_data->recv_mutex) != 0) {
1009 ringbuffer_destroy(ws_data->recv_queue);
1010 ringbuffer_destroy(ws_data->send_queue);
1011 SAFE_FREE(ws_data);
1012 SAFE_FREE(transport);
1013 SET_ERRNO(ERROR_NETWORK, "Failed to initialize recv mutex");
1014 return NULL;
1015 }
1016
1017 if (cond_init(&ws_data->recv_cond) != 0) {
1018 mutex_destroy(&ws_data->recv_mutex);
1019 ringbuffer_destroy(ws_data->recv_queue);
1020 ringbuffer_destroy(ws_data->send_queue);
1021 SAFE_FREE(ws_data);
1022 SAFE_FREE(transport);
1023 SET_ERRNO(ERROR_NETWORK, "Failed to initialize recv condition variable");
1024 return NULL;
1025 }
1026
1027 if (mutex_init(&ws_data->send_mutex) != 0) {
1028 cond_destroy(&ws_data->recv_cond);
1029 mutex_destroy(&ws_data->recv_mutex);
1030 ringbuffer_destroy(ws_data->recv_queue);
1031 ringbuffer_destroy(ws_data->send_queue);
1032 SAFE_FREE(ws_data);
1033 SAFE_FREE(transport);
1034 SET_ERRNO(ERROR_NETWORK, "Failed to initialize send mutex");
1035 return NULL;
1036 }
1037
1038 if (mutex_init(&ws_data->state_mutex) != 0) {
1039 mutex_destroy(&ws_data->send_mutex);
1040 cond_destroy(&ws_data->recv_cond);
1041 mutex_destroy(&ws_data->recv_mutex);
1042 ringbuffer_destroy(ws_data->recv_queue);
1043 ringbuffer_destroy(ws_data->send_queue);
1044 SAFE_FREE(ws_data);
1045 SAFE_FREE(transport);
1046 SET_ERRNO(ERROR_NETWORK, "Failed to initialize state mutex");
1047 return NULL;
1048 }
1049
1050 // Allocate send buffer with LWS_PRE padding
1051 size_t initial_capacity = 4096 + LWS_PRE;
1052 ws_data->send_buffer = SAFE_MALLOC(initial_capacity, uint8_t *);
1053 if (!ws_data->send_buffer) {
1054 mutex_destroy(&ws_data->state_mutex);
1055 mutex_destroy(&ws_data->send_mutex);
1056 cond_destroy(&ws_data->recv_cond);
1057 mutex_destroy(&ws_data->recv_mutex);
1058 ringbuffer_destroy(ws_data->recv_queue);
1059 ringbuffer_destroy(ws_data->send_queue);
1060 SAFE_FREE(ws_data);
1061 SAFE_FREE(transport);
1062 SET_ERRNO(ERROR_MEMORY, "Failed to allocate send buffer");
1063 return NULL;
1064 }
1065 ws_data->send_buffer_capacity = initial_capacity;
1066
1067 // Store connection info (server-side: no context ownership, connection already established)
1068 ws_data->wsi = wsi;
1069 ws_data->context = lws_get_context(wsi); // Get context from wsi (not owned)
1070 ws_data->owns_context = false; // Server owns context, not transport
1071 ws_data->is_connected = true; // Already connected (server-side)
1072 log_debug("Server transport created: is_connected=true, wsi=%p", (void *)wsi);
1073
1074 // Initialize transport
1075 transport->methods = &websocket_methods;
1076 transport->crypto_ctx = crypto_ctx;
1077 transport->impl_data = ws_data;
1078
1079 log_info("Created WebSocket server transport (crypto: %s)", crypto_ctx ? "enabled" : "disabled");
1080
1081 return transport;
1082}
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
int socket_t
crypto_result_t crypto_encrypt(crypto_context_t *ctx, const uint8_t *plaintext, size_t plaintext_len, uint8_t *ciphertext_out, size_t ciphertext_out_size, size_t *ciphertext_len_out)
const char * crypto_result_to_string(crypto_result_t result)
bool crypto_is_ready(const crypto_context_t *ctx)
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
Definition ringbuffer.c:28
void ringbuffer_destroy(ringbuffer_t *rb)
Definition ringbuffer.c:54
bool ringbuffer_is_empty(const ringbuffer_t *rb)
Definition ringbuffer.c:126
bool ringbuffer_read(ringbuffer_t *rb, void *data)
Definition ringbuffer.c:83
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
Definition ringbuffer.c:61
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
Definition threading.c:42
int asciichat_thread_join(asciichat_thread_t *thread, void **retval)
Definition threading.c:46
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21
uint64_t time_get_ns(void)
Definition util/time.c:48
acip_transport_t * acip_websocket_server_transport_create(struct lws *wsi, crypto_context_t *crypto_ctx)
Create WebSocket server transport from existing connection.
#define WEBSOCKET_SEND_QUEUE_SIZE
Maximum send queue size (messages buffered for server-side sending)
acip_transport_t * acip_websocket_client_transport_create(const char *url, crypto_context_t *crypto_ctx)
Create WebSocket client transport.
#define WEBSOCKET_RECV_QUEUE_SIZE
Maximum receive queue size (messages buffered before recv())