1use std::{
2 cell::RefCell,
3 rc::Rc,
4 sync::{Arc, Mutex},
5};
6
7use azalea_log as log;
8use tokio::sync::broadcast;
9
10use super::{Service, Status};
11
/// Handle to a listener task created by `ServiceManager::listen` and its
/// `forward` / `filtered_forward` wrappers.
///
/// Dropping the handle aborts the listener task (see the `Drop` impl below).
pub struct ListenerHandle(
    /// Clone of the owning manager's shared cancellation sender.
    Arc<broadcast::Sender<()>>,
    /// Join handle of the listener task; `None` once `join` has taken it.
    Option<tokio::task::JoinHandle<()>>,
);
/// Handle to a listener task created by `ServiceManager::listen_local` and its
/// `forward_local` / `filtered_forward_local` wrappers.
///
/// Field 0 is a clone of the owning manager's shared cancellation sender;
/// field 1 is the join handle of the listener task (`None` once `join` has
/// taken it). Dropping the handle aborts the task (see the `Drop` impl below).
pub struct LocalListenerHandle(Arc<broadcast::Sender<()>>, Option<glib::JoinHandle<()>>);
17
18impl ListenerHandle {
19 pub async fn join(mut self) {
20 if let Some(handle) = self.1.take() {
21 drop(handle.await);
22 }
23 }
24}
25
26impl LocalListenerHandle {
27 pub async fn join(mut self) {
28 if let Some(handle) = self.1.take() {
29 drop(handle.await);
30 }
31 }
32}
33
34impl Drop for ListenerHandle {
35 fn drop(&mut self) {
36 let cancellation_sender = &self.0;
37
38 if Arc::strong_count(&cancellation_sender) == 2 {
39 drop(cancellation_sender.send(()));
40 }
41
42 if let Some(handle) = &self.1 {
43 handle.abort();
44 }
45 }
46}
47
48impl Drop for LocalListenerHandle {
49 fn drop(&mut self) {
50 let cancellation_sender = &self.0;
51
52 if Arc::strong_count(&cancellation_sender) == 2 {
53 drop(cancellation_sender.send(()));
54 }
55
56 if let Some(handle) = &self.1 {
57 handle.abort();
58 }
59 }
60}
61
/// Owns the channels and lifecycle state used to run a service `S` and to
/// attach listeners to its output.
#[derive(Clone)]
pub struct ServiceManager<S>
where
    S: Service,
{
    /// Sending half of the bounded input channel; a clone is handed to the
    /// service each time it is started.
    input: flume::Sender<S::Input>,
    /// Broadcast sender for service output; listeners subscribe to it.
    output: broadcast::Sender<S::Output>,
    /// Broadcast sender used to ask the running service loop to exit; shared
    /// with every listener handle.
    cancellation: Arc<broadcast::Sender<()>>,
    /// Initialization value, cloned and passed to `S::new` on every start.
    init: S::Init,
    /// Holds the input receiver while the service is stopped; `None` while the
    /// service task owns the receiver (i.e. the service counts as started).
    status: Arc<Mutex<Option<flume::Receiver<S::Input>>>>,
}
74
75impl<S> ServiceManager<S>
76where
77 S: Service + 'static,
78{
79 pub fn new(init: S::Init, input_capacity: usize, output_capacity: usize) -> Self {
80 let (input_sender, input_receiver) = flume::bounded(input_capacity);
81 let (output_sender, _) = broadcast::channel(output_capacity);
82 let (cancellation_sender, _) = broadcast::channel(1);
83
84 Self {
85 input: input_sender,
86 output: output_sender,
87 init,
88 cancellation: Arc::new(cancellation_sender),
89 status: Arc::new(Mutex::new(Some(input_receiver))),
90 }
91 }
92
93 fn _start(&mut self, local: bool) {
94 let Some(input) = self.status.lock().unwrap().take() else {
95 return;
96 };
97
98 let input_sender = self.input.clone();
99 let output_sender = self.output.clone();
100 let init = self.init.clone();
101 let mut cancellation_receiver = self.cancellation.subscribe();
102 let status = self.status.clone();
103
104 let task = async move {
105 let mut service = S::new(init, input_sender, output_sender.clone()).await;
106 let thread_id = std::thread::current().id();
107 log::info!(S, "Service started at thread: {:?}", thread_id);
108
109 loop {
110 tokio::select! {
111 event = service.event_generator(), if !S::DISABLE_EVENTS => {
112 match service.event_handler(event, &output_sender).await {
113 Ok(_) => continue,
114 Err(e) => log::debug!(S, "Service iteration failed {}", e),
115 }
116 },
117 Ok(msg) = input.recv_async() => service.message(msg, &output_sender).await,
118 _ = cancellation_receiver.recv() => break,
119 else => continue,
120 };
121 }
122
123 if let Ok(mut status) = status.lock() {
124 *status = Some(input);
125 };
126
127 log::info!(S, "Service stopped");
128 };
129
130 if local {
131 relm4::spawn(task);
132 } else {
133 relm4::spawn_blocking(move || {
134 tokio::runtime::Handle::current().block_on(task);
135 });
136 }
137 }
138
139 pub fn start(&mut self) {
140 if S::LOCAL {
141 self._start(true);
142 } else {
143 self._start(false);
144 }
145 }
146
147 pub fn stop(&mut self) {
148 if self.status.lock().unwrap().is_none() {
149 drop(self.cancellation.send(()));
150 }
151 }
152
153 pub fn status(&self) -> Status {
154 if self.status.lock().unwrap().is_none() {
155 Status::Started
156 } else {
157 Status::Stopped
158 }
159 }
160
161 pub fn send(&mut self, message: S::Input) {
162 drop(self.input.send(message));
163 }
164
165 pub fn listen<F: (Fn(S::Output) -> bool) + Send + 'static>(
166 &mut self,
167 transform: F,
168 ) -> ListenerHandle {
169 self.start();
170
171 let mut output = self.output.subscribe();
172
173 ListenerHandle(
174 self.cancellation.clone(),
175 Some(relm4::spawn(async move {
176 use tokio::sync::broadcast::error::RecvError;
177 loop {
178 if match output.recv().await {
179 Err(RecvError::Closed) => false,
180 Err(RecvError::Lagged(_)) => true,
181 Ok(event) => transform(event),
182 } {
183 continue;
184 } else {
185 break;
186 }
187 }
188 })),
189 )
190 }
191
192 pub fn forward<X: Send + 'static, F: (Fn(S::Output) -> X) + Send + 'static>(
193 &mut self,
194 sender: relm4::Sender<X>,
195 transform: F,
196 ) -> ListenerHandle {
197 self.listen(move |event| sender.send(transform(event)).is_ok())
198 }
199
200 pub fn filtered_forward<X: Send + 'static, F: (Fn(S::Output) -> Option<X>) + Send + 'static>(
201 &mut self,
202 sender: relm4::Sender<X>,
203 transform: F,
204 ) -> ListenerHandle {
205 self.listen(move |event| match transform(event) {
206 Some(data) => sender.send(data).is_ok(),
207 None => true,
208 })
209 }
210
211 pub fn listen_local<F: (Fn(S::Output) -> bool) + 'static>(
212 &mut self,
213 transform: F,
214 ) -> LocalListenerHandle {
215 self.start();
216
217 let mut output = self.output.subscribe();
218
219 LocalListenerHandle(
220 self.cancellation.clone(),
221 Some(relm4::spawn_local(async move {
222 use tokio::sync::broadcast::error::RecvError;
223 loop {
224 if match output.recv().await {
225 Err(RecvError::Closed) => false,
226 Err(RecvError::Lagged(_)) => true,
227 Ok(event) => transform(event),
228 } {
229 continue;
230 } else {
231 break;
232 }
233 }
234 })),
235 )
236 }
237
238 pub fn forward_local<X: 'static, F: (Fn(S::Output) -> X) + 'static>(
239 &mut self,
240 sender: relm4::Sender<X>,
241 transform: F,
242 ) -> LocalListenerHandle {
243 self.listen_local(move |event| sender.send(transform(event)).is_ok())
244 }
245
246 pub fn filtered_forward_local<X: 'static, F: (Fn(S::Output) -> Option<X>) + 'static>(
247 &mut self,
248 sender: relm4::Sender<X>,
249 transform: F,
250 ) -> LocalListenerHandle {
251 self.listen_local(move |event| match transform(event) {
252 Some(data) => sender.send(data).is_ok(),
253 None => true,
254 })
255 }
256}
257
258pub trait LocalStaticServiceManager
262where
263 Self: Service,
264{
265 fn static_handler() -> Rc<RefCell<crate::ServiceManager<Self>>>;
266
267 fn init(init: Self::Init) {
268 Self::static_handler().borrow_mut().init = init
269 }
270
271 fn start() {
272 Self::static_handler().borrow_mut().start()
273 }
274
275 fn stop() {
276 Self::static_handler().borrow_mut().stop()
277 }
278
279 fn status() -> crate::Status {
280 Self::static_handler().borrow().status()
281 }
282
283 fn send(message: Self::Input) {
284 Self::static_handler().borrow_mut().send(message)
285 }
286
287 fn listen<F: (Fn(Self::Output) -> bool) + Send + 'static>(
288 transform: F,
289 ) -> crate::ListenerHandle {
290 Self::static_handler().borrow_mut().listen(transform)
291 }
292
293 fn forward<X: Send + 'static, F: (Fn(Self::Output) -> X) + Send + 'static>(
294 sender: relm4::Sender<X>,
295 transform: F,
296 ) -> crate::ListenerHandle {
297 Self::static_handler()
298 .borrow_mut()
299 .forward(sender, transform)
300 }
301
302 fn filtered_forward<X: Send + 'static, F: (Fn(Self::Output) -> Option<X>) + Send + 'static>(
303 sender: relm4::Sender<X>,
304 transform: F,
305 ) -> crate::ListenerHandle {
306 Self::static_handler()
307 .borrow_mut()
308 .filtered_forward(sender, transform)
309 }
310
311 fn listen_local<F: (Fn(Self::Output) -> bool) + 'static>(
312 transform: F,
313 ) -> crate::LocalListenerHandle {
314 Self::static_handler().borrow_mut().listen_local(transform)
315 }
316
317 fn forward_local<X: 'static, F: (Fn(Self::Output) -> X) + 'static>(
318 sender: relm4::Sender<X>,
319 transform: F,
320 ) -> crate::LocalListenerHandle {
321 Self::static_handler()
322 .borrow_mut()
323 .forward_local(sender, transform)
324 }
325
326 fn filtered_forward_local<X: 'static, F: (Fn(Self::Output) -> Option<X>) + 'static>(
327 sender: relm4::Sender<X>,
328 transform: F,
329 ) -> crate::LocalListenerHandle {
330 Self::static_handler()
331 .borrow_mut()
332 .filtered_forward_local(sender, transform)
333 }
334}
335
336pub trait StaticServiceManager
340where
341 Self: Service,
342{
343 fn static_handler() -> Arc<Mutex<crate::ServiceManager<Self>>>;
344
345 fn init(init: Self::Init) {
346 match Self::static_handler().lock() {
347 Ok(mut handler) => handler.init = init,
348 Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
349 }
350 }
351
352 fn start() {
353 match Self::static_handler().lock() {
354 Ok(mut handler) => handler.start(),
355 Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
356 }
357 }
358
359 fn stop() {
360 match Self::static_handler().lock() {
361 Ok(mut handler) => handler.stop(),
362 Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
363 }
364 }
365
366 fn status() -> crate::Status {
367 match Self::static_handler().lock() {
368 Ok(handler) => handler.status(),
369 Err(e) => {
370 azalea_log::warning!(Self, "Failed to lock service handler: {}", e);
371 crate::Status::Stopped
372 }
373 }
374 }
375
376 fn send(message: Self::Input) {
377 match Self::static_handler().lock() {
378 Ok(mut handler) => handler.send(message),
379 Err(e) => azalea_log::warning!(Self, "Failed to lock service handler: {}", e),
380 }
381 }
382
383 fn listen<F: (Fn(Self::Output) -> bool) + Send + 'static>(
384 transform: F,
385 ) -> crate::ListenerHandle {
386 match Self::static_handler().lock() {
387 Ok(mut handler) => handler.listen(transform),
388 Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
389 }
390 }
391
392 fn forward<X: Send + 'static, F: (Fn(Self::Output) -> X) + Send + 'static>(
393 sender: relm4::Sender<X>,
394 transform: F,
395 ) -> crate::ListenerHandle {
396 match Self::static_handler().lock() {
397 Ok(mut handler) => handler.forward(sender, transform),
398 Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
399 }
400 }
401
402 fn filtered_forward<X: Send + 'static, F: (Fn(Self::Output) -> Option<X>) + Send + 'static>(
403 sender: relm4::Sender<X>,
404 transform: F,
405 ) -> crate::ListenerHandle {
406 match Self::static_handler().lock() {
407 Ok(mut handler) => handler.filtered_forward(sender, transform),
408 Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
409 }
410 }
411
412 fn listen_local<F: (Fn(Self::Output) -> bool) + 'static>(
413 transform: F,
414 ) -> crate::LocalListenerHandle {
415 match Self::static_handler().lock() {
416 Ok(mut handler) => handler.listen_local(transform),
417 Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
418 }
419 }
420
421 fn forward_local<X: 'static, F: (Fn(Self::Output) -> X) + 'static>(
422 sender: relm4::Sender<X>,
423 transform: F,
424 ) -> crate::LocalListenerHandle {
425 match Self::static_handler().lock() {
426 Ok(mut handler) => handler.forward_local(sender, transform),
427 Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
428 }
429 }
430
431 fn filtered_forward_local<X: 'static, F: (Fn(Self::Output) -> Option<X>) + 'static>(
432 sender: relm4::Sender<X>,
433 transform: F,
434 ) -> crate::LocalListenerHandle {
435 match Self::static_handler().lock() {
436 Ok(mut handler) => handler.filtered_forward_local(sender, transform),
437 Err(e) => azalea_log::error!(Self, "Failed to lock service handler: {}", e),
438 }
439 }
440}
441
/// Implements `LocalStaticServiceManager` for `$service`, backing
/// `static_handler` with one manager per thread.
///
/// The manager is built from `<$service as Service>::handler(Default::default())`
/// the first time `static_handler` is called on a given thread.
#[macro_export]
macro_rules! impl_local_static_handler {
    ($service:ty) => {
        impl $crate::LocalStaticServiceManager for $service {
            fn static_handler() -> std::rc::Rc<std::cell::RefCell<$crate::ServiceManager<Self>>> {
                use std::{cell::RefCell, rc::Rc};

                // `thread_local!` already defers initialization to the first
                // access on each thread, so no extra lazy wrapper is needed.
                thread_local! {
                    static HANDLER: Rc<RefCell<$crate::ServiceManager<$service>>> = {
                        azalea_log::debug!($service, "Service initialized");
                        Rc::new(RefCell::new(<$service as $crate::Service>::handler(
                            Default::default(),
                        )))
                    };
                }

                HANDLER.with(|handler| Rc::clone(handler))
            }
        }
    };
}
465
/// Implements `StaticServiceManager` for `$service`, backing `static_handler`
/// with a single process-wide manager.
///
/// The manager is built from `<$service as Service>::handler(Default::default())`
/// on first use.
#[macro_export]
macro_rules! impl_static_handler {
    ($service:ty) => {
        // Implement for the type passed to the macro rather than whatever
        // happens to be named `Service` at the call site.
        impl $crate::StaticServiceManager for $service {
            fn static_handler() -> std::sync::Arc<std::sync::Mutex<$crate::ServiceManager<Self>>> {
                use std::sync::{Arc, LazyLock, Mutex};

                static HANDLER: LazyLock<Arc<Mutex<$crate::ServiceManager<$service>>>> =
                    LazyLock::new(|| {
                        azalea_log::debug!($service, "Service initialized");

                        Arc::new(Mutex::new(<$service as $crate::Service>::handler(
                            Default::default(),
                        )))
                    });

                HANDLER.clone()
            }
        }
    };
}