ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
transport.c File Reference

WebSocket transport implementation for ACIP protocol. More...

Go to the source code of this file.

Macros

#define WEBSOCKET_RECV_QUEUE_SIZE   4096
 Maximum receive queue size (messages buffered before recv())
 
#define WEBSOCKET_SEND_QUEUE_SIZE   256
 Maximum send queue size (messages buffered for server-side sending)
 

Functions

acip_transport_t * acip_websocket_client_transport_create (const char *url, crypto_context_t *crypto_ctx)
 Create WebSocket client transport.
 
acip_transport_t * acip_websocket_server_transport_create (struct lws *wsi, crypto_context_t *crypto_ctx)
 Create WebSocket server transport from existing connection.
 

Detailed Description

WebSocket transport implementation for ACIP protocol.

Implements the acip_transport_t interface for WebSocket connections. Enables browser clients to connect via WebSocket protocol.

ARCHITECTURE:

  • Uses libwebsockets for WebSocket protocol handling
  • Async libwebsockets callbacks bridge to sync recv() via ringbuffer
  • Thread-safe receive queue handles async message arrival
  • Same pattern as WebRTC transport for consistency

MESSAGE FLOW:

  1. send(): Synchronous write via lws_write()
  2. LWS callback: Async write to receive ringbuffer
  3. recv(): Blocking read from receive ringbuffer

MEMORY OWNERSHIP:

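  • Client transport OWNS its wsi (WebSocket instance) and libwebsockets context; a server transport only wraps a wsi and context that remain owned by the server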
  • Receive queue owns buffered message data
  • recv() allocates message buffer, caller must free
Author
Zachary Fogg me@zf.nosp@m.o.gg
Date
February 2026

Definition in file websocket/transport.c.

Macro Definition Documentation

◆ WEBSOCKET_RECV_QUEUE_SIZE

#define WEBSOCKET_RECV_QUEUE_SIZE   4096

Maximum receive queue size (messages buffered before recv())

Power of 2 for ringbuffer optimization. Increased from 512 to buffer multiple large frames and reduce queue pressure. Each slot holds one message (up to 921KB), so with 4096 slots the queue can buffer up to ~3.7GB in the worst case.

Definition at line 53 of file websocket/transport.c.

◆ WEBSOCKET_SEND_QUEUE_SIZE

#define WEBSOCKET_SEND_QUEUE_SIZE   256

Maximum send queue size (messages buffered for server-side sending)

Smaller than the receive queue (256 vs. 4096 slots). Video frames are sent continuously, so it must still be large enough to buffer frames while the event loop processes them.

Definition at line 61 of file websocket/transport.c.

Function Documentation

◆ acip_websocket_client_transport_create()

acip_transport_t * acip_websocket_client_transport_create ( const char *  url,
crypto_context_t *  crypto_ctx 
)

Create WebSocket client transport.

Parameters
url — WebSocket URL (e.g., "ws://localhost:27225")
crypto_ctx — Optional encryption context (can be NULL)
Returns
Transport instance or NULL on failure

Definition at line 702 of file websocket/transport.c.

702 {
703 if (!url) {
704 SET_ERRNO(ERROR_INVALID_PARAM, "url is required");
705 return NULL;
706 }
707
708 // Parse URL to extract host, port, and path
709 // Format: ws://host:port/path or wss://host:port/path
710 const char *protocol_end = strstr(url, "://");
711 if (!protocol_end) {
712 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid WebSocket URL format (missing ://)");
713 return NULL;
714 }
715
716 bool use_ssl = (strncmp(url, "wss://", 6) == 0);
717 const char *host_start = protocol_end + 3;
718
719 // Find port (if specified)
720 const char *port_start = strchr(host_start, ':');
721 const char *path_start = strchr(host_start, '/');
722
723 char host[256] = {0};
724 int port = use_ssl ? 443 : 27226; // Default: wss:// uses 443, ws:// uses 27226 (ascii-chat WebSocket port)
725 char path[256] = "/";
726
727 if (port_start && (!path_start || port_start < path_start)) {
728 // Port is specified
729 size_t host_len = port_start - host_start;
730 if (host_len >= sizeof(host)) {
731 SET_ERRNO(ERROR_INVALID_PARAM, "Host name too long");
732 return NULL;
733 }
734 memcpy(host, host_start, host_len);
735 host[host_len] = '\0';
736
737 // Extract port
738 port = atoi(port_start + 1);
739 if (port <= 0 || port > 65535) {
740 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid port number");
741 return NULL;
742 }
743 } else {
744 // No port specified, use default
745 size_t host_len = path_start ? (size_t)(path_start - host_start) : strlen(host_start);
746 if (host_len >= sizeof(host)) {
747 SET_ERRNO(ERROR_INVALID_PARAM, "Host name too long");
748 return NULL;
749 }
750 memcpy(host, host_start, host_len);
751 host[host_len] = '\0';
752 }
753
754 // Extract path
755 if (path_start) {
756 SAFE_STRNCPY(path, path_start, sizeof(path) - 1);
757 path[sizeof(path) - 1] = '\0';
758 }
759
760 log_info("Connecting to WebSocket: %s (host=%s, port=%d, path=%s, ssl=%d)", url, host, port, path, use_ssl);
761
762 // Allocate transport structure
763 acip_transport_t *transport = SAFE_MALLOC(sizeof(acip_transport_t), acip_transport_t *);
764 if (!transport) {
765 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport");
766 return NULL;
767 }
768
769 // Allocate WebSocket-specific data
770 websocket_transport_data_t *ws_data =
771 SAFE_CALLOC(1, sizeof(websocket_transport_data_t), websocket_transport_data_t *);
772 if (!ws_data) {
773 SAFE_FREE(transport);
774 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport data");
775 return NULL;
776 }
777
778 // Create receive queue
779 ws_data->recv_queue = ringbuffer_create(sizeof(websocket_recv_msg_t), WEBSOCKET_RECV_QUEUE_SIZE);
780 if (!ws_data->recv_queue) {
781 SAFE_FREE(ws_data);
782 SAFE_FREE(transport);
783 SET_ERRNO(ERROR_MEMORY, "Failed to create receive queue");
784 return NULL;
785 }
786
787 // Initialize synchronization primitives
788 if (mutex_init(&ws_data->recv_mutex) != 0) {
789 ringbuffer_destroy(ws_data->recv_queue);
790 SAFE_FREE(ws_data);
791 SAFE_FREE(transport);
792 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize recv mutex");
793 return NULL;
794 }
795
796 if (cond_init(&ws_data->recv_cond) != 0) {
797 mutex_destroy(&ws_data->recv_mutex);
798 ringbuffer_destroy(ws_data->recv_queue);
799 SAFE_FREE(ws_data);
800 SAFE_FREE(transport);
801 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize recv condition variable");
802 return NULL;
803 }
804
805 if (mutex_init(&ws_data->state_mutex) != 0) {
806 cond_destroy(&ws_data->recv_cond);
807 mutex_destroy(&ws_data->recv_mutex);
808 ringbuffer_destroy(ws_data->recv_queue);
809 SAFE_FREE(ws_data);
810 SAFE_FREE(transport);
811 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize state mutex");
812 return NULL;
813 }
814
815 // Allocate initial send buffer
816 ws_data->send_buffer_capacity = LWS_PRE + 8192; // Initial 8KB buffer
817 ws_data->send_buffer = SAFE_MALLOC(ws_data->send_buffer_capacity, uint8_t *);
818 if (!ws_data->send_buffer) {
819 mutex_destroy(&ws_data->state_mutex);
820 cond_destroy(&ws_data->recv_cond);
821 mutex_destroy(&ws_data->recv_mutex);
822 ringbuffer_destroy(ws_data->recv_queue);
823 SAFE_FREE(ws_data);
824 SAFE_FREE(transport);
825 SET_ERRNO(ERROR_MEMORY, "Failed to allocate send buffer");
826 return NULL;
827 }
828
829 // Create libwebsockets context
830 // Protocol array must persist for lifetime of context - use static
831 static struct lws_protocols client_protocols[] = {
832 {"acip", websocket_callback, 0, 4096, 0, NULL, 0}, {NULL, NULL, 0, 0, 0, NULL, 0} // Terminator
833 };
834
835 struct lws_context_creation_info info;
836 memset(&info, 0, sizeof(info));
837 info.port = CONTEXT_PORT_NO_LISTEN; // Client mode - no listening
838 info.protocols = client_protocols;
839 info.gid = (gid_t)-1; // Cast to avoid undefined behavior with unsigned type
840 info.uid = (uid_t)-1; // Cast to avoid undefined behavior with unsigned type
841 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
842
843 ws_data->context = lws_create_context(&info);
844 if (!ws_data->context) {
845 SAFE_FREE(ws_data->send_buffer);
846 mutex_destroy(&ws_data->state_mutex);
847 cond_destroy(&ws_data->recv_cond);
848 mutex_destroy(&ws_data->recv_mutex);
849 ringbuffer_destroy(ws_data->recv_queue);
850 SAFE_FREE(ws_data);
851 SAFE_FREE(transport);
852 SET_ERRNO(ERROR_NETWORK, "Failed to create libwebsockets context");
853 return NULL;
854 }
855
856 // Connect to WebSocket server
857 log_debug("Initiating WebSocket connection to %s:%d%s", host, port, path);
858 struct lws_client_connect_info connect_info;
859 memset(&connect_info, 0, sizeof(connect_info));
860 connect_info.context = ws_data->context;
861 connect_info.address = host;
862 connect_info.port = port;
863 connect_info.path = path;
864 connect_info.host = host;
865 connect_info.origin = host;
866 connect_info.protocol = "acip";
867 connect_info.ssl_connection = use_ssl ? LCCSCF_USE_SSL : 0;
868 connect_info.userdata = ws_data;
869
870 log_debug("Calling lws_client_connect_via_info...");
871 ws_data->wsi = lws_client_connect_via_info(&connect_info);
872 log_debug("lws_client_connect_via_info returned: %p", (void *)ws_data->wsi);
873 if (!ws_data->wsi) {
874 lws_context_destroy(ws_data->context);
875 SAFE_FREE(ws_data->send_buffer);
876 mutex_destroy(&ws_data->state_mutex);
877 cond_destroy(&ws_data->recv_cond);
878 mutex_destroy(&ws_data->recv_mutex);
879 ringbuffer_destroy(ws_data->recv_queue);
880 SAFE_FREE(ws_data);
881 SAFE_FREE(transport);
882 SET_ERRNO(ERROR_NETWORK, "Failed to connect to WebSocket server");
883 return NULL;
884 }
885
886 ws_data->is_connected = false; // Will be set to true in LWS_CALLBACK_CLIENT_ESTABLISHED
887 ws_data->owns_context = true; // Client transport owns the context
888
889 // Initialize transport
890 transport->methods = &websocket_methods;
891 transport->crypto_ctx = crypto_ctx;
892 transport->impl_data = ws_data;
893
894 // Wait for connection to establish (synchronous connection)
895 // Service the libwebsockets event loop until connected or timeout
896 log_debug("Waiting for WebSocket connection to establish...");
897 int timeout_ms = 5000; // 5 second timeout
898 int elapsed_ms = 0;
899 while (!ws_data->is_connected && elapsed_ms < timeout_ms) {
900 // Service libwebsockets (processes network events, triggers callbacks)
901 int result = lws_service(ws_data->context, 50); // 50ms timeout per iteration
902 if (result < 0) {
903 log_error("lws_service error during connection: %d", result);
904 lws_context_destroy(ws_data->context);
905 SAFE_FREE(ws_data->send_buffer);
906 mutex_destroy(&ws_data->state_mutex);
907 cond_destroy(&ws_data->recv_cond);
908 mutex_destroy(&ws_data->recv_mutex);
909 ringbuffer_destroy(ws_data->recv_queue);
910 SAFE_FREE(ws_data);
911 SAFE_FREE(transport);
912 SET_ERRNO(ERROR_NETWORK, "WebSocket connection failed");
913 return NULL;
914 }
915 elapsed_ms += 50;
916 }
917
918 if (!ws_data->is_connected) {
919 log_error("WebSocket connection timeout after %d ms", elapsed_ms);
920 lws_context_destroy(ws_data->context);
921 SAFE_FREE(ws_data->send_buffer);
922 mutex_destroy(&ws_data->state_mutex);
923 cond_destroy(&ws_data->recv_cond);
924 mutex_destroy(&ws_data->recv_mutex);
925 ringbuffer_destroy(ws_data->recv_queue);
926 SAFE_FREE(ws_data);
927 SAFE_FREE(transport);
928 SET_ERRNO(ERROR_NETWORK, "WebSocket connection timeout");
929 return NULL;
930 }
931
932 log_info("WebSocket connection established (crypto: %s)", crypto_ctx ? "enabled" : "disabled");
933
934 // Start service thread to process incoming messages
935 ws_data->service_running = true;
936 if (asciichat_thread_create(&ws_data->service_thread, websocket_service_thread, ws_data) != 0) {
937 log_error("Failed to create WebSocket service thread");
938 ws_data->service_running = false;
939 lws_context_destroy(ws_data->context);
940 SAFE_FREE(ws_data->send_buffer);
941 mutex_destroy(&ws_data->state_mutex);
942 cond_destroy(&ws_data->recv_cond);
943 mutex_destroy(&ws_data->recv_mutex);
944 ringbuffer_destroy(ws_data->recv_queue);
945 SAFE_FREE(ws_data);
946 SAFE_FREE(transport);
947 SET_ERRNO(ERROR_INTERNAL, "Failed to create service thread");
948 return NULL;
949 }
950
951 log_debug("WebSocket service thread started for client transport");
952
953 return transport;
954}
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
Definition ringbuffer.c:28
void ringbuffer_destroy(ringbuffer_t *rb)
Definition ringbuffer.c:54
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int asciichat_thread_create(asciichat_thread_t *thread, void *(*start_routine)(void *), void *arg)
Definition threading.c:42
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21
#define WEBSOCKET_RECV_QUEUE_SIZE
Maximum receive queue size (messages buffered before recv())

References asciichat_thread_create(), mutex_destroy(), mutex_init(), ringbuffer_create(), ringbuffer_destroy(), and WEBSOCKET_RECV_QUEUE_SIZE.

Referenced by server_connection_establish(), and websocket_client_connect().

◆ acip_websocket_server_transport_create()

acip_transport_t * acip_websocket_server_transport_create ( struct lws *  wsi,
crypto_context_t *  crypto_ctx 
)

Create WebSocket server transport from existing connection.

Wraps an already-established libwebsockets connection (from server accept). Used by websocket_server module to create transports for incoming clients.

Parameters
wsi — Established libwebsockets connection (not owned by transport)
crypto_ctx — Optional crypto context (can be NULL)
Returns
Transport instance or NULL on error

Definition at line 966 of file websocket/transport.c.

966 {
967 if (!wsi) {
968 SET_ERRNO(ERROR_INVALID_PARAM, "Invalid wsi parameter");
969 return NULL;
970 }
971
972 // Allocate transport structure
973 acip_transport_t *transport = SAFE_CALLOC(1, sizeof(acip_transport_t), acip_transport_t *);
974 if (!transport) {
975 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport");
976 return NULL;
977 }
978
979 // Allocate transport-specific data
980 websocket_transport_data_t *ws_data =
981 SAFE_CALLOC(1, sizeof(websocket_transport_data_t), websocket_transport_data_t *);
982 if (!ws_data) {
983 SAFE_FREE(transport);
984 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebSocket transport data");
985 return NULL;
986 }
987
988 // Initialize receive queue
989 ws_data->recv_queue = ringbuffer_create(sizeof(websocket_recv_msg_t), WEBSOCKET_RECV_QUEUE_SIZE);
990 if (!ws_data->recv_queue) {
991 SAFE_FREE(ws_data);
992 SAFE_FREE(transport);
993 SET_ERRNO(ERROR_MEMORY, "Failed to create receive queue");
994 return NULL;
995 }
996
997 // Initialize send queue (for server-side transports)
998 ws_data->send_queue = ringbuffer_create(sizeof(websocket_recv_msg_t), WEBSOCKET_SEND_QUEUE_SIZE);
999 if (!ws_data->send_queue) {
1000 ringbuffer_destroy(ws_data->recv_queue);
1001 SAFE_FREE(ws_data);
1002 SAFE_FREE(transport);
1003 SET_ERRNO(ERROR_MEMORY, "Failed to create send queue");
1004 return NULL;
1005 }
1006
1007 // Initialize synchronization primitives
1008 if (mutex_init(&ws_data->recv_mutex) != 0) {
1009 ringbuffer_destroy(ws_data->recv_queue);
1010 ringbuffer_destroy(ws_data->send_queue);
1011 SAFE_FREE(ws_data);
1012 SAFE_FREE(transport);
1013 SET_ERRNO(ERROR_NETWORK, "Failed to initialize recv mutex");
1014 return NULL;
1015 }
1016
1017 if (cond_init(&ws_data->recv_cond) != 0) {
1018 mutex_destroy(&ws_data->recv_mutex);
1019 ringbuffer_destroy(ws_data->recv_queue);
1020 ringbuffer_destroy(ws_data->send_queue);
1021 SAFE_FREE(ws_data);
1022 SAFE_FREE(transport);
1023 SET_ERRNO(ERROR_NETWORK, "Failed to initialize recv condition variable");
1024 return NULL;
1025 }
1026
1027 if (mutex_init(&ws_data->send_mutex) != 0) {
1028 cond_destroy(&ws_data->recv_cond);
1029 mutex_destroy(&ws_data->recv_mutex);
1030 ringbuffer_destroy(ws_data->recv_queue);
1031 ringbuffer_destroy(ws_data->send_queue);
1032 SAFE_FREE(ws_data);
1033 SAFE_FREE(transport);
1034 SET_ERRNO(ERROR_NETWORK, "Failed to initialize send mutex");
1035 return NULL;
1036 }
1037
1038 if (mutex_init(&ws_data->state_mutex) != 0) {
1039 mutex_destroy(&ws_data->send_mutex);
1040 cond_destroy(&ws_data->recv_cond);
1041 mutex_destroy(&ws_data->recv_mutex);
1042 ringbuffer_destroy(ws_data->recv_queue);
1043 ringbuffer_destroy(ws_data->send_queue);
1044 SAFE_FREE(ws_data);
1045 SAFE_FREE(transport);
1046 SET_ERRNO(ERROR_NETWORK, "Failed to initialize state mutex");
1047 return NULL;
1048 }
1049
1050 // Allocate send buffer with LWS_PRE padding
1051 size_t initial_capacity = 4096 + LWS_PRE;
1052 ws_data->send_buffer = SAFE_MALLOC(initial_capacity, uint8_t *);
1053 if (!ws_data->send_buffer) {
1054 mutex_destroy(&ws_data->state_mutex);
1055 mutex_destroy(&ws_data->send_mutex);
1056 cond_destroy(&ws_data->recv_cond);
1057 mutex_destroy(&ws_data->recv_mutex);
1058 ringbuffer_destroy(ws_data->recv_queue);
1059 ringbuffer_destroy(ws_data->send_queue);
1060 SAFE_FREE(ws_data);
1061 SAFE_FREE(transport);
1062 SET_ERRNO(ERROR_MEMORY, "Failed to allocate send buffer");
1063 return NULL;
1064 }
1065 ws_data->send_buffer_capacity = initial_capacity;
1066
1067 // Store connection info (server-side: no context ownership, connection already established)
1068 ws_data->wsi = wsi;
1069 ws_data->context = lws_get_context(wsi); // Get context from wsi (not owned)
1070 ws_data->owns_context = false; // Server owns context, not transport
1071 ws_data->is_connected = true; // Already connected (server-side)
1072 log_debug("Server transport created: is_connected=true, wsi=%p", (void *)wsi);
1073
1074 // Initialize transport
1075 transport->methods = &websocket_methods;
1076 transport->crypto_ctx = crypto_ctx;
1077 transport->impl_data = ws_data;
1078
1079 log_info("Created WebSocket server transport (crypto: %s)", crypto_ctx ? "enabled" : "disabled");
1080
1081 return transport;
1082}
#define WEBSOCKET_SEND_QUEUE_SIZE
Maximum send queue size (messages buffered for server-side sending)

References mutex_destroy(), mutex_init(), ringbuffer_create(), ringbuffer_destroy(), WEBSOCKET_RECV_QUEUE_SIZE, and WEBSOCKET_SEND_QUEUE_SIZE.