azalea_service/
handler.rs

1use std::{
2    cell::RefCell,
3    rc::Rc,
4    sync::{Arc, Mutex},
5};
6
7use azalea_log as log;
8use tokio::sync::broadcast;
9
10use super::{Service, Status};
11
12pub struct ListenerHandle(
13    Arc<broadcast::Sender<()>>,
14    Option<tokio::task::JoinHandle<()>>,
15);
16pub struct LocalListenerHandle(Arc<broadcast::Sender<()>>, Option<glib::JoinHandle<()>>);
17
18impl ListenerHandle {
19    pub async fn join(mut self) {
20        if let Some(handle) = self.1.take() {
21            drop(handle.await);
22        }
23    }
24}
25
26impl LocalListenerHandle {
27    pub async fn join(mut self) {
28        if let Some(handle) = self.1.take() {
29            drop(handle.await);
30        }
31    }
32}
33
34impl Drop for ListenerHandle {
35    fn drop(&mut self) {
36        let cancellation_sender = &self.0;
37
38        if Arc::strong_count(&cancellation_sender) == 2 {
39            drop(cancellation_sender.send(()));
40        }
41
42        if let Some(handle) = &self.1 {
43            handle.abort();
44        }
45    }
46}
47
48impl Drop for LocalListenerHandle {
49    fn drop(&mut self) {
50        let cancellation_sender = &self.0;
51
52        if Arc::strong_count(&cancellation_sender) == 2 {
53            drop(cancellation_sender.send(()));
54        }
55
56        if let Some(handle) = &self.1 {
57            handle.abort();
58        }
59    }
60}
61
62/// Service handler responsible for managing/handling a service
63#[derive(Clone)]
64pub struct ServiceManager<S>
65where
66    S: Service,
67{
68    input: flume::Sender<S::Input>,
69    output: broadcast::Sender<S::Output>,
70    cancellation: Arc<broadcast::Sender<()>>,
71    init: S::Init,
72    status: Arc<Mutex<Option<flume::Receiver<S::Input>>>>,
73}
74
75impl<S> ServiceManager<S>
76where
77    S: Service + 'static,
78{
79    pub fn new(init: S::Init, input_capacity: usize, output_capacity: usize) -> Self {
80        let (input_sender, input_receiver) = flume::bounded(input_capacity);
81        let (output_sender, _) = broadcast::channel(output_capacity);
82        let (cancellation_sender, _) = broadcast::channel(1);
83
84        Self {
85            input: input_sender,
86            output: output_sender,
87            init,
88            cancellation: Arc::new(cancellation_sender),
89            status: Arc::new(Mutex::new(Some(input_receiver))),
90        }
91    }
92
93    fn _start(&mut self, local: bool) {
94        let Some(input) = self.status.lock().unwrap().take() else {
95            return;
96        };
97
98        let input_sender = self.input.clone();
99        let output_sender = self.output.clone();
100        let init = self.init.clone();
101        let mut cancellation_receiver = self.cancellation.subscribe();
102        let status = self.status.clone();
103
104        let task = async move {
105            let mut service = S::new(init, input_sender, output_sender.clone()).await;
106            let thread_id = std::thread::current().id();
107            log::info!(S, "Service started at thread: {:?}", thread_id);
108
109            loop {
110                tokio::select! {
111                    event = service.event_generator(), if !S::DISABLE_EVENTS => {
112                        match service.event_handler(event, &output_sender).await {
113                            Ok(_) => continue,
114                            Err(e) => log::debug!(S, "Service iteration failed {}", e),
115                        }
116                    },
117                    Ok(msg) = input.recv_async() => service.message(msg, &output_sender).await,
118                    _ = cancellation_receiver.recv() => break,
119                    else => continue,
120                };
121            }
122
123            if let Ok(mut status) = status.lock() {
124                *status = Some(input);
125            };
126
127            log::info!(S, "Service stopped");
128        };
129
130        if local {
131            relm4::spawn(task);
132        } else {
133            relm4::spawn_blocking(move || {
134                tokio::runtime::Handle::current().block_on(task);
135            });
136        }
137    }
138
139    pub fn start(&mut self) {
140        if S::LOCAL {
141            self._start(true);
142        } else {
143            self._start(false);
144        }
145    }
146
147    pub fn stop(&mut self) {
148        if self.status.lock().unwrap().is_none() {
149            drop(self.cancellation.send(()));
150        }
151    }
152
153    pub fn status(&self) -> Status {
154        if self.status.lock().unwrap().is_none() {
155            Status::Started
156        } else {
157            Status::Stopped
158        }
159    }
160
161    pub fn send(&mut self, message: S::Input) {
162        drop(self.input.send(message));
163    }
164
165    pub fn listen<F: (Fn(S::Output) -> bool) + Send + 'static>(
166        &mut self,
167        transform: F,
168    ) -> ListenerHandle {
169        self.start();
170
171        let mut output = self.output.subscribe();
172
173        ListenerHandle(
174            self.cancellation.clone(),
175            Some(relm4::spawn(async move {
176                use tokio::sync::broadcast::error::RecvError;
177                loop {
178                    if match output.recv().await {
179                        Err(RecvError::Closed) => false,
180                        Err(RecvError::Lagged(_)) => true,
181                        Ok(event) => transform(event),
182                    } {
183                        continue;
184                    } else {
185                        break;
186                    }
187                }
188            })),
189        )
190    }
191
192    pub fn forward<X: Send + 'static, F: (Fn(S::Output) -> X) + Send + 'static>(
193        &mut self,
194        sender: relm4::Sender<X>,
195        transform: F,
196    ) -> ListenerHandle {
197        self.listen(move |event| sender.send(transform(event)).is_ok())
198    }
199
200    pub fn filtered_forward<X: Send + 'static, F: (Fn(S::Output) -> Option<X>) + Send + 'static>(
201        &mut self,
202        sender: relm4::Sender<X>,
203        transform: F,
204    ) -> ListenerHandle {
205        self.listen(move |event| match transform(event) {
206            Some(data) => sender.send(data).is_ok(),
207            None => true,
208        })
209    }
210
211    pub fn listen_local<F: (Fn(S::Output) -> bool) + 'static>(
212        &mut self,
213        transform: F,
214    ) -> LocalListenerHandle {
215        self.start();
216
217        let mut output = self.output.subscribe();
218
219        LocalListenerHandle(
220            self.cancellation.clone(),
221            Some(relm4::spawn_local(async move {
222                use tokio::sync::broadcast::error::RecvError;
223                loop {
224                    if match output.recv().await {
225                        Err(RecvError::Closed) => false,
226                        Err(RecvError::Lagged(_)) => true,
227                        Ok(event) => transform(event),
228                    } {
229                        continue;
230                    } else {
231                        break;
232                    }
233                }
234            })),
235        )
236    }
237
238    pub fn forward_local<X: 'static, F: (Fn(S::Output) -> X) + 'static>(
239        &mut self,
240        sender: relm4::Sender<X>,
241        transform: F,
242    ) -> LocalListenerHandle {
243        self.listen_local(move |event| sender.send(transform(event)).is_ok())
244    }
245
246    pub fn filtered_forward_local<X: 'static, F: (Fn(S::Output) -> Option<X>) + 'static>(
247        &mut self,
248        sender: relm4::Sender<X>,
249        transform: F,
250    ) -> LocalListenerHandle {
251        self.listen_local(move |event| match transform(event) {
252            Some(data) => sender.send(data).is_ok(),
253            None => true,
254        })
255    }
256}
257
258/// Thread local static handler for a Service
259///
260/// This allows global (thread local) access to a service
261pub trait LocalStaticServiceManager
262where
263    Self: Service,
264{
265    fn static_handler() -> Rc<RefCell<crate::ServiceManager<Self>>>;
266
267    fn init(init: Self::Init) {
268        Self::static_handler().borrow_mut().init = init
269    }
270
271    fn start() {
272        Self::static_handler().borrow_mut().start()
273    }
274
275    fn stop() {
276        Self::static_handler().borrow_mut().stop()
277    }
278
279    fn status() -> crate::Status {
280        Self::static_handler().borrow().status()
281    }
282
283    fn send(message: Self::Input) {
284        Self::static_handler().borrow_mut().send(message)
285    }
286
287    fn listen<F: (Fn(Self::Output) -> bool) + Send + 'static>(
288        transform: F,
289    ) -> crate::ListenerHandle {
290        Self::static_handler().borrow_mut().listen(transform)
291    }
292
293    fn forward<X: Send + 'static, F: (Fn(Self::Output) -> X) + Send + 'static>(
294        sender: relm4::Sender<X>,
295        transform: F,
296    ) -> crate::ListenerHandle {
297        Self::static_handler()
298            .borrow_mut()
299            .forward(sender, transform)
300    }
301
302    fn filtered_forward<X: Send + 'static, F: (Fn(Self::Output) -> Option<X>) + Send + 'static>(
303        sender: relm4::Sender<X>,
304        transform: F,
305    ) -> crate::ListenerHandle {
306        Self::static_handler()
307            .borrow_mut()
308            .filtered_forward(sender, transform)
309    }
310
311    fn listen_local<F: (Fn(Self::Output) -> bool) + 'static>(
312        transform: F,
313    ) -> crate::LocalListenerHandle {
314        Self::static_handler().borrow_mut().listen_local(transform)
315    }
316
317    fn forward_local<X: 'static, F: (Fn(Self::Output) -> X) + 'static>(
318        sender: relm4::Sender<X>,
319        transform: F,
320    ) -> crate::LocalListenerHandle {
321        Self::static_handler()
322            .borrow_mut()
323            .forward_local(sender, transform)
324    }
325
326    fn filtered_forward_local<X: 'static, F: (Fn(Self::Output) -> Option<X>) + 'static>(
327        sender: relm4::Sender<X>,
328        transform: F,
329    ) -> crate::LocalListenerHandle {
330        Self::static_handler()
331            .borrow_mut()
332            .filtered_forward_local(sender, transform)
333    }
334}
335
336/// Static handler for a Service
337///
338/// This allows global (multi thread) access to a service
339pub trait StaticServiceManager
340where
341    Self: Service,
342{
343    fn static_handler() -> Arc<Mutex<crate::ServiceManager<Self>>>;
344
345    fn init(init: Self::Init) {
346        match Self::static_handler().lock() {
347            Ok(mut handler) => handler.init = init,
348            Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
349        }
350    }
351
352    fn start() {
353        match Self::static_handler().lock() {
354            Ok(mut handler) => handler.start(),
355            Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
356        }
357    }
358
359    fn stop() {
360        match Self::static_handler().lock() {
361            Ok(mut handler) => handler.stop(),
362            Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
363        }
364    }
365
366    fn status() -> crate::Status {
367        match Self::static_handler().lock() {
368            Ok(handler) => handler.status(),
369            Err(e) => {
370                azalea_log::warning!(Self, "Failed to lock service handler: {}", e);
371                crate::Status::Stopped
372            }
373        }
374    }
375
376    fn send(message: Self::Input) {
377        match Self::static_handler().lock() {
378            Ok(mut handler) => handler.send(message),
379            Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
380        }
381    }
382
383    fn listen<F: (Fn(Self::Output) -> bool) + Send + 'static>(
384        transform: F,
385    ) -> crate::ListenerHandle {
386        match Self::static_handler().lock() {
387            Ok(mut handler) => handler.listen(transform),
388            Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
389        }
390    }
391
392    fn forward<X: Send + 'static, F: (Fn(Self::Output) -> X) + Send + 'static>(
393        sender: relm4::Sender<X>,
394        transform: F,
395    ) -> crate::ListenerHandle {
396        match Self::static_handler().lock() {
397            Ok(mut handler) => handler.forward(sender, transform),
398            Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
399        }
400    }
401
402    fn filtered_forward<X: Send + 'static, F: (Fn(Self::Output) -> Option<X>) + Send + 'static>(
403        sender: relm4::Sender<X>,
404        transform: F,
405    ) -> crate::ListenerHandle {
406        match Self::static_handler().lock() {
407            Ok(mut handler) => handler.filtered_forward(sender, transform),
408            Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
409        }
410    }
411
412    fn listen_local<F: (Fn(Self::Output) -> bool) + 'static>(
413        transform: F,
414    ) -> crate::LocalListenerHandle {
415        match Self::static_handler().lock() {
416            Ok(mut handler) => handler.listen_local(transform),
417            Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
418        }
419    }
420
421    fn forward_local<X: 'static, F: (Fn(Self::Output) -> X) + 'static>(
422        sender: relm4::Sender<X>,
423        transform: F,
424    ) -> crate::LocalListenerHandle {
425        match Self::static_handler().lock() {
426            Ok(mut handler) => handler.forward_local(sender, transform),
427            Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
428        }
429    }
430
431    fn filtered_forward_local<X: 'static, F: (Fn(Self::Output) -> Option<X>) + 'static>(
432        sender: relm4::Sender<X>,
433        transform: F,
434    ) -> crate::LocalListenerHandle {
435        match Self::static_handler().lock() {
436            Ok(mut handler) => handler.filtered_forward_local(sender, transform),
437            Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
438        }
439    }
440}
441
442#[macro_export]
443macro_rules! impl_local_static_handler {
444    ($service:ty) => {
445        impl $crate::LocalStaticServiceManager for $service {
446            fn static_handler() -> std::rc::Rc<std::cell::RefCell<$crate::ServiceManager<Self>>> {
447                use std::{cell::RefCell, rc::Rc, sync::LazyLock};
448
449                thread_local! {
450                    static HANDLER: LazyLock<Rc<RefCell<$crate::ServiceManager<$service>>>> = LazyLock::new(|| {
451                        azalea_log::debug!($service, "Service initialized");
452                        Rc::new(RefCell::new(<$service as $crate::Service>::handler(
453                            Default::default(),
454                        )))
455                    });
456                }
457
458                HANDLER.with(|handler| {
459                    (*handler).clone()
460                })
461            }
462        }
463    }
464}
465
466#[macro_export]
467macro_rules! impl_static_handler {
468    ($service:ty) => {
469        impl $crate::StaticServiceManager for Service {
470            fn static_handler() -> std::sync::Arc<std::sync::Mutex<$crate::ServiceManager<Self>>> {
471                use std::sync::{Arc, LazyLock, Mutex};
472
473                static HANDLER: LazyLock<Arc<Mutex<$crate::ServiceManager<$service>>>> =
474                    LazyLock::new(|| {
475                        azalea_log::debug!($service, "Service initialized");
476
477                        Arc::new(Mutex::new(<$service as $crate::Service>::handler(
478                            Default::default(),
479                        )))
480                    });
481
482                HANDLER.clone()
483            }
484        }
485    };
486}