ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
webrtc/transport.c
Go to the documentation of this file.
1
31#include <ascii-chat/network/acip/transport.h>
32#include <ascii-chat/network/webrtc/webrtc.h>
33#include <ascii-chat/log/logging.h>
34#include <ascii-chat/ringbuffer.h>
35#include <ascii-chat/buffer_pool.h>
36#include <ascii-chat/platform/mutex.h>
37#include <ascii-chat/platform/cond.h>
38#include <string.h>
39
47#define WEBRTC_RECV_QUEUE_SIZE 512
48
52typedef struct {
53 uint8_t *data;
54 size_t len;
56
60typedef struct {
61 webrtc_peer_connection_t *peer_conn;
62 webrtc_data_channel_t *data_channel;
63 ringbuffer_t *recv_queue;
64 mutex_t queue_mutex;
65 cond_t queue_cond;
67 mutex_t state_mutex;
69
70// =============================================================================
71// DataChannel Callbacks
72// =============================================================================
73
77static void webrtc_on_message(webrtc_data_channel_t *channel, const uint8_t *data, size_t len, void *user_data) {
78 (void)channel; // Unused
80
81 if (!wrtc || !data || len == 0) {
82 return;
83 }
84
85 // Allocate message buffer using buffer pool (will be freed by acip_client_receive_and_dispatch)
87 msg.data = buffer_pool_alloc(NULL, len);
88 if (!msg.data) {
89 return;
90 }
91
92 // Copy data
93 memcpy(msg.data, data, len);
94 msg.len = len;
95
96 // Push to receive queue
97 mutex_lock(&wrtc->queue_mutex);
98
99 bool success = ringbuffer_write(wrtc->recv_queue, &msg);
100 if (!success) {
101 // Queue full - drop oldest message to make room
102 webrtc_recv_msg_t dropped_msg;
103 if (ringbuffer_read(wrtc->recv_queue, &dropped_msg)) {
104 buffer_pool_free(NULL, dropped_msg.data, dropped_msg.len);
105 }
106
107 // Try again
108 success = ringbuffer_write(wrtc->recv_queue, &msg);
109 if (!success) {
110 buffer_pool_free(NULL, msg.data, len);
111 mutex_unlock(&wrtc->queue_mutex);
112 return;
113 }
114 }
115
116 // Signal waiting recv() call
117 cond_signal(&wrtc->queue_cond);
118 mutex_unlock(&wrtc->queue_mutex);
119}
120
124static void webrtc_on_open(webrtc_data_channel_t *channel, void *user_data) {
125 (void)channel;
127
128 if (!wrtc) {
129 return;
130 }
131
132 mutex_lock(&wrtc->state_mutex);
133 wrtc->is_connected = true;
134 mutex_unlock(&wrtc->state_mutex);
135
136 log_info("WebRTC DataChannel opened, transport ready");
137}
138
142static void webrtc_on_error(webrtc_data_channel_t *channel, const char *error_msg, void *user_data) {
143 (void)channel;
145
146 log_error("WebRTC DataChannel error: %s", error_msg ? error_msg : "unknown error");
147
148 if (!wrtc) {
149 return;
150 }
151
152 mutex_lock(&wrtc->state_mutex);
153 wrtc->is_connected = false;
154 mutex_unlock(&wrtc->state_mutex);
155
156 // Wake any blocking recv() calls
157 cond_broadcast(&wrtc->queue_cond);
158}
159
163static void webrtc_on_close(webrtc_data_channel_t *channel, void *user_data) {
164 (void)channel;
166
167 log_info("WebRTC DataChannel closed");
168
169 if (!wrtc) {
170 return;
171 }
172
173 mutex_lock(&wrtc->state_mutex);
174 wrtc->is_connected = false;
175 mutex_unlock(&wrtc->state_mutex);
176
177 // Wake any blocking recv() calls
178 cond_broadcast(&wrtc->queue_cond);
179}
180
181// =============================================================================
182// WebRTC Transport Methods
183// =============================================================================
184
185static asciichat_error_t webrtc_send(acip_transport_t *transport, const void *data, size_t len) {
186 webrtc_transport_data_t *wrtc = (webrtc_transport_data_t *)transport->impl_data;
187
188 mutex_lock(&wrtc->state_mutex);
189 bool connected = wrtc->is_connected;
190 mutex_unlock(&wrtc->state_mutex);
191
192 if (!connected) {
193 return SET_ERRNO(ERROR_NETWORK, "WebRTC transport not connected");
194 }
195
196 // Send via DataChannel
197 asciichat_error_t result = webrtc_datachannel_send(wrtc->data_channel, data, len);
198
199 if (result != ASCIICHAT_OK) {
200 return SET_ERRNO(ERROR_NETWORK, "Failed to send on WebRTC DataChannel");
201 }
202
203 return ASCIICHAT_OK;
204}
205
206static asciichat_error_t webrtc_recv(acip_transport_t *transport, void **buffer, size_t *out_len,
207 void **out_allocated_buffer) {
208 webrtc_transport_data_t *wrtc = (webrtc_transport_data_t *)transport->impl_data;
209
210 mutex_lock(&wrtc->queue_mutex);
211
212 // Block until message arrives or connection closes
213 while (ringbuffer_is_empty(wrtc->recv_queue)) {
214 mutex_lock(&wrtc->state_mutex);
215 bool connected = wrtc->is_connected;
216 mutex_unlock(&wrtc->state_mutex);
217
218 if (!connected) {
219 mutex_unlock(&wrtc->queue_mutex);
220 return SET_ERRNO(ERROR_NETWORK, "Connection closed while waiting for data");
221 }
222
223 // Wait for message arrival or connection close
224 cond_wait(&wrtc->queue_cond, &wrtc->queue_mutex);
225 }
226
227 // Read message from queue
229 bool success = ringbuffer_read(wrtc->recv_queue, &msg);
230 mutex_unlock(&wrtc->queue_mutex);
231
232 if (!success) {
233 return SET_ERRNO(ERROR_NETWORK, "Failed to read from receive queue");
234 }
235
236 // Return message to caller (caller owns the buffer)
237 *buffer = msg.data;
238 *out_len = msg.len;
239 *out_allocated_buffer = msg.data;
240
241 return ASCIICHAT_OK;
242}
243
244static asciichat_error_t webrtc_close(acip_transport_t *transport) {
245 webrtc_transport_data_t *wrtc = (webrtc_transport_data_t *)transport->impl_data;
246
247 mutex_lock(&wrtc->state_mutex);
248
249 if (!wrtc->is_connected) {
250 mutex_unlock(&wrtc->state_mutex);
251 return ASCIICHAT_OK; // Already closed
252 }
253
254 wrtc->is_connected = false;
255 mutex_unlock(&wrtc->state_mutex);
256
257 // Close DataChannel
258 if (wrtc->data_channel) {
259 webrtc_datachannel_close(wrtc->data_channel);
260 }
261
262 // Close peer connection
263 if (wrtc->peer_conn) {
265 }
266
267 // Wake any blocking recv() calls
268 cond_broadcast(&wrtc->queue_cond);
269
270 log_debug("WebRTC transport closed");
271 return ASCIICHAT_OK;
272}
273
274static acip_transport_type_t webrtc_get_type(acip_transport_t *transport) {
275 (void)transport;
276 return ACIP_TRANSPORT_WEBRTC;
277}
278
279static socket_t webrtc_get_socket(acip_transport_t *transport) {
280 (void)transport;
281 return INVALID_SOCKET_VALUE; // WebRTC has no underlying socket
282}
283
284static bool webrtc_is_connected(acip_transport_t *transport) {
285 webrtc_transport_data_t *wrtc = (webrtc_transport_data_t *)transport->impl_data;
286
287 mutex_lock(&wrtc->state_mutex);
288 bool connected = wrtc->is_connected;
289 mutex_unlock(&wrtc->state_mutex);
290
291 return connected;
292}
293
294// =============================================================================
295// WebRTC Transport Destroy Implementation
296// =============================================================================
297
307static void webrtc_destroy_impl(acip_transport_t *transport) {
308 if (!transport || !transport->impl_data) {
309 return;
310 }
311
312 webrtc_transport_data_t *wrtc = (webrtc_transport_data_t *)transport->impl_data;
313
314 // Destroy peer connection and data channel
315 if (wrtc->data_channel) {
317 wrtc->data_channel = NULL;
318 }
319
320 if (wrtc->peer_conn) {
322 wrtc->peer_conn = NULL;
323 }
324
325 // Clear receive queue and free buffered messages
326 if (wrtc->recv_queue) {
327 mutex_lock(&wrtc->queue_mutex);
328
330 while (ringbuffer_read(wrtc->recv_queue, &msg)) {
331 if (msg.data) {
332 SAFE_FREE(msg.data);
333 }
334 }
335
336 mutex_unlock(&wrtc->queue_mutex);
338 wrtc->recv_queue = NULL;
339 }
340
341 // Destroy synchronization primitives
343 cond_destroy(&wrtc->queue_cond);
345
346 log_debug("Destroyed WebRTC transport resources");
347}
348
349// =============================================================================
350// WebRTC Transport Method Table
351// =============================================================================
352
353static const acip_transport_methods_t webrtc_methods = {
354 .send = webrtc_send,
355 .recv = webrtc_recv,
356 .close = webrtc_close,
357 .get_type = webrtc_get_type,
358 .get_socket = webrtc_get_socket,
359 .is_connected = webrtc_is_connected,
360 .destroy_impl = webrtc_destroy_impl,
361};
362
363// =============================================================================
364// WebRTC Transport Creation
365// =============================================================================
366
367acip_transport_t *acip_webrtc_transport_create(webrtc_peer_connection_t *peer_conn, webrtc_data_channel_t *data_channel,
368 crypto_context_t *crypto_ctx) {
369 if (!peer_conn || !data_channel) {
370 SET_ERRNO(ERROR_INVALID_PARAM, "peer_conn and data_channel are required");
371 return NULL;
372 }
373
374 // Allocate transport structure
375 acip_transport_t *transport = SAFE_MALLOC(sizeof(acip_transport_t), acip_transport_t *);
376 if (!transport) {
377 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebRTC transport");
378 return NULL;
379 }
380
381 // Allocate WebRTC-specific data
383 if (!wrtc_data) {
384 SAFE_FREE(transport);
385 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebRTC transport data");
386 return NULL;
387 }
388
389 // Create receive queue
391 if (!wrtc_data->recv_queue) {
392 SAFE_FREE(wrtc_data);
393 SAFE_FREE(transport);
394 SET_ERRNO(ERROR_MEMORY, "Failed to create receive queue");
395 return NULL;
396 }
397
398 // Initialize synchronization primitives
399 if (mutex_init(&wrtc_data->queue_mutex) != 0) {
400 ringbuffer_destroy(wrtc_data->recv_queue);
401 SAFE_FREE(wrtc_data);
402 SAFE_FREE(transport);
403 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize queue mutex");
404 return NULL;
405 }
406
407 if (cond_init(&wrtc_data->queue_cond) != 0) {
408 mutex_destroy(&wrtc_data->queue_mutex);
409 ringbuffer_destroy(wrtc_data->recv_queue);
410 SAFE_FREE(wrtc_data);
411 SAFE_FREE(transport);
412 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize queue condition variable");
413 return NULL;
414 }
415
416 if (mutex_init(&wrtc_data->state_mutex) != 0) {
417 cond_destroy(&wrtc_data->queue_cond);
418 mutex_destroy(&wrtc_data->queue_mutex);
419 ringbuffer_destroy(wrtc_data->recv_queue);
420 SAFE_FREE(wrtc_data);
421 SAFE_FREE(transport);
422 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize state mutex");
423 return NULL;
424 }
425
426 // Initialize WebRTC data
427 wrtc_data->peer_conn = peer_conn;
428 wrtc_data->data_channel = data_channel;
429 wrtc_data->is_connected = false; // Will be set to true in on_open callback
430
431 // Register DataChannel callbacks
432 webrtc_datachannel_callbacks_t callbacks = {
433 .on_open = webrtc_on_open,
434 .on_close = webrtc_on_close,
435 .on_error = webrtc_on_error,
436 .on_message = webrtc_on_message,
437 .user_data = wrtc_data,
438 };
439
440 asciichat_error_t result = webrtc_datachannel_set_callbacks(data_channel, &callbacks);
441 if (result != ASCIICHAT_OK) {
442 mutex_destroy(&wrtc_data->state_mutex);
443 cond_destroy(&wrtc_data->queue_cond);
444 mutex_destroy(&wrtc_data->queue_mutex);
445 ringbuffer_destroy(wrtc_data->recv_queue);
446 SAFE_FREE(wrtc_data);
447 SAFE_FREE(transport);
448 SET_ERRNO(ERROR_INTERNAL, "Failed to set DataChannel callbacks");
449 return NULL;
450 }
451
452 // IMPORTANT: The transport is always created from peer_manager's on_datachannel_open callback,
453 // which means the DataChannel is ALREADY OPEN when we get here. However, by setting our own
454 // callbacks above (webrtc_datachannel_set_callbacks), we replaced the callbacks that would
455 // have set dc->is_open=true. So we need to manually mark both the transport AND the DataChannel
456 // as open/connected now.
457 //
458 // We cannot rely on webrtc_on_open being called later because:
459 // 1. The DataChannel is already open
460 // 2. libdatachannel won't fire the open event again
461 // 3. Setting callbacks after open doesn't trigger a retroactive open event
462
463 // Mark DataChannel as open (needed for webrtc_datachannel_send() check)
464 webrtc_datachannel_set_open_state(data_channel, true);
465
466 // Mark transport as connected
467 mutex_lock(&wrtc_data->state_mutex);
468 wrtc_data->is_connected = true;
469 mutex_unlock(&wrtc_data->state_mutex);
470 log_debug("Transport and DataChannel marked as connected/open (already open from peer_manager callback)");
471
472 // Initialize transport
473 transport->methods = &webrtc_methods;
474 transport->crypto_ctx = crypto_ctx;
475 transport->impl_data = wrtc_data;
476
477 log_info("Created WebRTC transport (crypto: %s)", crypto_ctx ? "enabled" : "disabled");
478
479 return transport;
480}
void buffer_pool_free(buffer_pool_t *pool, void *data, size_t size)
void * buffer_pool_alloc(buffer_pool_t *pool, size_t size)
Definition buffer_pool.c:99
int socket_t
asciichat_error_t webrtc_datachannel_set_callbacks(webrtc_data_channel_t *dc, const webrtc_datachannel_callbacks_t *callbacks)
void webrtc_datachannel_destroy(webrtc_data_channel_t *dc)
void webrtc_peer_connection_close(webrtc_peer_connection_t *pc)
void webrtc_peer_connection_destroy(webrtc_peer_connection_t *pc)
asciichat_error_t webrtc_datachannel_send(webrtc_data_channel_t *dc, const uint8_t *data, size_t size)
void webrtc_datachannel_set_open_state(webrtc_data_channel_t *dc, bool is_open)
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
Definition ringbuffer.c:28
void ringbuffer_destroy(ringbuffer_t *rb)
Definition ringbuffer.c:54
bool ringbuffer_is_empty(const ringbuffer_t *rb)
Definition ringbuffer.c:126
bool ringbuffer_read(ringbuffer_t *rb, void *data)
Definition ringbuffer.c:83
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
Definition ringbuffer.c:61
Receive queue element (variable-length message)
size_t len
Message length in bytes.
uint8_t * data
Message data (allocated, caller must free)
WebRTC transport implementation data.
webrtc_peer_connection_t * peer_conn
Peer connection (owned)
bool is_connected
Connection state.
mutex_t queue_mutex
Protect queue operations.
webrtc_data_channel_t * data_channel
Data channel (owned)
mutex_t state_mutex
Protect state changes.
cond_t queue_cond
Signal when messages arrive.
ringbuffer_t * recv_queue
Receive message queue.
int mutex_init(mutex_t *mutex)
Definition threading.c:16
int mutex_destroy(mutex_t *mutex)
Definition threading.c:21
#define WEBRTC_RECV_QUEUE_SIZE
Maximum receive queue size (messages buffered before recv())
acip_transport_t * acip_webrtc_transport_create(webrtc_peer_connection_t *peer_conn, webrtc_data_channel_t *data_channel, crypto_context_t *crypto_ctx)