ascii-chat 0.8.38
Real-time terminal-based video chat with ASCII art conversion
Loading...
Searching...
No Matches
consensus.c
Go to the documentation of this file.
1
7#include <ascii-chat/session/consensus.h>
8#include <ascii-chat/network/consensus/state.h>
9#include <ascii-chat/network/consensus/coordinator.h>
10#include <ascii-chat/network/consensus/topology.h>
11#include <ascii-chat/network/consensus/election.h>
12#include <ascii-chat/network/consensus/metrics.h>
13#include <ascii-chat/common.h>
14#include <ascii-chat/log/logging.h>
15#include <ascii-chat/util/endian.h>
16#include <string.h>
17
21typedef struct session_consensus {
22 // Consensus modules
25
26 // Callbacks for mode integration
27 session_consensus_send_packet_fn send_packet;
28 session_consensus_on_election_fn on_election;
29 session_consensus_get_metrics_fn get_metrics;
30 session_consensus_election_fn election;
31 void *context;
32
33 // Local identity
34 uint8_t my_id[16];
37
44static asciichat_error_t session_consensus_election_bridge(void *context, consensus_state_t *state) {
45 session_consensus_t *consensus = (session_consensus_t *)context;
46 if (!consensus) {
47 return SET_ERRNO(ERROR_INVALID_PARAM, "consensus context is NULL");
48 }
49
50 // Get collected metrics from state
51 int num_metrics = consensus_state_get_metrics_count(state);
52 if (num_metrics < 0) {
53 return SET_ERRNO(ERROR_INVALID_STATE, "No metrics collected");
54 }
55
56 // Allocate array for metrics
57 participant_metrics_t *metrics = SAFE_MALLOC(num_metrics * sizeof(participant_metrics_t), participant_metrics_t *);
58 if (!metrics) {
59 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate metrics array");
60 }
61
62 // Copy metrics from state
63 for (int i = 0; i < num_metrics; i++) {
64 asciichat_error_t err = consensus_state_get_metric_at(state, i, &metrics[i]);
65 if (err != ASCIICHAT_OK) {
66 SAFE_FREE(metrics);
67 return err;
68 }
69 }
70
71 // Run election (custom or default)
72 int best_index = -1;
73 int backup_index = -1;
74
75 asciichat_error_t result;
76 if (consensus->election) {
77 // Custom election
78 result = consensus->election(consensus->context, metrics, num_metrics, &best_index, &backup_index);
79 } else {
80 // Default election
81 result = consensus_election_choose_hosts(metrics, num_metrics, &best_index, &backup_index);
82 }
83
84 if (result != ASCIICHAT_OK) {
85 SAFE_FREE(metrics);
86 return result;
87 }
88
89 if (best_index < 0 || best_index >= num_metrics) {
90 SAFE_FREE(metrics);
91 return SET_ERRNO(ERROR_INVALID_STATE, "Invalid best host index: %d", best_index);
92 }
93
94 if (backup_index < 0 || backup_index >= num_metrics) {
95 SAFE_FREE(metrics);
96 return SET_ERRNO(ERROR_INVALID_STATE, "Invalid backup host index: %d", backup_index);
97 }
98
99 // Store in state for later retrieval
100 // TODO: Add method to store elected host/backup in state
101
102 log_info("Session consensus election: best=%d, backup=%d", best_index, backup_index);
103
104 SAFE_FREE(metrics);
105 return ASCIICHAT_OK;
106}
107
108asciichat_error_t session_consensus_create(const uint8_t my_id[16], bool is_leader,
109 const uint8_t participant_ids[64][16], int num_participants,
110 const session_consensus_callbacks_t *callbacks,
111 session_consensus_t **out_consensus) {
112 if (!my_id || !callbacks || !out_consensus) {
113 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters: my_id=%p, callbacks=%p, out_consensus=%p", my_id,
114 callbacks, out_consensus);
115 }
116
117 if (!callbacks->send_packet || !callbacks->on_election || !callbacks->get_metrics) {
118 return SET_ERRNO(ERROR_INVALID_PARAM, "Required callbacks missing: send_packet=%p, on_election=%p, get_metrics=%p",
119 callbacks->send_packet, callbacks->on_election, callbacks->get_metrics);
120 }
121
122 if (num_participants < 1 || num_participants > 64) {
123 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid participant count: %d", num_participants);
124 }
125
126 // Allocate consensus handle
127 session_consensus_t *consensus = SAFE_CALLOC(1, sizeof(session_consensus_t), session_consensus_t *);
128 if (!consensus) {
129 return SET_ERRNO(ERROR_MEMORY, "Failed to allocate session consensus");
130 }
131
132 // Copy identity
133 memcpy(consensus->my_id, my_id, 16);
134 consensus->is_leader = is_leader;
135
136 // Copy callbacks
137 consensus->send_packet = callbacks->send_packet;
138 consensus->on_election = callbacks->on_election;
139 consensus->get_metrics = callbacks->get_metrics;
140 consensus->election = callbacks->election; // May be NULL
141 consensus->context = callbacks->context;
142
143 // Create topology
144 asciichat_error_t err = consensus_topology_create(participant_ids, num_participants, my_id, &consensus->topology);
145 if (err != ASCIICHAT_OK) {
146 SAFE_FREE(consensus);
147 return err;
148 }
149
150 // Create coordinator
151 err = consensus_coordinator_create(my_id, consensus->topology, session_consensus_election_bridge, consensus,
152 &consensus->coordinator);
153 if (err != ASCIICHAT_OK) {
155 SAFE_FREE(consensus);
156 return err;
157 }
158
159 log_debug("Session consensus created: my_id=%.*s, is_leader=%d, participants=%d", 16, my_id, is_leader,
160 num_participants);
161
162 *out_consensus = consensus;
163 return ASCIICHAT_OK;
164}
165
167 if (!consensus) {
168 return;
169 }
170
171 if (consensus->coordinator) {
173 }
174
175 if (consensus->topology) {
177 }
178
179 SAFE_FREE(consensus);
180}
181
182asciichat_error_t session_consensus_process(session_consensus_t *consensus, uint32_t timeout_ms) {
183 if (!consensus) {
184 return SET_ERRNO(ERROR_INVALID_PARAM, "consensus is NULL");
185 }
186
187 // Call coordinator process
188 return consensus_coordinator_process(consensus->coordinator, timeout_ms);
189}
190
191asciichat_error_t session_consensus_set_topology(session_consensus_t *consensus, const uint8_t participant_ids[64][16],
192 int num_participants) {
193 if (!consensus || !participant_ids) {
194 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters");
195 }
196
197 if (num_participants < 1 || num_participants > 64) {
198 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid participant count: %d", num_participants);
199 }
200
201 // Destroy old topology
202 if (consensus->topology) {
204 }
205
206 // Create new topology
207 asciichat_error_t err =
208 consensus_topology_create(participant_ids, num_participants, consensus->my_id, &consensus->topology);
209 if (err != ASCIICHAT_OK) {
210 consensus->topology = NULL;
211 return err;
212 }
213
214 // Update coordinator with new topology
215 return consensus_coordinator_on_ring_members(consensus->coordinator, consensus->topology);
216}
217
218asciichat_error_t session_consensus_on_collection_start(session_consensus_t *consensus, uint32_t round_id,
219 uint64_t deadline_ns) {
220 if (!consensus) {
221 return SET_ERRNO(ERROR_INVALID_PARAM, "consensus is NULL");
222 }
223
224 return consensus_coordinator_on_collection_start(consensus->coordinator, round_id, deadline_ns);
225}
226
227asciichat_error_t session_consensus_on_stats_update(session_consensus_t *consensus, const uint8_t sender_id[16],
228 const participant_metrics_t *metrics, uint8_t num_metrics) {
229 if (!consensus || !sender_id || !metrics) {
230 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters");
231 }
232
233 return consensus_coordinator_on_stats_update(consensus->coordinator, sender_id, metrics, num_metrics);
234}
235
236asciichat_error_t session_consensus_on_election_result(session_consensus_t *consensus, const uint8_t host_id[16],
237 const char host_address[64], uint16_t host_port,
238 const uint8_t backup_id[16], const char backup_address[64],
239 uint16_t backup_port) {
240 if (!consensus || !host_id || !backup_id) {
241 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters");
242 }
243
244 // First acknowledge the election in coordinator
245 asciichat_error_t err = consensus_coordinator_on_election_result(consensus->coordinator, host_id, backup_id);
246 if (err != ASCIICHAT_OK) {
247 return err;
248 }
249
250 // Then call the mode's election callback
251 if (consensus->on_election) {
252 return consensus->on_election(consensus->context, host_id, host_address, host_port, backup_id, backup_address,
253 backup_port);
254 }
255
256 return ASCIICHAT_OK;
257}
258
259asciichat_error_t session_consensus_get_elected_host(session_consensus_t *consensus, uint8_t out_host_id[16],
260 char out_host_address[64], uint16_t *out_host_port,
261 uint8_t out_backup_id[16], char out_backup_address[64],
262 uint16_t *out_backup_port) {
263 if (!consensus || !out_host_id || !out_host_address || !out_host_port || !out_backup_id || !out_backup_address ||
264 !out_backup_port) {
265 return SET_ERRNO(ERROR_INVALID_PARAM, "Invalid parameters");
266 }
267
268 // Get from coordinator
269 asciichat_error_t err = consensus_coordinator_get_current_host(consensus->coordinator, out_host_id, out_backup_id);
270 if (err != ASCIICHAT_OK) {
271 return err;
272 }
273
274 // NOTE: The current API doesn't return addresses/ports. This is a limitation
275 // of the coordinator that should be addressed in a future enhancement.
276 // For now, return zeros and indicate this is a placeholder.
277 memset(out_host_address, 0, 64);
278 *out_host_port = 0;
279 memset(out_backup_address, 0, 64);
280 *out_backup_port = 0;
281
282 return ASCIICHAT_OK;
283}
284
286 if (!consensus) {
287 return false;
288 }
289
290 // Check if we have a valid elected host
291 uint8_t host_id[16];
292 uint8_t backup_id[16];
293 asciichat_error_t err = consensus_coordinator_get_current_host(consensus->coordinator, host_id, backup_id);
294
295 return err == ASCIICHAT_OK;
296}
297
299 if (!consensus) {
300 return -1;
301 }
302
303 return (int)consensus_coordinator_get_state(consensus->coordinator);
304}
305
307 if (!consensus) {
308 return 0;
309 }
310
312}
313
315 if (!consensus) {
316 return -1;
317 }
318
320}
asciichat_error_t session_consensus_process(session_consensus_t *consensus, uint32_t timeout_ms)
Definition consensus.c:182
asciichat_error_t session_consensus_set_topology(session_consensus_t *consensus, const uint8_t participant_ids[64][16], int num_participants)
Definition consensus.c:191
asciichat_error_t session_consensus_on_election_result(session_consensus_t *consensus, const uint8_t host_id[16], const char host_address[64], uint16_t host_port, const uint8_t backup_id[16], const char backup_address[64], uint16_t backup_port)
Definition consensus.c:236
void session_consensus_destroy(session_consensus_t *consensus)
Definition consensus.c:166
int session_consensus_get_metrics_count(session_consensus_t *consensus)
Definition consensus.c:314
asciichat_error_t session_consensus_on_collection_start(session_consensus_t *consensus, uint32_t round_id, uint64_t deadline_ns)
Definition consensus.c:218
bool session_consensus_is_ready(session_consensus_t *consensus)
Definition consensus.c:285
asciichat_error_t session_consensus_create(const uint8_t my_id[16], bool is_leader, const uint8_t participant_ids[64][16], int num_participants, const session_consensus_callbacks_t *callbacks, session_consensus_t **out_consensus)
Definition consensus.c:108
asciichat_error_t session_consensus_on_stats_update(session_consensus_t *consensus, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
Definition consensus.c:227
int session_consensus_get_state(session_consensus_t *consensus)
Definition consensus.c:298
uint64_t session_consensus_time_until_next_round(session_consensus_t *consensus)
Definition consensus.c:306
asciichat_error_t session_consensus_get_elected_host(session_consensus_t *consensus, uint8_t out_host_id[16], char out_host_address[64], uint16_t *out_host_port, uint8_t out_backup_id[16], char out_backup_address[64], uint16_t *out_backup_port)
Definition consensus.c:259
struct session_consensus session_consensus_t
Session consensus handle - wraps all consensus modules.
asciichat_error_t consensus_coordinator_on_stats_update(consensus_coordinator_t *coordinator, const uint8_t sender_id[16], const participant_metrics_t *metrics, uint8_t num_metrics)
Handle STATS_UPDATE packet.
void consensus_coordinator_destroy(consensus_coordinator_t *coordinator)
Clean up coordinator resources.
Definition coordinator.c:83
int consensus_coordinator_get_metrics_count(const consensus_coordinator_t *coordinator)
Get count of metrics in current round.
asciichat_error_t consensus_coordinator_on_ring_members(consensus_coordinator_t *coordinator, const consensus_topology_t *new_topology)
Update ring topology when participants change.
asciichat_error_t consensus_coordinator_process(consensus_coordinator_t *coordinator, uint32_t timeout_ms)
Main orchestration loop.
uint64_t consensus_coordinator_time_until_next_round(const consensus_coordinator_t *coordinator)
Get time until next round.
consensus_state_machine_t consensus_coordinator_get_state(const consensus_coordinator_t *coordinator)
Get current coordinator state.
asciichat_error_t consensus_coordinator_get_current_host(const consensus_coordinator_t *coordinator, uint8_t out_host_id[16], uint8_t out_backup_id[16])
Get currently elected host.
asciichat_error_t consensus_coordinator_on_collection_start(consensus_coordinator_t *coordinator, uint32_t round_id, uint64_t deadline_ns)
Handle STATS_COLLECTION_START packet.
asciichat_error_t consensus_coordinator_on_election_result(consensus_coordinator_t *coordinator, const uint8_t host_id[16], const uint8_t backup_id[16])
Handle ELECTION_RESULT packet.
asciichat_error_t consensus_coordinator_create(const uint8_t my_id[16], const consensus_topology_t *topology, consensus_election_func_t election_func, void *election_context, consensus_coordinator_t **out_coordinator)
Initialize coordinator with given parameters.
Definition coordinator.c:42
asciichat_error_t consensus_election_choose_hosts(const participant_metrics_t *metrics, int num_metrics, int *out_best_index, int *out_backup_index)
Definition election.c:67
asciichat_error_t consensus_state_get_metric_at(const consensus_state_t *state, int index, participant_metrics_t *out_metrics)
Definition state.c:257
int consensus_state_get_metrics_count(const consensus_state_t *state)
Definition state.c:250
Internal coordinator structure.
Definition coordinator.c:21
State machine instance.
Definition state.c:24
Ring topology for consensus participants.
Definition topology.c:17
Session consensus handle - wraps all consensus modules.
Definition consensus.c:21
uint8_t my_id[16]
Definition consensus.c:34
consensus_topology_t * topology
Definition consensus.c:23
session_consensus_get_metrics_fn get_metrics
Definition consensus.c:29
session_consensus_on_election_fn on_election
Definition consensus.c:28
consensus_coordinator_t * coordinator
Definition consensus.c:24
session_consensus_send_packet_fn send_packet
Definition consensus.c:27
session_consensus_election_fn election
Definition consensus.c:30
void consensus_topology_destroy(consensus_topology_t *topology)
Definition topology.c:64
asciichat_error_t consensus_topology_create(const uint8_t participant_ids[64][16], int num_participants, const uint8_t my_id[16], consensus_topology_t **out_topology)
Definition topology.c:38