ascii-chat 0.6.0
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
webrtc/transport.c
Go to the documentation of this file.
1
33#include "log/logging.h"
34#include "ringbuffer.h"
35#include "platform/mutex.h"
36#include "platform/cond.h"
37#include <string.h>
38
45#define WEBRTC_RECV_QUEUE_SIZE 64
46
50typedef struct {
52 size_t len;
54
67
68// =============================================================================
69// DataChannel Callbacks
70// =============================================================================
71
75static void webrtc_on_message(webrtc_data_channel_t *channel, const uint8_t *data, size_t len, void *user_data) {
76 (void)channel; // Unused
78
79 if (!wrtc || !data || len == 0) {
80 log_error("Invalid DataChannel message callback parameters");
81 return;
82 }
83
84 // Allocate message structure
86 msg.data = SAFE_MALLOC(len, uint8_t *);
87 if (!msg.data) {
88 log_error("Failed to allocate receive message buffer (%zu bytes)", len);
89 return;
90 }
91
92 memcpy(msg.data, data, len);
93 msg.len = len;
94
95 // Push to receive queue (thread-safe)
96 mutex_lock(&wrtc->queue_mutex);
97
98 bool success = ringbuffer_write(wrtc->recv_queue, &msg);
99 if (!success) {
100 // Queue full - drop oldest message to make room
101 webrtc_recv_msg_t dropped_msg;
102 if (ringbuffer_read(wrtc->recv_queue, &dropped_msg)) {
103 log_warn("Receive queue full, dropping oldest message (%zu bytes)", dropped_msg.len);
104 SAFE_FREE(dropped_msg.data);
105 }
106
107 // Try again
108 success = ringbuffer_write(wrtc->recv_queue, &msg);
109 if (!success) {
110 log_error("Failed to write to receive queue after drop");
111 SAFE_FREE(msg.data);
113 return;
114 }
115 }
116
117 // Signal waiting recv() call
118 cond_signal(&wrtc->queue_cond);
120
121 log_debug_every(1000000, "WebRTC message received (%zu bytes), queue size: %zu", len,
123}
124
128static void webrtc_on_open(webrtc_data_channel_t *channel, void *user_data) {
129 (void)channel;
131
132 if (!wrtc) {
133 return;
134 }
135
136 mutex_lock(&wrtc->state_mutex);
137 wrtc->is_connected = true;
139
140 log_info("WebRTC DataChannel opened, transport ready");
141}
142
146static void webrtc_on_error(webrtc_data_channel_t *channel, const char *error_msg, void *user_data) {
147 (void)channel;
149
150 log_error("WebRTC DataChannel error: %s", error_msg ? error_msg : "unknown error");
151
152 if (!wrtc) {
153 return;
154 }
155
156 mutex_lock(&wrtc->state_mutex);
157 wrtc->is_connected = false;
159
160 // Wake any blocking recv() calls
162}
163
167static void webrtc_on_close(webrtc_data_channel_t *channel, void *user_data) {
168 (void)channel;
170
171 log_info("WebRTC DataChannel closed");
172
173 if (!wrtc) {
174 return;
175 }
176
177 mutex_lock(&wrtc->state_mutex);
178 wrtc->is_connected = false;
180
181 // Wake any blocking recv() calls
183}
184
185// =============================================================================
186// WebRTC Transport Methods
187// =============================================================================
188
189static asciichat_error_t webrtc_send(acip_transport_t *transport, const void *data, size_t len) {
191
192 mutex_lock(&wrtc->state_mutex);
193 bool connected = wrtc->is_connected;
195
196 if (!connected) {
197 return SET_ERRNO(ERROR_NETWORK, "WebRTC transport not connected");
198 }
199
200 // Send via DataChannel
201 asciichat_error_t result = webrtc_datachannel_send(wrtc->data_channel, data, len);
202 if (result != ASCIICHAT_OK) {
203 return SET_ERRNO(ERROR_NETWORK, "Failed to send on WebRTC DataChannel");
204 }
205
206 log_debug_every(1000000, "WebRTC sent %zu bytes", len);
207 return ASCIICHAT_OK;
208}
209
210static asciichat_error_t webrtc_recv(acip_transport_t *transport, void **buffer, size_t *out_len,
211 void **out_allocated_buffer) {
213
214 mutex_lock(&wrtc->queue_mutex);
215
216 // Block until message arrives or connection closes
217 while (ringbuffer_is_empty(wrtc->recv_queue)) {
218 mutex_lock(&wrtc->state_mutex);
219 bool connected = wrtc->is_connected;
221
222 if (!connected) {
224 return SET_ERRNO(ERROR_NETWORK, "Connection closed while waiting for data");
225 }
226
227 // Wait for message arrival or connection close
228 cond_wait(&wrtc->queue_cond, &wrtc->queue_mutex);
229 }
230
231 // Read message from queue
233 bool success = ringbuffer_read(wrtc->recv_queue, &msg);
235
236 if (!success) {
237 return SET_ERRNO(ERROR_NETWORK, "Failed to read from receive queue");
238 }
239
240 // Return message to caller (caller owns the buffer)
241 *buffer = msg.data;
242 *out_len = msg.len;
243 *out_allocated_buffer = msg.data;
244
245 log_debug_every(1000000, "WebRTC received %zu bytes, queue remaining: %zu", msg.len,
247
248 return ASCIICHAT_OK;
249}
250
251static asciichat_error_t webrtc_close(acip_transport_t *transport) {
253
254 mutex_lock(&wrtc->state_mutex);
255
256 if (!wrtc->is_connected) {
258 return ASCIICHAT_OK; // Already closed
259 }
260
261 wrtc->is_connected = false;
263
264 // Close DataChannel
265 if (wrtc->data_channel) {
266 webrtc_datachannel_close(wrtc->data_channel);
267 }
268
269 // Close peer connection
270 if (wrtc->peer_conn) {
272 }
273
274 // Wake any blocking recv() calls
276
277 log_debug("WebRTC transport closed");
278 return ASCIICHAT_OK;
279}
280
281static acip_transport_type_t webrtc_get_type(acip_transport_t *transport) {
282 (void)transport;
284}
285
286static socket_t webrtc_get_socket(acip_transport_t *transport) {
287 (void)transport;
288 return INVALID_SOCKET_VALUE; // WebRTC has no underlying socket
289}
290
291static bool webrtc_is_connected(acip_transport_t *transport) {
293
294 mutex_lock(&wrtc->state_mutex);
295 bool connected = wrtc->is_connected;
297
298 return connected;
299}
300
301// =============================================================================
302// WebRTC Transport Destroy Implementation
303// =============================================================================
304
314static void webrtc_destroy_impl(acip_transport_t *transport) {
315 if (!transport || !transport->impl_data) {
316 return;
317 }
318
320
321 // Destroy peer connection and data channel
322 if (wrtc->data_channel) {
324 wrtc->data_channel = NULL;
325 }
326
327 if (wrtc->peer_conn) {
329 wrtc->peer_conn = NULL;
330 }
331
332 // Clear receive queue and free buffered messages
333 if (wrtc->recv_queue) {
334 mutex_lock(&wrtc->queue_mutex);
335
337 while (ringbuffer_read(wrtc->recv_queue, &msg)) {
338 if (msg.data) {
339 SAFE_FREE(msg.data);
340 }
341 }
342
345 wrtc->recv_queue = NULL;
346 }
347
348 // Destroy synchronization primitives
350 cond_destroy(&wrtc->queue_cond);
352
353 log_debug("Destroyed WebRTC transport resources");
354}
355
356// =============================================================================
357// WebRTC Transport Method Table
358// =============================================================================
359
360static const acip_transport_methods_t webrtc_methods = {
361 .send = webrtc_send,
362 .recv = webrtc_recv,
363 .close = webrtc_close,
364 .get_type = webrtc_get_type,
365 .get_socket = webrtc_get_socket,
366 .is_connected = webrtc_is_connected,
367 .destroy_impl = webrtc_destroy_impl,
368};
369
370// =============================================================================
371// WebRTC Transport Creation
372// =============================================================================
373
375 crypto_context_t *crypto_ctx) {
376 if (!peer_conn || !data_channel) {
377 SET_ERRNO(ERROR_INVALID_PARAM, "peer_conn and data_channel are required");
378 return NULL;
379 }
380
381 // Allocate transport structure
383 if (!transport) {
384 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebRTC transport");
385 return NULL;
386 }
387
388 // Allocate WebRTC-specific data
390 if (!wrtc_data) {
391 SAFE_FREE(transport);
392 SET_ERRNO(ERROR_MEMORY, "Failed to allocate WebRTC transport data");
393 return NULL;
394 }
395
396 // Create receive queue
398 if (!wrtc_data->recv_queue) {
399 SAFE_FREE(wrtc_data);
400 SAFE_FREE(transport);
401 SET_ERRNO(ERROR_MEMORY, "Failed to create receive queue");
402 return NULL;
403 }
404
405 // Initialize synchronization primitives
406 if (mutex_init(&wrtc_data->queue_mutex) != 0) {
407 ringbuffer_destroy(wrtc_data->recv_queue);
408 SAFE_FREE(wrtc_data);
409 SAFE_FREE(transport);
410 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize queue mutex");
411 return NULL;
412 }
413
414 if (cond_init(&wrtc_data->queue_cond) != 0) {
415 mutex_destroy(&wrtc_data->queue_mutex);
416 ringbuffer_destroy(wrtc_data->recv_queue);
417 SAFE_FREE(wrtc_data);
418 SAFE_FREE(transport);
419 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize queue condition variable");
420 return NULL;
421 }
422
423 if (mutex_init(&wrtc_data->state_mutex) != 0) {
424 cond_destroy(&wrtc_data->queue_cond);
425 mutex_destroy(&wrtc_data->queue_mutex);
426 ringbuffer_destroy(wrtc_data->recv_queue);
427 SAFE_FREE(wrtc_data);
428 SAFE_FREE(transport);
429 SET_ERRNO(ERROR_INTERNAL, "Failed to initialize state mutex");
430 return NULL;
431 }
432
433 // Initialize WebRTC data
434 wrtc_data->peer_conn = peer_conn;
435 wrtc_data->data_channel = data_channel;
436 wrtc_data->is_connected = false; // Will be set to true in on_open callback
437
438 // Register DataChannel callbacks
440 .on_open = webrtc_on_open,
441 .on_close = webrtc_on_close,
442 .on_error = webrtc_on_error,
443 .on_message = webrtc_on_message,
444 .user_data = wrtc_data,
445 };
446
447 asciichat_error_t result = webrtc_datachannel_set_callbacks(data_channel, &callbacks);
448 if (result != ASCIICHAT_OK) {
449 mutex_destroy(&wrtc_data->state_mutex);
450 cond_destroy(&wrtc_data->queue_cond);
451 mutex_destroy(&wrtc_data->queue_mutex);
452 ringbuffer_destroy(wrtc_data->recv_queue);
453 SAFE_FREE(wrtc_data);
454 SAFE_FREE(transport);
455 SET_ERRNO(ERROR_INTERNAL, "Failed to set DataChannel callbacks");
456 return NULL;
457 }
458
459 // Initialize transport
460 transport->methods = &webrtc_methods;
461 transport->crypto_ctx = crypto_ctx;
462 transport->impl_data = wrtc_data;
463
464 log_info("Created WebRTC transport (crypto: %s)", crypto_ctx ? "enabled" : "disabled");
465
466 return transport;
467}
Cross-platform condition variable interface for ascii-chat.
#define SAFE_FREE(ptr)
Definition common.h:320
#define SAFE_MALLOC(size, cast)
Definition common.h:208
unsigned char uint8_t
Definition common.h:56
#define SET_ERRNO(code, context_msg,...)
Set error code with custom context message and log it.
asciichat_error_t
Error and exit codes - unified status values (0-255)
Definition error_codes.h:46
@ ERROR_NETWORK
Definition error_codes.h:69
@ ERROR_MEMORY
Definition error_codes.h:53
@ ASCIICHAT_OK
Definition error_codes.h:48
@ ERROR_INVALID_PARAM
@ ERROR_INTERNAL
Definition error_codes.h:84
#define log_warn(...)
Log a WARN message.
#define log_error(...)
Log an ERROR message.
#define log_debug_every(interval_us, fmt,...)
Rate-limited DEBUG logging.
#define log_info(...)
Log an INFO message.
#define log_debug(...)
Log a DEBUG message.
int mutex_init(mutex_t *mutex)
Initialize a mutex.
int cond_init(cond_t *cond)
Initialize a condition variable.
int socket_t
Socket handle type (POSIX: int)
Definition socket.h:50
#define mutex_lock(mutex)
Lock a mutex (with debug tracking in debug builds)
Definition mutex.h:140
int cond_broadcast(cond_t *cond)
Broadcast to a condition variable (wake all waiting threads)
pthread_mutex_t mutex_t
Mutex type (POSIX: pthread_mutex_t)
Definition mutex.h:38
#define INVALID_SOCKET_VALUE
Invalid socket value (POSIX: -1)
Definition socket.h:52
int cond_signal(cond_t *cond)
Signal a condition variable (wake one waiting thread)
pthread_cond_t cond_t
Condition variable type (POSIX: pthread_cond_t)
Definition cond.h:38
#define mutex_unlock(mutex)
Unlock a mutex (with debug tracking in debug builds)
Definition mutex.h:175
int cond_wait(cond_t *cond, mutex_t *mutex)
Wait on a condition variable (blocking)
int cond_destroy(cond_t *cond)
Destroy a condition variable.
int mutex_destroy(mutex_t *mutex)
Destroy a mutex.
ringbuffer_t * ringbuffer_create(size_t element_size, size_t capacity)
Create a new ring buffer.
Definition ringbuffer.c:28
size_t ringbuffer_size(const ringbuffer_t *rb)
Get current number of elements in the buffer.
Definition ringbuffer.c:122
void ringbuffer_destroy(ringbuffer_t *rb)
Destroy a ring buffer and free its memory.
Definition ringbuffer.c:54
bool ringbuffer_is_empty(const ringbuffer_t *rb)
Check if buffer is empty.
Definition ringbuffer.c:126
bool ringbuffer_read(ringbuffer_t *rb, void *data)
Try to read an element from the ring buffer (non-blocking)
Definition ringbuffer.c:83
bool ringbuffer_write(ringbuffer_t *rb, const void *data)
Try to write an element to the ring buffer (non-blocking)
Definition ringbuffer.c:61
asciichat_error_t webrtc_datachannel_set_callbacks(webrtc_data_channel_t *dc, const webrtc_datachannel_callbacks_t *callbacks)
Set DataChannel callbacks.
void webrtc_datachannel_destroy(webrtc_data_channel_t *dc)
Destroy a DataChannel and free resources.
void webrtc_peer_connection_close(webrtc_peer_connection_t *pc)
Close a peer connection.
void webrtc_peer_connection_destroy(webrtc_peer_connection_t *pc)
Destroy a peer connection and free resources.
asciichat_error_t webrtc_datachannel_send(webrtc_data_channel_t *dc, const uint8_t *data, size_t size)
Send data over DataChannel.
📝 Logging API with multiple log levels and terminal output control
Cross-platform mutex interface for ascii-chat.
Lock-Free Ring Buffer and Frame Buffer Management.
Transport method table (virtual function table)
Definition transport.h:81
asciichat_error_t(* send)(acip_transport_t *transport, const void *data, size_t len)
Send data through this transport.
Definition transport.h:95
Transport instance structure.
Definition transport.h:169
void * impl_data
Transport-specific state.
Definition transport.h:172
const acip_transport_methods_t * methods
Method table (virtual functions)
Definition transport.h:170
crypto_context_t * crypto_ctx
Optional encryption context.
Definition transport.h:171
Cryptographic context structure.
Lock-free ring buffer structure.
Definition ringbuffer.h:95
DataChannel callback structure.
void(* on_open)(webrtc_data_channel_t *dc, void *user_data)
Channel opened.
Receive queue element (variable-length message)
size_t len
Message length in bytes.
uint8_t * data
Message data (allocated, caller must free)
WebRTC transport implementation data.
webrtc_peer_connection_t * peer_conn
Peer connection (owned)
bool is_connected
Connection state.
mutex_t queue_mutex
Protect queue operations.
webrtc_data_channel_t * data_channel
Data channel (owned)
mutex_t state_mutex
Protect state changes.
cond_t queue_cond
Signal when messages arrive.
ringbuffer_t * recv_queue
Receive message queue.
Transport abstraction layer for ACIP protocol.
acip_transport_type_t
Transport type enumeration.
Definition transport.h:59
@ ACIP_TRANSPORT_WEBRTC
WebRTC DataChannel (P2P)
Definition transport.h:62
#define WEBRTC_RECV_QUEUE_SIZE
Maximum receive queue size (messages buffered before recv())
acip_transport_t * acip_webrtc_transport_create(webrtc_peer_connection_t *peer_conn, webrtc_data_channel_t *data_channel, crypto_context_t *crypto_ctx)