orbit/component/
update_scheduler.rs
1use std::collections::{HashSet, VecDeque};
7use std::sync::{Arc, Mutex};
8
9use crate::component::ComponentId;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
13pub enum UpdatePriority {
14 Low = 0,
16 Normal = 1,
18 High = 2,
20 Critical = 3,
22}
23
24#[derive(Debug)]
26struct ScheduledUpdate {
27 component_id: ComponentId,
29 priority: UpdatePriority,
31}
32
33#[derive(Debug, Clone)]
39pub struct UpdateScheduler {
40 updates: Arc<Mutex<VecDeque<ScheduledUpdate>>>,
42 pending: Arc<Mutex<HashSet<ComponentId>>>,
44 updating: Arc<Mutex<bool>>,
46}
47
48impl Default for UpdateScheduler {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl UpdateScheduler {
55 pub fn new() -> Self {
57 Self {
58 updates: Arc::new(Mutex::new(VecDeque::new())),
59 pending: Arc::new(Mutex::new(HashSet::new())),
60 updating: Arc::new(Mutex::new(false)),
61 }
62 }
63
64 pub fn schedule_update(
66 &self,
67 component_id: ComponentId,
68 priority: UpdatePriority,
69 ) -> Result<(), String> {
70 let mut pending = match self.pending.lock() {
72 Ok(pending) => pending,
73 Err(_) => return Err("Failed to lock pending set".to_string()),
74 };
75
76 if pending.contains(&component_id) {
78 return Ok(());
79 }
80
81 pending.insert(component_id);
83 drop(pending); let mut updates = match self.updates.lock() {
87 Ok(updates) => updates,
88 Err(_) => return Err("Failed to lock update queue".to_string()),
89 };
90
91 let update = ScheduledUpdate {
92 component_id,
93 priority,
94 };
95
96 match priority {
98 UpdatePriority::Critical => {
99 updates.push_front(update);
101 }
102 UpdatePriority::High => {
103 let insert_pos = updates
105 .iter()
106 .position(|u| u.priority < UpdatePriority::High)
107 .unwrap_or(updates.len());
108 updates.insert(insert_pos, update);
109 }
110 _ => {
111 updates.push_back(update);
113 }
114 }
115
116 let updating = self
118 .updating
119 .lock()
120 .map_err(|_| "Failed to lock updating flag".to_string())?;
121 if !*updating {
122 drop(updating); }
130
131 Ok(())
132 }
133
134 pub fn process_updates<F>(&self, mut update_component: F) -> Result<usize, String>
138 where
139 F: FnMut(ComponentId) -> Result<(), String>,
140 {
141 {
143 let mut updating = self
144 .updating
145 .lock()
146 .map_err(|_| "Failed to lock updating flag".to_string())?;
147 *updating = true;
148 }
149
150 let mut count = 0;
151
152 loop {
154 let update = {
156 let mut updates = self
157 .updates
158 .lock()
159 .map_err(|_| "Failed to lock update queue".to_string())?;
160 updates.pop_front()
161 };
162
163 let update = match update {
165 Some(update) => update,
166 None => break,
167 };
168
169 {
171 let mut pending = self
172 .pending
173 .lock()
174 .map_err(|_| "Failed to lock pending set".to_string())?;
175 pending.remove(&update.component_id);
176 }
177
178 if let Err(e) = update_component(update.component_id) {
180 eprintln!(
182 "Error updating component {}: {}",
183 update.component_id.id(),
184 e
185 );
186 }
187
188 count += 1;
189 }
190
191 {
193 let mut updating = self
194 .updating
195 .lock()
196 .map_err(|_| "Failed to lock updating flag".to_string())?;
197 *updating = false;
198 }
199
200 Ok(count)
201 }
202
203 pub fn has_pending_updates(&self) -> Result<bool, String> {
205 let pending = self
206 .pending
207 .lock()
208 .map_err(|_| "Failed to lock pending set".to_string())?;
209 Ok(!pending.is_empty())
210 }
211
212 pub fn pending_update_count(&self) -> Result<usize, String> {
214 let pending = self
215 .pending
216 .lock()
217 .map_err(|_| "Failed to lock pending set".to_string())?;
218 Ok(pending.len())
219 }
220
221 pub fn clear_updates(&self) -> Result<(), String> {
223 {
224 let mut updates = self
225 .updates
226 .lock()
227 .map_err(|_| "Failed to lock update queue".to_string())?;
228 updates.clear();
229 }
230
231 {
232 let mut pending = self
233 .pending
234 .lock()
235 .map_err(|_| "Failed to lock pending set".to_string())?;
236 pending.clear();
237 }
238
239 Ok(())
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246
247 #[test]
248 fn test_update_scheduler_basic() {
249 let scheduler = UpdateScheduler::new();
250
251 let c1 = ComponentId::new();
253 let c2 = ComponentId::new();
254 let c3 = ComponentId::new();
255
256 scheduler
257 .schedule_update(c1, UpdatePriority::Normal)
258 .unwrap();
259 scheduler.schedule_update(c2, UpdatePriority::High).unwrap();
260 scheduler.schedule_update(c3, UpdatePriority::Low).unwrap();
261
262 assert_eq!(scheduler.pending_update_count().unwrap(), 3);
264
265 let mut processed = Vec::new();
267 scheduler
268 .process_updates(|id| {
269 processed.push(id);
270 Ok(())
271 })
272 .unwrap();
273
274 assert_eq!(processed.len(), 3);
275 assert_eq!(processed[0], c2);
277 assert_eq!(processed[1], c1);
279 assert_eq!(processed[2], c3);
281
282 assert_eq!(scheduler.pending_update_count().unwrap(), 0);
284 }
285
286 #[test]
287 fn test_duplicate_updates() {
288 let scheduler = UpdateScheduler::new();
289
290 let c1 = ComponentId::new();
291
292 scheduler
294 .schedule_update(c1, UpdatePriority::Normal)
295 .unwrap();
296 scheduler
297 .schedule_update(c1, UpdatePriority::Normal)
298 .unwrap();
299 scheduler
300 .schedule_update(c1, UpdatePriority::Normal)
301 .unwrap();
302
303 assert_eq!(scheduler.pending_update_count().unwrap(), 1);
305
306 let mut processed = Vec::new();
307 scheduler
308 .process_updates(|id| {
309 processed.push(id);
310 Ok(())
311 })
312 .unwrap();
313
314 assert_eq!(processed.len(), 1);
316 assert_eq!(processed[0], c1);
317 }
318
319 #[test]
320 fn test_priority_ordering() {
321 let scheduler = UpdateScheduler::new();
322
323 let c_normal = ComponentId::new();
325 let c_low = ComponentId::new();
326 let c_high = ComponentId::new();
327 let c_critical = ComponentId::new();
328
329 scheduler
331 .schedule_update(c_normal, UpdatePriority::Normal)
332 .unwrap();
333 scheduler
334 .schedule_update(c_low, UpdatePriority::Low)
335 .unwrap();
336 scheduler
337 .schedule_update(c_high, UpdatePriority::High)
338 .unwrap();
339 scheduler
340 .schedule_update(c_critical, UpdatePriority::Critical)
341 .unwrap();
342
343 let mut processed = Vec::new();
345 scheduler
346 .process_updates(|id| {
347 processed.push(id);
348 Ok(())
349 })
350 .unwrap();
351
352 assert_eq!(processed.len(), 4);
353 assert_eq!(processed[0], c_critical);
355 assert_eq!(processed[1], c_high);
357 assert_eq!(processed[2], c_normal);
359 assert_eq!(processed[3], c_low);
361 }
362
363 #[test]
364 fn test_clear_updates() {
365 let scheduler = UpdateScheduler::new();
366
367 let c1 = ComponentId::new();
368 let c2 = ComponentId::new();
369
370 scheduler
371 .schedule_update(c1, UpdatePriority::Normal)
372 .unwrap();
373 scheduler
374 .schedule_update(c2, UpdatePriority::Normal)
375 .unwrap();
376
377 assert_eq!(scheduler.pending_update_count().unwrap(), 2);
378
379 scheduler.clear_updates().unwrap();
381
382 assert_eq!(scheduler.pending_update_count().unwrap(), 0);
383
384 let mut processed = Vec::new();
385 scheduler
386 .process_updates(|id| {
387 processed.push(id);
388 Ok(())
389 })
390 .unwrap();
391
392 assert_eq!(processed.len(), 0);
394 }
395}