7#include <ascii-chat/session/consensus.h>
8#include <ascii-chat/network/consensus/state.h>
9#include <ascii-chat/network/consensus/coordinator.h>
10#include <ascii-chat/network/consensus/topology.h>
11#include <ascii-chat/network/consensus/election.h>
12#include <ascii-chat/network/consensus/metrics.h>
13#include <ascii-chat/common.h>
14#include <ascii-chat/log/logging.h>
15#include <ascii-chat/util/endian.h>
44static asciichat_error_t session_consensus_election_bridge(
void *context,
consensus_state_t *state) {
47 return SET_ERRNO(ERROR_INVALID_PARAM,
"consensus context is NULL");
52 if (num_metrics < 0) {
53 return SET_ERRNO(ERROR_INVALID_STATE,
"No metrics collected");
57 participant_metrics_t *metrics = SAFE_MALLOC(num_metrics *
sizeof(participant_metrics_t), participant_metrics_t *);
59 return SET_ERRNO(ERROR_MEMORY,
"Failed to allocate metrics array");
63 for (
int i = 0; i < num_metrics; i++) {
65 if (err != ASCIICHAT_OK) {
73 int backup_index = -1;
75 asciichat_error_t result;
78 result = consensus->
election(consensus->
context, metrics, num_metrics, &best_index, &backup_index);
84 if (result != ASCIICHAT_OK) {
89 if (best_index < 0 || best_index >= num_metrics) {
91 return SET_ERRNO(ERROR_INVALID_STATE,
"Invalid best host index: %d", best_index);
94 if (backup_index < 0 || backup_index >= num_metrics) {
96 return SET_ERRNO(ERROR_INVALID_STATE,
"Invalid backup host index: %d", backup_index);
102 log_info(
"Session consensus election: best=%d, backup=%d", best_index, backup_index);
109 const uint8_t participant_ids[64][16],
int num_participants,
110 const session_consensus_callbacks_t *callbacks,
112 if (!my_id || !callbacks || !out_consensus) {
113 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters: my_id=%p, callbacks=%p, out_consensus=%p", my_id,
114 callbacks, out_consensus);
117 if (!callbacks->send_packet || !callbacks->on_election || !callbacks->get_metrics) {
118 return SET_ERRNO(ERROR_INVALID_PARAM,
"Required callbacks missing: send_packet=%p, on_election=%p, get_metrics=%p",
119 callbacks->send_packet, callbacks->on_election, callbacks->get_metrics);
122 if (num_participants < 1 || num_participants > 64) {
123 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid participant count: %d", num_participants);
129 return SET_ERRNO(ERROR_MEMORY,
"Failed to allocate session consensus");
133 memcpy(consensus->
my_id, my_id, 16);
140 consensus->
election = callbacks->election;
141 consensus->
context = callbacks->context;
145 if (err != ASCIICHAT_OK) {
146 SAFE_FREE(consensus);
153 if (err != ASCIICHAT_OK) {
155 SAFE_FREE(consensus);
159 log_debug(
"Session consensus created: my_id=%.*s, is_leader=%d, participants=%d", 16, my_id, is_leader,
162 *out_consensus = consensus;
179 SAFE_FREE(consensus);
184 return SET_ERRNO(ERROR_INVALID_PARAM,
"consensus is NULL");
192 int num_participants) {
193 if (!consensus || !participant_ids) {
194 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters");
197 if (num_participants < 1 || num_participants > 64) {
198 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid participant count: %d", num_participants);
207 asciichat_error_t err =
209 if (err != ASCIICHAT_OK) {
219 uint64_t deadline_ns) {
221 return SET_ERRNO(ERROR_INVALID_PARAM,
"consensus is NULL");
228 const participant_metrics_t *metrics, uint8_t num_metrics) {
229 if (!consensus || !sender_id || !metrics) {
230 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters");
237 const char host_address[64], uint16_t host_port,
238 const uint8_t backup_id[16],
const char backup_address[64],
239 uint16_t backup_port) {
240 if (!consensus || !host_id || !backup_id) {
241 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters");
246 if (err != ASCIICHAT_OK) {
252 return consensus->
on_election(consensus->
context, host_id, host_address, host_port, backup_id, backup_address,
260 char out_host_address[64], uint16_t *out_host_port,
261 uint8_t out_backup_id[16],
char out_backup_address[64],
262 uint16_t *out_backup_port) {
263 if (!consensus || !out_host_id || !out_host_address || !out_host_port || !out_backup_id || !out_backup_address ||
265 return SET_ERRNO(ERROR_INVALID_PARAM,
"Invalid parameters");
270 if (err != ASCIICHAT_OK) {
277 memset(out_host_address, 0, 64);
279 memset(out_backup_address, 0, 64);
280 *out_backup_port = 0;
292 uint8_t backup_id[16];
295 return err == ASCIICHAT_OK;
asciichat_error_t session_consensus_process(session_consensus_t *consensus, uint32_t timeout_ms)
asciichat_error_t session_consensus_set_topology(session_consensus_t *consensus, const uint8_t participant_ids[64][16], int num_participants)
asciichat_error_t session_consensus_on_election_result(session_consensus_t *consensus, const uint8_t host_id[16], const char host_address[64], uint16_t host_port, const uint8_t backup_id[16], const char backup_address[64], uint16_t backup_port)
void session_consensus_destroy(session_consensus_t *consensus)
int session_consensus_get_metrics_count(session_consensus_t *consensus)
asciichat_error_t session_consensus_on_collection_start(session_consensus_t *consensus, uint32_t round_id, uint64_t deadline_ns)
bool session_consensus_is_ready(session_consensus_t *consensus)
asciichat_error_t session_consensus_create(const uint8_t my_id[16], bool is_leader, const uint8_t participant_ids[64][16], int num_participants, const session_consensus_callbacks_t *callbacks, session_consensus_t **out_consensus)
asciichat_error_t session_consensus_on_stats_update(session_consensus_t *consensus, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
int session_consensus_get_state(session_consensus_t *consensus)
uint64_t session_consensus_time_until_next_round(session_consensus_t *consensus)
asciichat_error_t session_consensus_get_elected_host(session_consensus_t *consensus, uint8_t out_host_id[16], char out_host_address[64], uint16_t *out_host_port, uint8_t out_backup_id[16], char out_backup_address[64], uint16_t *out_backup_port)
struct session_consensus session_consensus_t
Session consensus handle - wraps all consensus modules.
asciichat_error_t consensus_coordinator_on_stats_update(consensus_coordinator_t *coordinator, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
Handle STATS_UPDATE packet.
void consensus_coordinator_destroy(consensus_coordinator_t *coordinator)
Clean up coordinator resources.
int consensus_coordinator_get_metrics_count(const consensus_coordinator_t *coordinator)
Get count of metrics in current round.
asciichat_error_t consensus_coordinator_on_ring_members(consensus_coordinator_t *coordinator, const consensus_topology_t *new_topology)
Update ring topology when participants change.
asciichat_error_t consensus_coordinator_process(consensus_coordinator_t *coordinator, uint32_t timeout_ms)
Main orchestration loop.
uint64_t consensus_coordinator_time_until_next_round(const consensus_coordinator_t *coordinator)
Get time until next round.
consensus_state_machine_t consensus_coordinator_get_state(const consensus_coordinator_t *coordinator)
Get current coordinator state.
asciichat_error_t consensus_coordinator_get_current_host(const consensus_coordinator_t *coordinator, uint8_t out_host_id[16], uint8_t out_backup_id[16])
Get currently elected host.
asciichat_error_t consensus_coordinator_on_collection_start(consensus_coordinator_t *coordinator, uint32_t round_id, uint64_t deadline_ns)
Handle STATS_COLLECTION_START packet.
asciichat_error_t consensus_coordinator_on_election_result(consensus_coordinator_t *coordinator, const uint8_t host_id[16], const uint8_t backup_id[16])
Handle ELECTION_RESULT packet.
asciichat_error_t consensus_coordinator_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_election_func_t election_func, void *election_context, consensus_coordinator_t **out_coordinator)
Initialize coordinator with given parameters.
asciichat_error_t consensus_election_choose_hosts(const participant_metrics_t *metrics, int num_metrics, int *out_best_index, int *out_backup_index)
asciichat_error_t consensus_state_get_metric_at(const consensus_state_t *state, int index, participant_metrics_t *out_metrics)
int consensus_state_get_metrics_count(const consensus_state_t *state)
Internal coordinator structure.
Ring topology for consensus participants.
Session consensus handle - wraps all consensus modules.
consensus_topology_t * topology
session_consensus_get_metrics_fn get_metrics
session_consensus_on_election_fn on_election
consensus_coordinator_t * coordinator
session_consensus_send_packet_fn send_packet
session_consensus_election_fn election
void consensus_topology_destroy(consensus_topology_t *topology)
asciichat_error_t consensus_topology_create(const uint8_t participant_ids[64][16], int num_participants, const uint8_t my_id[16], consensus_topology_t **out_topology)