ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
coordinator.c
Go to the documentation of this file.
1
6#include <ascii-chat/network/consensus/coordinator.h>
7#include <ascii-chat/network/consensus/election.h>
8#include <ascii-chat/network/consensus/metrics.h>
9#include <ascii-chat/util/time.h>
10#include <ascii-chat/log/logging.h>
11#include <ascii-chat/common.h>
12#include <string.h>
13
/*
 * Round scheduling constants, expressed in nanoseconds.
 * ROUND_INTERVAL: minimum idle time since the last round start before the
 * leader begins another round (5 minutes).
 * COLLECTION_DEADLINE: nominal length of a metrics-collection window (30 seconds).
 */
#define CONSENSUS_ROUND_INTERVAL_NS (300ULL * NS_PER_SEC_INT)     /* 5 min */
#define CONSENSUS_COLLECTION_DEADLINE_NS (30ULL * NS_PER_SEC_INT) /* 30 s  */
17
/* Internal coordinator state: this node's identity, the election hook,
 * round-scheduling bookkeeping, and the last known election result.
 * NOTE(review): members topology, state, election_context and
 * has_stored_result are used throughout this file, but their declarations
 * and the closing typedef line are absent from this listing. */
21typedef struct consensus_coordinator {
/* This node's 16-byte identifier, copied in consensus_coordinator_create(). */
22 uint8_t my_id[16];
/* Callback invoked by complete_collection() after the leader computes an election. */
25 consensus_election_func_t election_func;
27
28 /* Round scheduling */
/* last_round_start_ns: compared against CONSENSUS_ROUND_INTERVAL_NS to decide when a new round is due.
 * next_round_id: incremented by start_collection_round() (which logs next_round_id - 1 as the
 * current round) and overwritten by consensus_coordinator_on_collection_start().
 * collection_deadline_ns: checked by has_collection_timed_out(); set by
 * consensus_coordinator_on_collection_start(). */
29 uint64_t last_round_start_ns; /* When the last round was initiated */
30 uint32_t next_round_id; /* Next round ID to use */
31 uint64_t collection_deadline_ns; /* Deadline for current collection */
32
33 /* Elected hosts - fallback storage */
/* Written by complete_collection() and consensus_coordinator_on_election_result();
 * returned by consensus_coordinator_get_current_host() when the live state machine
 * cannot supply a result. */
34 uint8_t stored_host_id[16];
35 uint8_t stored_backup_id[16];
38
42asciichat_error_t consensus_coordinator_create(const uint8_t my_id[16], const consensus_topology_t *topology,
43 consensus_election_func_t election_func, void *election_context,
44 consensus_coordinator_t **out_coordinator) {
45 if (!my_id || !topology || !election_func || !out_coordinator) {
46 return SET_ERRNO(
47 ERROR_INVALID_PARAM, "Invalid parameter: my_id=%p, topology=%p, election_func=%p, out_coordinator=%p",
48 (const void *)my_id, (const void *)topology, (const void *)election_func, (const void *)out_coordinator);
49 }
50
51 consensus_coordinator_t *coordinator = SAFE_MALLOC(sizeof(consensus_coordinator_t), consensus_coordinator_t *);
52 if (!coordinator) {
53 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate coordinator");
54 }
55
56 memset(coordinator, 0, sizeof(*coordinator));
57 memcpy(coordinator->my_id, my_id, 16);
58 coordinator->topology = topology;
59 coordinator->election_func = election_func;
60 coordinator->election_context = election_context;
61
62 /* Initialize state machine */
63 asciichat_error_t err = consensus_state_create(my_id, topology, &coordinator->state);
64 if (err != ASCIICHAT_OK) {
65 SAFE_FREE(coordinator);
66 return err;
67 }
68
69 /* Initialize round scheduling */
70 coordinator->last_round_start_ns = time_get_ns();
71 coordinator->next_round_id = 1;
72 coordinator->has_stored_result = false;
73
74 log_debug("Coordinator created for node %u, first round in 5 minutes", my_id[0]);
75
76 *out_coordinator = coordinator;
77 return ASCIICHAT_OK;
78}
79
/* NULL-safe teardown: destroys the state machine created in
 * consensus_coordinator_create(), then frees the coordinator itself.
 * NOTE(review): coordinator->state can be NULL here if
 * consensus_coordinator_on_ring_members() failed to recreate it; presumably
 * consensus_state_destroy() tolerates NULL. Confirm. */
84 if (!coordinator) {
85 return;
86 }
87 consensus_state_destroy(coordinator->state);
88 SAFE_FREE(coordinator);
89}
90
94static bool should_start_new_round(consensus_coordinator_t *coordinator, uint64_t now_ns) {
95 if (!coordinator || !coordinator->state) {
96 return false;
97 }
98
99 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
100 if (current_state != CONSENSUS_STATE_IDLE) {
101 return false;
102 }
103
104 uint64_t time_since_last_round = time_elapsed_ns(coordinator->last_round_start_ns, now_ns);
105 return time_since_last_round >= CONSENSUS_ROUND_INTERVAL_NS;
106}
107
111static asciichat_error_t measure_and_add_metrics(consensus_coordinator_t *coordinator) {
112 participant_metrics_t metrics;
113 asciichat_error_t err = consensus_metrics_measure(coordinator->my_id, &metrics);
114 if (err != ASCIICHAT_OK) {
115 return err;
116 }
117
118 err = consensus_state_add_metrics(coordinator->state, &metrics);
119 if (err != ASCIICHAT_OK) {
120 return err;
121 }
122
123 log_debug("Added own metrics to collection");
124 return ASCIICHAT_OK;
125}
126
/* Leader-side start of a collection round: moves the state machine into
 * collection, records now_ns as the round start, advances next_round_id,
 * and contributes this node's own metrics.
 * Returns the error from consensus_state_start_collection() or
 * measure_and_add_metrics(). If measuring fails, the state machine has
 * already left IDLE and last_round_start_ns has already been updated.
 * NOTE(review): no assignment to collection_deadline_ns appears in this
 * listing (source line 138 is absent). It presumably sets
 * now_ns + CONSENSUS_COLLECTION_DEADLINE_NS. Confirm, because
 * has_collection_timed_out() compares against that field. */
130static asciichat_error_t start_collection_round(consensus_coordinator_t *coordinator, uint64_t now_ns) {
131 /* Transition state machine */
132 asciichat_error_t err = consensus_state_start_collection(coordinator->state);
133 if (err != ASCIICHAT_OK) {
134 return err;
135 }
136
137 coordinator->last_round_start_ns = now_ns;
139 coordinator->next_round_id++;
140
/* After the post-increment, the round just started is next_round_id - 1. */
141 log_info("Starting collection round %u, deadline in 30 seconds", coordinator->next_round_id - 1);
142
143 /* Measure our own metrics immediately */
144 err = measure_and_add_metrics(coordinator);
145 if (err != ASCIICHAT_OK) {
146 return err;
147 }
148
149 return ASCIICHAT_OK;
150}
151
155static bool has_collection_timed_out(consensus_coordinator_t *coordinator, uint64_t now_ns) {
156 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
157 if (current_state != CONSENSUS_STATE_COLLECTING) {
158 return false;
159 }
160
161 return now_ns >= coordinator->collection_deadline_ns;
162}
163
167static asciichat_error_t complete_collection(consensus_coordinator_t *coordinator) {
168 asciichat_error_t err = consensus_state_collection_complete(coordinator->state);
169 if (err != ASCIICHAT_OK) {
170 return err;
171 }
172
173 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
174
175 if (current_state == CONSENSUS_STATE_ELECTION_START) {
176 /* Leader: Run election */
177 log_info("Leader: Computing election from %d metrics", consensus_state_get_metrics_count(coordinator->state));
178
179 err = consensus_state_compute_election(coordinator->state);
180 if (err != ASCIICHAT_OK) {
181 return err;
182 }
183
184 /* Optionally run election callback */
185 if (coordinator->election_func) {
186 err = coordinator->election_func(coordinator->election_context, coordinator->state);
187 if (err != ASCIICHAT_OK) {
188 log_warn("Election callback returned error: %d", err);
189 }
190 }
191
192 /* Store elected hosts */
193 uint8_t host_id[16], backup_id[16];
194 err = consensus_state_get_elected_host(coordinator->state, host_id);
195 if (err == ASCIICHAT_OK) {
196 err = consensus_state_get_elected_backup(coordinator->state, backup_id);
197 }
198 if (err == ASCIICHAT_OK) {
199 memcpy(coordinator->stored_host_id, host_id, 16);
200 memcpy(coordinator->stored_backup_id, backup_id, 16);
201 coordinator->has_stored_result = true;
202 log_info("Election complete: host=%u, backup=%u", host_id[0], backup_id[0]);
203 }
204 }
205
206 return ASCIICHAT_OK;
207}
208
212asciichat_error_t consensus_coordinator_process(consensus_coordinator_t *coordinator, uint32_t timeout_ms) {
213 (void)timeout_ms; /* Currently unused, but kept for future use */
214
215 if (!coordinator) {
216 return SET_ERRNO(ERROR_INVALID_PARAM, "Coordinator is NULL");
217 }
218
219 uint64_t now_ns = time_get_ns();
220
221 /* Check if we should start a new round */
222 if (should_start_new_round(coordinator, now_ns)) {
223 /* Only leader starts rounds */
224 if (consensus_topology_am_leader(coordinator->topology)) {
225 asciichat_error_t err = start_collection_round(coordinator, now_ns);
226 if (err != ASCIICHAT_OK) {
227 log_warn("Failed to start collection round: %d", err);
228 return err;
229 }
230 }
231 }
232
233 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
234
235 /* Check for collection timeout */
236 if (current_state == CONSENSUS_STATE_COLLECTING && has_collection_timed_out(coordinator, now_ns)) {
237 log_warn("Collection round timed out, completing early");
238 asciichat_error_t err = complete_collection(coordinator);
239 if (err != ASCIICHAT_OK) {
240 log_error("Failed to complete collection on timeout: %d", err);
241 }
242 }
243
244 return ASCIICHAT_OK;
245}
246
251 const consensus_topology_t *new_topology) {
/* Install a new ring topology (stored by reference). If a round is in
 * progress (state not IDLE), the state machine is discarded and rebuilt
 * against the new topology. Returns ERROR_INVALID_PARAM for NULL arguments,
 * or the error from consensus_state_create(). */
252 if (!coordinator || !new_topology) {
253 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
254 }
255
256 /* Update topology reference */
257 coordinator->topology = new_topology;
258
259 /* Reset state if we're in the middle of a round */
260 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
261 if (current_state != CONSENSUS_STATE_IDLE) {
262 log_warn("Ring topology changed during round, resetting state");
263 consensus_state_destroy(coordinator->state);
264 coordinator->state = NULL;
265
/* NOTE(review): if this create fails, coordinator->state stays NULL and later calls
 * such as consensus_coordinator_on_collection_start() pass NULL to
 * consensus_state_get_current_state(). Consider creating the replacement first
 * and destroying the old state only on success. */
266 asciichat_error_t err = consensus_state_create(coordinator->my_id, new_topology, &coordinator->state);
267 if (err != ASCIICHAT_OK) {
268 return err;
269 }
270 }
271
272 log_info("Ring topology updated");
273 return ASCIICHAT_OK;
274}
275
279asciichat_error_t consensus_coordinator_on_collection_start(consensus_coordinator_t *coordinator, uint32_t round_id,
280 uint64_t deadline_ns) {
281 if (!coordinator) {
282 return SET_ERRNO(ERROR_INVALID_PARAM, "Coordinator is NULL");
283 }
284
285 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
286 if (current_state != CONSENSUS_STATE_IDLE) {
287 return SET_ERRNO(ERROR_INVALID_STATE, "Cannot start collection, state is %d", current_state);
288 }
289
290 /* Start collection */
291 asciichat_error_t err = consensus_state_start_collection(coordinator->state);
292 if (err != ASCIICHAT_OK) {
293 return err;
294 }
295
296 coordinator->next_round_id = round_id;
297 coordinator->collection_deadline_ns = deadline_ns;
298
299 /* Measure and add our metrics */
300 err = measure_and_add_metrics(coordinator);
301 if (err != ASCIICHAT_OK) {
302 log_warn("Failed to measure metrics: %d", err);
303 }
304
305 log_dev("Collection started: round_id=%u, deadline in %u seconds", round_id,
306 (unsigned int)((deadline_ns - time_get_ns()) / NS_PER_SEC_INT));
307
308 return ASCIICHAT_OK;
309}
310
315 const uint8_t sender_id[16],
316 const participant_metrics_t *metrics, uint8_t num_metrics) {
/* Handle a STATS_UPDATE: while COLLECTING, add each of the num_metrics entries
 * in `metrics` to the current collection. Returns ERROR_INVALID_PARAM for NULL
 * arguments and ERROR_INVALID_STATE when not collecting. Otherwise returns
 * ASCIICHAT_OK, even if individual entries were rejected.
 * `metrics` must point to at least num_metrics entries; this is not checked here.
 * NOTE(review): sender_id is used only for logging. The entries are not checked
 * against the sender, so any peer can submit metrics for any participant.
 * Confirm this is intended. */
317 if (!coordinator || !sender_id || !metrics) {
318 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
319 }
320
321 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
322 if (current_state != CONSENSUS_STATE_COLLECTING) {
323 return SET_ERRNO(ERROR_INVALID_STATE, "Cannot accept metrics, state is %d", current_state);
324 }
325
326 /* Add all metrics to state */
/* Per-entry failures are logged and skipped; the remaining entries are still added. */
327 for (int i = 0; i < num_metrics; i++) {
328 asciichat_error_t err = consensus_state_add_metrics(coordinator->state, &metrics[i]);
329 if (err != ASCIICHAT_OK) {
330 log_warn("Failed to add metric %d: %d", i, err);
331 }
332 }
333
/* Only the first byte of the 16-byte sender id is logged. */
334 log_debug("Received %d metrics from sender %u", num_metrics, sender_id[0]);
335
336 return ASCIICHAT_OK;
337}
338
343 const uint8_t host_id[16], const uint8_t backup_id[16]) {
/* Handle an ELECTION_RESULT: cache the announced host and backup. Caching
 * happens unconditionally, whatever the current state. The state machine is
 * reset to IDLE only when it is in ELECTION_COMPLETE; in any other state it
 * is left unchanged. A failed reset is logged, and the function still
 * returns ASCIICHAT_OK. */
344 if (!coordinator || !host_id || !backup_id) {
345 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
346 }
347
348 /* Store the elected host and backup */
349 memcpy(coordinator->stored_host_id, host_id, 16);
350 memcpy(coordinator->stored_backup_id, backup_id, 16);
351 coordinator->has_stored_result = true;
352
/* Only the first byte of each 16-byte id is logged. */
353 log_info("Election result received: host=%u, backup=%u", host_id[0], backup_id[0]);
354
355 /* Transition state back to IDLE */
356 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
357 if (current_state == CONSENSUS_STATE_ELECTION_COMPLETE) {
358 asciichat_error_t err = consensus_state_reset_to_idle(coordinator->state);
359 if (err != ASCIICHAT_OK) {
360 log_warn("Failed to reset state to IDLE: %d", err);
361 }
362 }
363
364 return ASCIICHAT_OK;
365}
366
371 uint8_t out_host_id[16], uint8_t out_backup_id[16]) {
/* Return the current host/backup pair. The live state machine is preferred
 * when it is in ELECTION_COMPLETE; otherwise the cached result is used.
 * Returns ERROR_INVALID_STATE when neither source has a result.
 * Note: if the live host lookup succeeds but the backup lookup fails,
 * out_host_id has already been written. It is then overwritten by the cached
 * result, or left modified when this function returns the error. */
372 if (!coordinator || !out_host_id || !out_backup_id) {
373 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameter");
374 }
375
376 /* Try to get from current state first */
377 consensus_state_machine_t current_state = consensus_state_get_current_state(coordinator->state);
378 if (current_state == CONSENSUS_STATE_ELECTION_COMPLETE) {
379 asciichat_error_t err = consensus_state_get_elected_host(coordinator->state, out_host_id);
380 if (err == ASCIICHAT_OK) {
381 err = consensus_state_get_elected_backup(coordinator->state, out_backup_id);
382 }
383 if (err == ASCIICHAT_OK) {
384 return ASCIICHAT_OK;
385 }
386 }
387
388 /* Fall back to stored result */
389 if (coordinator->has_stored_result) {
390 memcpy(out_host_id, coordinator->stored_host_id, 16);
391 memcpy(out_backup_id, coordinator->stored_backup_id, 16);
392 return ASCIICHAT_OK;
393 }
394
395 return SET_ERRNO(ERROR_INVALID_STATE, "No election result available");
396}
397
401consensus_state_machine_t consensus_coordinator_get_state(const consensus_coordinator_t *coordinator) {
402 if (!coordinator || !coordinator->state) {
403 return CONSENSUS_STATE_FAILED;
404 }
405 return consensus_state_get_current_state(coordinator->state);
406}
407
/* Nanoseconds until CONSENSUS_ROUND_INTERVAL_NS has elapsed since the last
 * round start. Returns 0 when that point has passed or the coordinator is
 * NULL, so 0 does not distinguish "due now" from "invalid argument".
 * This does not consider the current state or leadership, both of which
 * consensus_coordinator_process() also requires before starting a round. */
412 if (!coordinator) {
413 return 0;
414 }
415
416 uint64_t now_ns = time_get_ns();
417 uint64_t next_round_ns = coordinator->last_round_start_ns + CONSENSUS_ROUND_INTERVAL_NS;
418
419 if (now_ns >= next_round_ns) {
420 return 0;
421 }
422
423 return next_round_ns - now_ns;
424}
425
/* Number of metrics collected in the current round, as reported by the state
 * machine. Returns -1 when the coordinator or its state machine is missing. */
430 if (!coordinator || !coordinator->state) {
431 return -1;
432 }
433 return consensus_state_get_metrics_count(coordinator->state);
434}
asciichat_error_t consensus_coordinator_on_stats_update(consensus_coordinator_t *coordinator, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
Handle STATS_UPDATE packet.
void consensus_coordinator_destroy(consensus_coordinator_t *coordinator)
Clean up coordinator resources.
Definition coordinator.c:83
#define CONSENSUS_COLLECTION_DEADLINE_NS
Definition coordinator.c:16
int consensus_coordinator_get_metrics_count(const consensus_coordinator_t *coordinator)
Get count of metrics in current round.
asciichat_error_t consensus_coordinator_on_ring_members(consensus_coordinator_t *coordinator, const consensus_topology_t *new_topology)
Update ring topology when participants change.
struct consensus_coordinator consensus_coordinator_t
Internal coordinator structure.
asciichat_error_t consensus_coordinator_process(consensus_coordinator_t *coordinator, uint32_t timeout_ms)
Main orchestration loop.
uint64_t consensus_coordinator_time_until_next_round(const consensus_coordinator_t *coordinator)
Get time until next round.
consensus_state_machine_t consensus_coordinator_get_state(const consensus_coordinator_t *coordinator)
Get current coordinator state.
asciichat_error_t consensus_coordinator_get_current_host(const consensus_coordinator_t *coordinator, uint8_t out_host_id[16], uint8_t out_backup_id[16])
Get currently elected host.
asciichat_error_t consensus_coordinator_on_collection_start(consensus_coordinator_t *coordinator, uint32_t round_id, uint64_t deadline_ns)
Handle STATS_COLLECTION_START packet.
asciichat_error_t consensus_coordinator_on_election_result(consensus_coordinator_t *coordinator, const uint8_t host_id[16], const uint8_t backup_id[16])
Handle ELECTION_RESULT packet.
asciichat_error_t consensus_coordinator_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_election_func_t election_func, void *election_context, consensus_coordinator_t **out_coordinator)
Initialize coordinator with given parameters.
Definition coordinator.c:42
#define CONSENSUS_ROUND_INTERVAL_NS
Definition coordinator.c:15
asciichat_error_t consensus_metrics_measure(const uint8_t my_id[16], participant_metrics_t *out_metrics)
Definition metrics.c:86
asciichat_error_t consensus_state_start_collection(consensus_state_t *state)
Definition state.c:83
asciichat_error_t consensus_state_compute_election(consensus_state_t *state)
Definition state.c:144
asciichat_error_t consensus_state_get_elected_backup(const consensus_state_t *state, uint8_t out_backup_id[16])
Definition state.c:230
void consensus_state_destroy(consensus_state_t *state)
Definition state.c:74
asciichat_error_t consensus_state_get_elected_host(const consensus_state_t *state, uint8_t out_host_id[16])
Definition state.c:217
asciichat_error_t consensus_state_reset_to_idle(consensus_state_t *state)
Definition state.c:197
asciichat_error_t consensus_state_collection_complete(consensus_state_t *state)
Definition state.c:125
int consensus_state_get_metrics_count(const consensus_state_t *state)
Definition state.c:250
asciichat_error_t consensus_state_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_state_t **out_state)
Definition state.c:50
consensus_state_machine_t consensus_state_get_current_state(const consensus_state_t *state)
Definition state.c:210
asciichat_error_t consensus_state_add_metrics(consensus_state_t *state, const participant_metrics_t *metrics)
Definition state.c:100
Internal coordinator structure.
Definition coordinator.c:21
uint64_t collection_deadline_ns
Definition coordinator.c:31
uint64_t last_round_start_ns
Definition coordinator.c:29
uint8_t stored_host_id[16]
Definition coordinator.c:34
uint8_t stored_backup_id[16]
Definition coordinator.c:35
const consensus_topology_t * topology
Definition coordinator.c:23
consensus_election_func_t election_func
Definition coordinator.c:25
consensus_state_t * state
Definition coordinator.c:24
State machine instance.
Definition state.c:24
Ring topology for consensus participants.
Definition topology.c:17
bool consensus_topology_am_leader(const consensus_topology_t *topology)
Definition topology.c:76
uint64_t time_get_ns(void)
Definition util/time.c:48
uint64_t time_elapsed_ns(uint64_t start_ns, uint64_t end_ns)
Definition util/time.c:90