azalea_shell/service/dbus/network_manager/
mod.rs

1pub mod proxy;
2
3use std::collections::HashMap;
4
5use futures_lite::StreamExt;
6use proxy::{NMConnectivityState, NMState, NetworkManagerProxy, NetworkManagerSettingsProxy};
7use tokio::sync::broadcast;
8
9use zbus::{
10    proxy::{PropertyChanged, PropertyStream},
11    zvariant::OwnedObjectPath,
12};
13
14use crate::service::dbus::network_manager::proxy::NetworkManagerConnectionActiveProxy;
15
16#[derive(azalea_derive::StaticServiceManager)]
17pub struct Service {
18    proxy: NetworkManagerProxy<'static>,
19    settings_proxy: NetworkManagerSettingsProxy<'static>,
20    streams: Streams,
21}
22
23pub struct Streams {
24    enable: PropertyStream<'static, bool>,
25    state: PropertyStream<'static, NMState>,
26    connectivity: PropertyStream<'static, NMConnectivityState>,
27}
28
29#[derive(Default, Clone)]
30pub struct Init {
31    pub dbus_connection: Option<zbus::Connection>,
32}
33
34#[derive(Clone, Debug)]
35pub enum Input {
36    GetDevices,
37    ListConnections,
38    Update,
39    Enable(bool),
40    DeactivateConnection {
41        active_connection: OwnedObjectPath,
42    },
43    ActivateConnection {
44        connection: Option<OwnedObjectPath>,
45        device: Option<OwnedObjectPath>,
46        specific_object: Option<OwnedObjectPath>,
47    },
48}
49
50pub enum Event {
51    NetworkingEnabledChanged(PropertyChanged<'static, bool>),
52    StateChanged(PropertyChanged<'static, NMState>),
53    ConnectivityChanged(PropertyChanged<'static, NMConnectivityState>),
54}
55
56#[derive(Clone, Debug)]
57pub enum Output {
58    /// If the connection is active, it'll send the Connection.Active object too
59    Connections(Vec<(OwnedObjectPath, Option<OwnedObjectPath>)>),
60    Devices(Vec<OwnedObjectPath>),
61    NetworkingEnabledChanged(bool),
62    StateChanged(NMState),
63    ConnectivityChanged(NMConnectivityState),
64}
65
66impl azalea_service::Service for Service {
67    type Init = Init;
68    type Input = Input;
69    type Event = Event;
70    type Output = Output;
71
72    fn handler(init: Self::Init) -> azalea_service::ServiceManager<Self> {
73        azalea_service::ServiceManager::new(init, 4, 8)
74    }
75
76    async fn new(
77        init: Self::Init,
78        _input: flume::Sender<Self::Input>,
79        _: broadcast::Sender<Self::Output>,
80    ) -> Self {
81        let connection = init
82            .dbus_connection
83            .unwrap_or(zbus::Connection::system().await.unwrap());
84        let proxy = NetworkManagerProxy::new(&connection).await.unwrap();
85        let settings_proxy = NetworkManagerSettingsProxy::new(&connection).await.unwrap();
86
87        azalea_log::debug!(
88            Self,
89            "Version: {}",
90            proxy.version().await.unwrap_or_default()
91        );
92
93        Self {
94            streams: Streams {
95                enable: proxy.receive_networking_enabled_changed().await,
96                state: proxy.receive_state_changed().await,
97                connectivity: proxy.receive_connectivity_changed().await,
98            },
99            settings_proxy,
100            proxy,
101        }
102    }
103
104    async fn message(
105        &mut self,
106        input: Self::Input,
107        output_sender: &broadcast::Sender<Self::Output>,
108    ) {
109        match input {
110            Input::DeactivateConnection { active_connection } => {
111                azalea_log::debug!("Disconnecting: {:?}", active_connection);
112                drop(self.proxy.deactivate_connection(active_connection).await);
113                self.update_active_connections(output_sender).await;
114            }
115            Input::ListConnections => {
116                self.update_active_connections(output_sender).await;
117            }
118            Input::GetDevices => {
119                drop(output_sender.send(Output::Devices(
120                    self.proxy.get_devices().await.unwrap_or_default(),
121                )));
122            }
123            Input::Update => {
124                drop(output_sender.send(Output::StateChanged(self.proxy.state().await.unwrap())));
125                drop(output_sender.send(Output::ConnectivityChanged(
126                    self.proxy.connectivity().await.unwrap(),
127                )));
128                drop(output_sender.send(Output::Devices(
129                    self.proxy.get_devices().await.unwrap_or_default(),
130                )));
131                self.update_active_connections(output_sender).await;
132            }
133            Input::Enable(on) => {
134                if let Err(e) = self.proxy.enable(on).await {
135                    azalea_log::warning!("Failed to (dis)enable network: {}", e)
136                }
137            }
138            Input::ActivateConnection {
139                connection,
140                device,
141                specific_object,
142            } => {
143                let root_object_path = OwnedObjectPath::try_from("/").unwrap();
144
145                drop(
146                    self.proxy
147                        .activate_connection(
148                            connection.unwrap_or(root_object_path.clone()),
149                            device.unwrap_or(root_object_path.clone()),
150                            specific_object.unwrap_or(root_object_path),
151                        )
152                        .await,
153                );
154
155                self.update_active_connections(output_sender).await;
156            }
157        }
158    }
159
160    async fn event_generator(&mut self) -> Self::Event {
161        loop {
162            tokio::select! {
163                Some(prop) = self.streams.enable.next() =>
164                    return Event::NetworkingEnabledChanged(prop),
165                Some(prop) = self.streams.state.next() =>
166                    return Event::StateChanged(prop),
167                Some(prop) = self.streams.connectivity.next() =>
168                    return Event::ConnectivityChanged(prop),
169                else => continue,
170            }
171        }
172    }
173
174    async fn event_handler(
175        &mut self,
176        event: Self::Event,
177        output_sender: &tokio::sync::broadcast::Sender<Self::Output>,
178    ) -> azalea_service::Result<()> {
179        let output = match event {
180            Event::StateChanged(prop) => Output::StateChanged(prop.get().await?),
181            Event::ConnectivityChanged(prop) => Output::ConnectivityChanged(prop.get().await?),
182            Event::NetworkingEnabledChanged(prop) => {
183                Output::NetworkingEnabledChanged(prop.get().await?)
184            }
185        };
186        output_sender.send(output)?;
187        Ok(())
188    }
189}
190
191impl Service {
192    async fn get_active_connections(&self) -> HashMap<OwnedObjectPath, OwnedObjectPath> {
193        futures_lite::stream::iter(self.proxy.active_connections().await.unwrap_or_default())
194            .then(|v| async move {
195                let conn = zbus::Connection::system().await.unwrap();
196                let ca = NetworkManagerConnectionActiveProxy::new(&conn, v.clone())
197                    .await
198                    .unwrap();
199                (ca.connection().await.unwrap(), v)
200            })
201            .collect::<HashMap<OwnedObjectPath, OwnedObjectPath>>()
202            .await
203    }
204
205    async fn update_active_connections(&self, output_sender: &broadcast::Sender<Output>) {
206        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
207        let active_connections = self.get_active_connections().await;
208
209        drop(
210            output_sender.send(Output::Connections(
211                self.settings_proxy
212                    .list_connections()
213                    .await
214                    .unwrap_or_default()
215                    .into_iter()
216                    .map(|v| (v.clone(), active_connections.get(&v).cloned()))
217                    .collect(),
218            )),
219        );
220    }
221}