ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
coordinator.c File Reference

Ring consensus coordinator implementation. More...

Go to the source code of this file.

Data Structures

struct  consensus_coordinator
 Internal coordinator structure. More...
 

Macros

#define CONSENSUS_ROUND_INTERVAL_NS   (5ULL * 60 * NS_PER_SEC_INT) /* 5 minutes */
 
#define CONSENSUS_COLLECTION_DEADLINE_NS   (30ULL * NS_PER_SEC_INT) /* 30 seconds */
 

Typedefs

typedef struct consensus_coordinator consensus_coordinator_t
 Internal coordinator structure.
 

Functions

asciichat_error_t consensus_coordinator_create (const uint8_t my_id[16], const consensus_topology_t *topology, consensus_election_func_t election_func, void *election_context, consensus_coordinator_t **out_coordinator)
 Initialize coordinator with given parameters.
 
void consensus_coordinator_destroy (consensus_coordinator_t *coordinator)
 Clean up coordinator resources.
 
asciichat_error_t consensus_coordinator_process (consensus_coordinator_t *coordinator, uint32_t timeout_ms)
 Main orchestration loop.
 
asciichat_error_t consensus_coordinator_on_ring_members (consensus_coordinator_t *coordinator, const consensus_topology_t *new_topology)
 Update ring topology when participants change.
 
asciichat_error_t consensus_coordinator_on_collection_start (consensus_coordinator_t *coordinator, uint32_t round_id, uint64_t deadline_ns)
 Handle STATS_COLLECTION_START packet.
 
asciichat_error_t consensus_coordinator_on_stats_update (consensus_coordinator_t *coordinator, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
 Handle STATS_UPDATE packet.
 
asciichat_error_t consensus_coordinator_on_election_result (consensus_coordinator_t *coordinator, const uint8_t host_id[16], const uint8_t backup_id[16])
 Handle ELECTION_RESULT packet.
 
asciichat_error_t consensus_coordinator_get_current_host (const consensus_coordinator_t *coordinator, uint8_t out_host_id[16], uint8_t out_backup_id[16])
 Get currently elected host.
 
consensus_state_machine_t consensus_coordinator_get_state (const consensus_coordinator_t *coordinator)
 Get current coordinator state.
 
uint64_t consensus_coordinator_time_until_next_round (const consensus_coordinator_t *coordinator)
 Get time until next round.
 
int consensus_coordinator_get_metrics_count (const consensus_coordinator_t *coordinator)
 Get count of metrics in current round.
 

Detailed Description

Ring consensus coordinator implementation.

Definition in file coordinator.c.

Macro Definition Documentation

◆ CONSENSUS_COLLECTION_DEADLINE_NS

#define CONSENSUS_COLLECTION_DEADLINE_NS   (30ULL * NS_PER_SEC_INT) /* 30 seconds */

Definition at line 16 of file coordinator.c.

◆ CONSENSUS_ROUND_INTERVAL_NS

#define CONSENSUS_ROUND_INTERVAL_NS   (5ULL * 60 * NS_PER_SEC_INT) /* 5 minutes */

Definition at line 15 of file coordinator.c.

Typedef Documentation

◆ consensus_coordinator_t

Internal coordinator structure.

Function Documentation

◆ consensus_coordinator_create()

asciichat_error_t consensus_coordinator_create ( const uint8_t  my_id[16],
const consensus_topology_t *  topology,
consensus_election_func_t  election_func,
void *  election_context,
consensus_coordinator_t **  out_coordinator 
)

Initialize coordinator with given parameters.

Definition at line 42 of file coordinator.c.

44 {
45 if (!my_id || !topology || !election_func || !out_coordinator) {
46 return SET_ERRNO(
47 ERROR_INVALID_PARAM, "Invalid parameter: my_id=%p, topology=%p, election_func=%p, out_coordinator=%p",
48 (const void *)my_id, (const void *)topology, (const void *)election_func, (const void *)out_coordinator);
49 }
50
51 consensus_coordinator_t *coordinator = SAFE_MALLOC(sizeof(consensus_coordinator_t), consensus_coordinator_t *);
52 if (!coordinator) {
53 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate coordinator");
54 }
55
56 memset(coordinator, 0, sizeof(*coordinator));
57 memcpy(coordinator->my_id, my_id, 16);
58 coordinator->topology = topology;
59 coordinator->election_func = election_func;
60 coordinator->election_context = election_context;
61
62 /* Initialize state machine */
63 asciichat_error_t err = consensus_state_create(my_id, topology, &coordinator->state);
64 if (err != ASCIICHAT_OK) {
65 SAFE_FREE(coordinator);
66 return err;
67 }
68
69 /* Initialize round scheduling */
70 coordinator->last_round_start_ns = time_get_ns();
71 coordinator->next_round_id = 1;
72 coordinator->has_stored_result = false;
73
74 log_debug("Coordinator created for node %u, first round in 5 minutes", my_id[0]);
75
76 *out_coordinator = coordinator;
77 return ASCIICHAT_OK;
78}
asciichat_error_t consensus_state_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_state_t **out_state)
Definition state.c:50
Internal coordinator structure.
Definition coordinator.c:21
uint64_t last_round_start_ns
Definition coordinator.c:29
const consensus_topology_t * topology
Definition coordinator.c:23
consensus_election_func_t election_func
Definition coordinator.c:25
consensus_state_t * state
Definition coordinator.c:24
uint64_t time_get_ns(void)
Definition util/time.c:48

References consensus_state_create(), consensus_coordinator::election_context, consensus_coordinator::election_func, consensus_coordinator::has_stored_result, consensus_coordinator::last_round_start_ns, consensus_coordinator::my_id, consensus_coordinator::next_round_id, consensus_coordinator::state, time_get_ns(), and consensus_coordinator::topology.

Referenced by session_consensus_create().

◆ consensus_coordinator_destroy()

void consensus_coordinator_destroy ( consensus_coordinator_t *  coordinator)

Clean up coordinator resources.

Definition at line 83 of file coordinator.c.

83 {
84 if (!coordinator) {
85 return;
86 }
87 consensus_state_destroy(coordinator->state);
88 SAFE_FREE(coordinator);
89}
void consensus_state_destroy(consensus_state_t *state)
Definition state.c:74

References consensus_state_destroy(), and consensus_coordinator::state.

Referenced by session_consensus_destroy().

◆ consensus_coordinator_get_current_host()

asciichat_error_t consensus_coordinator_get_current_host ( const consensus_coordinator_t *  coordinator,
uint8_t  out_host_id[16],
uint8_t  out_backup_id[16] 
)

Get currently elected host.

Definition at line 370 of file coordinator.c.

371 {
372 if (!coordinator || !out_host_id || !out_backup_id) {
373 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
374 }
375
376 /* Try to get from current state first */
377 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
378 if (current_state == CONSENSUS_STATE_ELECTION_COMPLETE) {
379 asciichat_error_t err = consensus_state_get_elected_host(coordinator->state, out_host_id);
380 if (err == ASCIICHAT_OK) {
381 err = consensus_state_get_elected_backup(coordinator->state, out_backup_id);
382 }
383 if (err == ASCIICHAT_OK) {
384 return ASCIICHAT_OK;
385 }
386 }
387
388 /* Fall back to stored result */
389 if (coordinator->has_stored_result) {
390 memcpy(out_host_id, coordinator->stored_host_id, 16);
391 memcpy(out_backup_id, coordinator->stored_backup_id, 16);
392 return ASCIICHAT_OK;
393 }
394
395 return SET_ERRNO(ERROR_INVALID_STATE, "No election result available");
396}
asciichat_error_t consensus_state_get_elected_backup(const consensus_state_t *state, uint8_t out_backup_id[16])
Definition state.c:230
asciichat_error_t consensus_state_get_elected_host(const consensus_state_t *state, uint8_t out_host_id[16])
Definition state.c:217
consensus_state_machine_t consensus_state_get_current_state(const consensus_state_t *state)
Definition state.c:210
uint8_t stored_host_id[16]
Definition coordinator.c:34
uint8_t stored_backup_id[16]
Definition coordinator.c:35

References consensus_state_get_current_state(), consensus_state_get_elected_backup(), consensus_state_get_elected_host(), consensus_coordinator::has_stored_result, consensus_coordinator::state, consensus_coordinator::stored_backup_id, and consensus_coordinator::stored_host_id.

Referenced by session_consensus_get_elected_host(), and session_consensus_is_ready().

◆ consensus_coordinator_get_metrics_count()

int consensus_coordinator_get_metrics_count ( const consensus_coordinator_t *  coordinator)

Get count of metrics in current round.

Definition at line 429 of file coordinator.c.

429 {
430 if (!coordinator || !coordinator->state) {
431 return -1;
432 }
433 return consensus_state_get_metrics_count(coordinator->state);
434}
int consensus_state_get_metrics_count(const consensus_state_t *state)
Definition state.c:250

References consensus_state_get_metrics_count(), and consensus_coordinator::state.

Referenced by session_consensus_get_metrics_count().

◆ consensus_coordinator_get_state()

consensus_state_machine_t consensus_coordinator_get_state ( const consensus_coordinator_t *  coordinator)

Get current coordinator state.

Definition at line 401 of file coordinator.c.

401 {
402 if (!coordinator || !coordinator->state) {
403 return CONSENSUS_STATE_FAILED;
404 }
405 return consensus_state_get_current_state(coordinator->state);
406}

References consensus_state_get_current_state(), and consensus_coordinator::state.

Referenced by session_consensus_get_state().

◆ consensus_coordinator_on_collection_start()

asciichat_error_t consensus_coordinator_on_collection_start ( consensus_coordinator_t *  coordinator,
uint32_t  round_id,
uint64_t  deadline_ns 
)

Handle STATS_COLLECTION_START packet.

Definition at line 279 of file coordinator.c.

280 {
281 if (!coordinator) {
282 return SET_ERRNO(ERROR_INVALID_PARAM, "Coordinator is NULL");
283 }
284
285 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
286 if (current_state != CONSENSUS_STATE_IDLE) {
287 return SET_ERRNO(ERROR_INVALID_STATE, "Cannot start collection, state is %d", current_state);
288 }
289
290 /* Start collection */
291 asciichat_error_t err = consensus_state_start_collection(coordinator->state);
292 if (err != ASCIICHAT_OK) {
293 return err;
294 }
295
296 coordinator->next_round_id = round_id;
297 coordinator->collection_deadline_ns = deadline_ns;
298
299 /* Measure and add our metrics */
300 err = measure_and_add_metrics(coordinator);
301 if (err != ASCIICHAT_OK) {
302 log_warn("Failed to measure metrics: %d", err);
303 }
304
305 log_dev("Collection started: round_id=%u, deadline in %u seconds", round_id,
306 (unsigned int)((deadline_ns - time_get_ns()) / NS_PER_SEC_INT));
307
308 return ASCIICHAT_OK;
309}
asciichat_error_t consensus_state_start_collection(consensus_state_t *state)
Definition state.c:83
uint64_t collection_deadline_ns
Definition coordinator.c:31

References consensus_coordinator::collection_deadline_ns, consensus_state_get_current_state(), consensus_state_start_collection(), consensus_coordinator::next_round_id, consensus_coordinator::state, and time_get_ns().

Referenced by session_consensus_on_collection_start().

◆ consensus_coordinator_on_election_result()

asciichat_error_t consensus_coordinator_on_election_result ( consensus_coordinator_t *  coordinator,
const uint8_t  host_id[16],
const uint8_t  backup_id[16] 
)

Handle ELECTION_RESULT packet.

Definition at line 342 of file coordinator.c.

343 {
344 if (!coordinator || !host_id || !backup_id) {
345 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
346 }
347
348 /* Store the elected host and backup */
349 memcpy(coordinator->stored_host_id, host_id, 16);
350 memcpy(coordinator->stored_backup_id, backup_id, 16);
351 coordinator->has_stored_result = true;
352
353 log_info("Election result received: host=%u, backup=%u", host_id[0], backup_id[0]);
354
355 /* Transition state back to IDLE */
356 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
357 if (current_state == CONSENSUS_STATE_ELECTION_COMPLETE) {
358 asciichat_error_t err = consensus_state_reset_to_idle(coordinator->state);
359 if (err != ASCIICHAT_OK) {
360 log_warn("Failed to reset state to IDLE: %d", err);
361 }
362 }
363
364 return ASCIICHAT_OK;
365}
asciichat_error_t consensus_state_reset_to_idle(consensus_state_t *state)
Definition state.c:197

References consensus_state_get_current_state(), consensus_state_reset_to_idle(), consensus_coordinator::has_stored_result, consensus_coordinator::state, consensus_coordinator::stored_backup_id, and consensus_coordinator::stored_host_id.

Referenced by session_consensus_on_election_result().

◆ consensus_coordinator_on_ring_members()

asciichat_error_t consensus_coordinator_on_ring_members ( consensus_coordinator_t *  coordinator,
const consensus_topology_t *  new_topology 
)

Update ring topology when participants change.

Definition at line 250 of file coordinator.c.

251 {
252 if (!coordinator || !new_topology) {
253 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
254 }
255
256 /* Update topology reference */
257 coordinator->topology = new_topology;
258
259 /* Reset state if we're in the middle of a round */
260 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
261 if (current_state != CONSENSUS_STATE_IDLE) {
262 log_warn("Ring topology changed during round, resetting state");
263 consensus_state_destroy(coordinator->state);
264 coordinator->state = NULL;
265
266 asciichat_error_t err = consensus_state_create(coordinator->my_id, new_topology, &coordinator->state);
267 if (err != ASCIICHAT_OK) {
268 return err;
269 }
270 }
271
272 log_info("Ring topology updated");
273 return ASCIICHAT_OK;
274}

References consensus_state_create(), consensus_state_destroy(), consensus_state_get_current_state(), consensus_coordinator::my_id, consensus_coordinator::state, and consensus_coordinator::topology.

Referenced by session_consensus_set_topology().

◆ consensus_coordinator_on_stats_update()

asciichat_error_t consensus_coordinator_on_stats_update ( consensus_coordinator_t *  coordinator,
const uint8_t  sender_id[16],
const participant_metrics_t *  metrics,
uint8_t  num_metrics 
)

Handle STATS_UPDATE packet.

Definition at line 314 of file coordinator.c.

316 {
317 if (!coordinator || !sender_id || !metrics) {
318 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
319 }
320
321 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
322 if (current_state != CONSENSUS_STATE_COLLECTING) {
323 return SET_ERRNO(ERROR_INVALID_STATE, "Cannot accept metrics, state is %d", current_state);
324 }
325
326 /* Add all metrics to state */
327 for (int i = 0; i < num_metrics; i++) {
328 asciichat_error_t err = consensus_state_add_metrics(coordinator->state, &metrics[i]);
329 if (err != ASCIICHAT_OK) {
330 log_warn("Failed to add metric %d: %d", i, err);
331 }
332 }
333
334 log_debug("Received %d metrics from sender %u", num_metrics, sender_id[0]);
335
336 return ASCIICHAT_OK;
337}
asciichat_error_t consensus_state_add_metrics(consensus_state_t *state, const participant_metrics_t *metrics)
Definition state.c:100

References consensus_state_add_metrics(), consensus_state_get_current_state(), and consensus_coordinator::state.

Referenced by session_consensus_on_stats_update().

◆ consensus_coordinator_process()

asciichat_error_t consensus_coordinator_process ( consensus_coordinator_t *  coordinator,
uint32_t  timeout_ms 
)

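Main orchestration step, intended to be called repeatedly. Each call makes a single pass: if a new round is due and this node is the leader, it starts a collection round; then, if the state machine is in the COLLECTING state and the collection has timed out, it completes the collection early. The timeout_ms argument is currently unused.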

Definition at line 212 of file coordinator.c.

212 {
213 (void)timeout_ms; /* Currently unused, but kept for future use */
214
215 if (!coordinator) {
216 return SET_ERRNO(ERROR_INVALID_PARAM, "Coordinator is NULL");
217 }
218
219 uint64_t now_ns = time_get_ns();
220
221 /* Check if we should start a new round */
222 if (should_start_new_round(coordinator, now_ns)) {
223 /* Only leader starts rounds */
224 if (consensus_topology_am_leader(coordinator->topology)) {
225 asciichat_error_t err = start_collection_round(coordinator, now_ns);
226 if (err != ASCIICHAT_OK) {
227 log_warn("Failed to start collection round: %d", err);
228 return err;
229 }
230 }
231 }
232
233 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
234
235 /* Check for collection timeout */
236 if (current_state == CONSENSUS_STATE_COLLECTING && has_collection_timed_out(coordinator, now_ns)) {
237 log_warn("Collection round timed out, completing early");
238 asciichat_error_t err = complete_collection(coordinator);
239 if (err != ASCIICHAT_OK) {
240 log_error("Failed to complete collection on timeout: %d", err);
241 }
242 }
243
244 return ASCIICHAT_OK;
245}
bool consensus_topology_am_leader(const consensus_topology_t *topology)
Definition topology.c:76

References consensus_state_get_current_state(), consensus_topology_am_leader(), consensus_coordinator::state, time_get_ns(), and consensus_coordinator::topology.

Referenced by session_consensus_process().

◆ consensus_coordinator_time_until_next_round()

uint64_t consensus_coordinator_time_until_next_round ( const consensus_coordinator_t coordinator)

Get time until next round.

Definition at line 411 of file coordinator.c.

411 {
412 if (!coordinator) {
413 return 0;
414 }
415
416 uint64_t now_ns = time_get_ns();
417 uint64_t next_round_ns = coordinator->last_round_start_ns + CONSENSUS_ROUND_INTERVAL_NS;
418
419 if (now_ns >= next_round_ns) {
420 return 0;
421 }
422
423 return next_round_ns - now_ns;
424}
#define CONSENSUS_ROUND_INTERVAL_NS
Definition coordinator.c:15

References CONSENSUS_ROUND_INTERVAL_NS, consensus_coordinator::last_round_start_ns, and time_get_ns().

Referenced by session_consensus_time_until_next_round().