6#include <ascii-chat/network/consensus/coordinator.h>
7#include <ascii-chat/network/consensus/election.h>
8#include <ascii-chat/network/consensus/metrics.h>
9#include <ascii-chat/util/time.h>
10#include <ascii-chat/log/logging.h>
11#include <ascii-chat/common.h>
/**
 * Interval between the starts of two consecutive consensus rounds.
 *
 * Five minutes, expressed in nanoseconds (assuming NS_PER_SEC_INT is the
 * number of nanoseconds in one second). The ULL suffix forces the whole
 * product to be evaluated as unsigned long long so it cannot overflow a
 * 32-bit int.
 */
#define CONSENSUS_ROUND_INTERVAL_NS (5ULL * 60 * NS_PER_SEC_INT)
/**
 * Maximum time a metrics-collection round may stay open before it is
 * considered timed out.
 *
 * Thirty seconds, expressed in nanoseconds (assuming NS_PER_SEC_INT is the
 * number of nanoseconds in one second). The ULL suffix keeps the product in
 * unsigned long long arithmetic.
 */
#define CONSENSUS_COLLECTION_DEADLINE_NS (30ULL * NS_PER_SEC_INT)
43 consensus_election_func_t election_func,
void *election_context,
45 if (!my_id || !topology || !election_func || !out_coordinator) {
47 ERROR_INVALID_PARAM,
"Invalid parameter: my_id=%p, topology=%p, election_func=%p, out_coordinator=%p",
48 (
const void *)my_id, (
const void *)topology, (
const void *)election_func, (
const void *)out_coordinator);
53 return SET_ERRNO(ERROR_MEMORY,
"Failed to allocate coordinator");
56 memset(coordinator, 0,
sizeof(*coordinator));
57 memcpy(coordinator->
my_id, my_id, 16);
64 if (err != ASCIICHAT_OK) {
65 SAFE_FREE(coordinator);
74 log_debug(
"Coordinator created for node %u, first round in 5 minutes", my_id[0]);
76 *out_coordinator = coordinator;
88 SAFE_FREE(coordinator);
95 if (!coordinator || !coordinator->
state) {
100 if (current_state != CONSENSUS_STATE_IDLE) {
112 participant_metrics_t metrics;
114 if (err != ASCIICHAT_OK) {
119 if (err != ASCIICHAT_OK) {
123 log_debug(
"Added own metrics to collection");
133 if (err != ASCIICHAT_OK) {
141 log_info(
"Starting collection round %u, deadline in 30 seconds", coordinator->
next_round_id - 1);
144 err = measure_and_add_metrics(coordinator);
145 if (err != ASCIICHAT_OK) {
157 if (current_state != CONSENSUS_STATE_COLLECTING) {
169 if (err != ASCIICHAT_OK) {
175 if (current_state == CONSENSUS_STATE_ELECTION_START) {
180 if (err != ASCIICHAT_OK) {
187 if (err != ASCIICHAT_OK) {
188 log_warn(
"Election callback returned error: %d", err);
193 uint8_t host_id[16], backup_id[16];
195 if (err == ASCIICHAT_OK) {
198 if (err == ASCIICHAT_OK) {
202 log_info(
"Election complete: host=%u, backup=%u", host_id[0], backup_id[0]);
216 return SET_ERRNO(ERROR_INVALID_PARAM,
"Coordinator is NULL");
222 if (should_start_new_round(coordinator, now_ns)) {
225 asciichat_error_t err = start_collection_round(coordinator, now_ns);
226 if (err != ASCIICHAT_OK) {
227 log_warn(
"Failed to start collection round: %d", err);
236 if (current_state == CONSENSUS_STATE_COLLECTING && has_collection_timed_out(coordinator, now_ns)) {
237 log_warn(
"Collection round timed out, completing early");
238 asciichat_error_t err = complete_collection(coordinator);
239 if (err != ASCIICHAT_OK) {
240 log_error(
"Failed to complete collection on timeout: %d", err);
252 if (!coordinator || !new_topology) {
253 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameter");
257 coordinator->
topology = new_topology;
261 if (current_state != CONSENSUS_STATE_IDLE) {
262 log_warn(
"Ring topology changed during round, resetting state");
264 coordinator->
state = NULL;
267 if (err != ASCIICHAT_OK) {
272 log_info(
"Ring topology updated");
280 uint64_t deadline_ns) {
282 return SET_ERRNO(ERROR_INVALID_PARAM,
"Coordinator is NULL");
286 if (current_state != CONSENSUS_STATE_IDLE) {
287 return SET_ERRNO(ERROR_INVALID_STATE,
"Cannot start collection, state is %d", current_state);
292 if (err != ASCIICHAT_OK) {
300 err = measure_and_add_metrics(coordinator);
301 if (err != ASCIICHAT_OK) {
302 log_warn(
"Failed to measure metrics: %d", err);
305 log_dev(
"Collection started: round_id=%u, deadline in %u seconds", round_id,
306 (
unsigned int)((deadline_ns -
time_get_ns()) / NS_PER_SEC_INT));
315 const uint8_t sender_id[16],
316 const participant_metrics_t *metrics, uint8_t num_metrics) {
317 if (!coordinator || !sender_id || !metrics) {
318 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameter");
322 if (current_state != CONSENSUS_STATE_COLLECTING) {
323 return SET_ERRNO(ERROR_INVALID_STATE,
"Cannot accept metrics, state is %d", current_state);
327 for (
int i = 0; i < num_metrics; i++) {
329 if (err != ASCIICHAT_OK) {
330 log_warn(
"Failed to add metric %d: %d", i, err);
334 log_debug(
"Received %d metrics from sender %u", num_metrics, sender_id[0]);
343 const uint8_t host_id[16],
const uint8_t backup_id[16]) {
344 if (!coordinator || !host_id || !backup_id) {
345 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameter");
353 log_info(
"Election result received: host=%u, backup=%u", host_id[0], backup_id[0]);
357 if (current_state == CONSENSUS_STATE_ELECTION_COMPLETE) {
359 if (err != ASCIICHAT_OK) {
360 log_warn(
"Failed to reset state to IDLE: %d", err);
371 uint8_t out_host_id[16], uint8_t out_backup_id[16]) {
372 if (!coordinator || !out_host_id || !out_backup_id) {
373 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameter");
378 if (current_state == CONSENSUS_STATE_ELECTION_COMPLETE) {
380 if (err == ASCIICHAT_OK) {
383 if (err == ASCIICHAT_OK) {
395 return SET_ERRNO(ERROR_INVALID_STATE,
"No election result available");
402 if (!coordinator || !coordinator->
state) {
403 return CONSENSUS_STATE_FAILED;
419 if (now_ns >= next_round_ns) {
423 return next_round_ns - now_ns;
430 if (!coordinator || !coordinator->
state) {
asciichat_error_t consensus_coordinator_on_stats_update(consensus_coordinator_t *coordinator, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
Handle STATS_UPDATE packet.
void consensus_coordinator_destroy(consensus_coordinator_t *coordinator)
Clean up coordinator resources.
#define CONSENSUS_COLLECTION_DEADLINE_NS
int consensus_coordinator_get_metrics_count(const consensus_coordinator_t *coordinator)
Get count of metrics in current round.
asciichat_error_t consensus_coordinator_on_ring_members(consensus_coordinator_t *coordinator, const consensus_topology_t *new_topology)
Update ring topology when participants change.
struct consensus_coordinator consensus_coordinator_t
Internal coordinator structure.
asciichat_error_t consensus_coordinator_process(consensus_coordinator_t *coordinator, uint32_t timeout_ms)
Main orchestration loop.
uint64_t consensus_coordinator_time_until_next_round(const consensus_coordinator_t *coordinator)
Get time until next round.
consensus_state_machine_t consensus_coordinator_get_state(const consensus_coordinator_t *coordinator)
Get current coordinator state.
asciichat_error_t consensus_coordinator_get_current_host(const consensus_coordinator_t *coordinator, uint8_t out_host_id[16], uint8_t out_backup_id[16])
Get currently elected host.
asciichat_error_t consensus_coordinator_on_collection_start(consensus_coordinator_t *coordinator, uint32_t round_id, uint64_t deadline_ns)
Handle STATS_COLLECTION_START packet.
asciichat_error_t consensus_coordinator_on_election_result(consensus_coordinator_t *coordinator, const uint8_t host_id[16], const uint8_t backup_id[16])
Handle ELECTION_RESULT packet.
asciichat_error_t consensus_coordinator_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_election_func_t election_func, void *election_context, consensus_coordinator_t **out_coordinator)
Initialize coordinator with given parameters.
#define CONSENSUS_ROUND_INTERVAL_NS
asciichat_error_t consensus_metrics_measure(const uint8_t my_id[16], participant_metrics_t *out_metrics)
asciichat_error_t consensus_state_start_collection(consensus_state_t *state)
asciichat_error_t consensus_state_compute_election(consensus_state_t *state)
asciichat_error_t consensus_state_get_elected_backup(const consensus_state_t *state, uint8_t out_backup_id[16])
void consensus_state_destroy(consensus_state_t *state)
asciichat_error_t consensus_state_get_elected_host(const consensus_state_t *state, uint8_t out_host_id[16])
asciichat_error_t consensus_state_reset_to_idle(consensus_state_t *state)
asciichat_error_t consensus_state_collection_complete(consensus_state_t *state)
int consensus_state_get_metrics_count(const consensus_state_t *state)
asciichat_error_t consensus_state_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_state_t **out_state)
consensus_state_machine_t consensus_state_get_current_state(const consensus_state_t *state)
asciichat_error_t consensus_state_add_metrics(consensus_state_t *state, const participant_metrics_t *metrics)
Internal coordinator structure.
uint64_t collection_deadline_ns
uint64_t last_round_start_ns
uint8_t stored_host_id[16]
uint8_t stored_backup_id[16]
const consensus_topology_t * topology
consensus_election_func_t election_func
consensus_state_t * state
Ring topology for consensus participants.
bool consensus_topology_am_leader(const consensus_topology_t *topology)
uint64_t time_get_ns(void)
uint64_t time_elapsed_ns(uint64_t start_ns, uint64_t end_ns)