azalea_shell/service/dbus/
discovery.rs

1use std::collections::HashSet;
2
3use futures_lite::stream::StreamExt;
4use tokio::sync::broadcast;
5use zbus::fdo::{DBusProxy, NameOwnerChangedStream};
6use zbus_names::OwnedBusName;
7
8/// DBus Discovery Service
9///
10/// Listens for newly created or destroyed DBus objects
11#[derive(azalea_derive::StaticServiceManager)]
12pub struct Service {
13    stream: NameOwnerChangedStream,
14    objects: HashSet<OwnedBusName>,
15}
16
17#[derive(Debug)]
18pub enum Input {
19    QueryObjects(tokio::sync::oneshot::Sender<Vec<OwnedBusName>>),
20    QueryServiceExists(String, tokio::sync::oneshot::Sender<bool>),
21}
22
23#[derive(Clone, Debug)]
24pub enum Output {
25    ObjectCreated(OwnedBusName),
26    ObjectDeleted(OwnedBusName),
27}
28
29impl azalea_service::Service for Service {
30    type Init = Option<zbus::Connection>;
31    type Input = Input;
32    type Event = Output;
33    type Output = Output;
34
35    fn handler(init: Self::Init) -> azalea_service::ServiceManager<Self> {
36        azalea_service::ServiceManager::new(init, 4, 8)
37    }
38
39    async fn new(
40        connection: Self::Init,
41        _input_sender: flume::Sender<Self::Input>,
42        _output_sender: broadcast::Sender<Self::Output>,
43    ) -> Self {
44        let connection = connection.unwrap_or(zbus::Connection::session().await.unwrap());
45        let proxy = DBusProxy::new(&connection).await.unwrap();
46        let mut objects: HashSet<OwnedBusName> = Default::default();
47
48        for name in proxy.list_names().await.unwrap_or_default() {
49            objects.insert(name);
50        }
51
52        let stream = proxy.receive_name_owner_changed().await.unwrap();
53
54        Self { stream, objects }
55    }
56
57    async fn message(
58        &mut self,
59        input: Self::Input,
60        _output_sender: &broadcast::Sender<Self::Output>,
61    ) {
62        match input {
63            Input::QueryObjects(sender) => {
64                let names = self.objects.clone().into_iter().collect();
65                drop(sender.send(names));
66            }
67            Input::QueryServiceExists(service, sender) => {
68                let mut found = false;
69
70                for name in &self.objects {
71                    if name.contains(&service) {
72                        found = true;
73                        break;
74                    }
75                }
76
77                let _ = sender.send(found);
78            }
79        }
80    }
81
82    async fn event_generator(&mut self) -> Self::Event {
83        loop {
84            if let Some(msg) = self.stream.next().await {
85                let Ok(args) = msg.args() else {
86                    continue;
87                };
88                if args.new_owner().is_some() {
89                    return Output::ObjectCreated(args.name.into());
90                } else {
91                    return Output::ObjectDeleted(args.name.into());
92                }
93            }
94        }
95    }
96
97    async fn event_handler(
98        &mut self,
99        event: Self::Event,
100        output_sender: &broadcast::Sender<self::Output>,
101    ) -> azalea_service::Result<()> {
102        output_sender.send(event)?;
103        Ok(())
104    }
105}