azalea_shell/service/dbus/network_manager/
mod.rs1pub mod proxy;
2
3use std::collections::HashMap;
4
5use futures_lite::StreamExt;
6use proxy::{NMConnectivityState, NMState, NetworkManagerProxy, NetworkManagerSettingsProxy};
7use tokio::sync::broadcast;
8
9use zbus::{
10 proxy::{PropertyChanged, PropertyStream},
11 zvariant::OwnedObjectPath,
12};
13
14use crate::service::dbus::network_manager::proxy::NetworkManagerConnectionActiveProxy;
15
16#[derive(azalea_derive::StaticServiceManager)]
17pub struct Service {
18 proxy: NetworkManagerProxy<'static>,
19 settings_proxy: NetworkManagerSettingsProxy<'static>,
20 streams: Streams,
21}
22
23pub struct Streams {
24 enable: PropertyStream<'static, bool>,
25 state: PropertyStream<'static, NMState>,
26 connectivity: PropertyStream<'static, NMConnectivityState>,
27}
28
29#[derive(Default, Clone)]
30pub struct Init {
31 pub dbus_connection: Option<zbus::Connection>,
32}
33
34#[derive(Clone, Debug)]
35pub enum Input {
36 GetDevices,
37 ListConnections,
38 Update,
39 Enable(bool),
40 DeactivateConnection {
41 active_connection: OwnedObjectPath,
42 },
43 ActivateConnection {
44 connection: Option<OwnedObjectPath>,
45 device: Option<OwnedObjectPath>,
46 specific_object: Option<OwnedObjectPath>,
47 },
48}
49
50pub enum Event {
51 NetworkingEnabledChanged(PropertyChanged<'static, bool>),
52 StateChanged(PropertyChanged<'static, NMState>),
53 ConnectivityChanged(PropertyChanged<'static, NMConnectivityState>),
54}
55
56#[derive(Clone, Debug)]
57pub enum Output {
58 Connections(Vec<(OwnedObjectPath, Option<OwnedObjectPath>)>),
60 Devices(Vec<OwnedObjectPath>),
61 NetworkingEnabledChanged(bool),
62 StateChanged(NMState),
63 ConnectivityChanged(NMConnectivityState),
64}
65
66impl azalea_service::Service for Service {
67 type Init = Init;
68 type Input = Input;
69 type Event = Event;
70 type Output = Output;
71
72 fn handler(init: Self::Init) -> azalea_service::ServiceManager<Self> {
73 azalea_service::ServiceManager::new(init, 4, 8)
74 }
75
76 async fn new(
77 init: Self::Init,
78 _input: flume::Sender<Self::Input>,
79 _: broadcast::Sender<Self::Output>,
80 ) -> Self {
81 let connection = init
82 .dbus_connection
83 .unwrap_or(zbus::Connection::system().await.unwrap());
84 let proxy = NetworkManagerProxy::new(&connection).await.unwrap();
85 let settings_proxy = NetworkManagerSettingsProxy::new(&connection).await.unwrap();
86
87 azalea_log::debug!(
88 Self,
89 "Version: {}",
90 proxy.version().await.unwrap_or_default()
91 );
92
93 Self {
94 streams: Streams {
95 enable: proxy.receive_networking_enabled_changed().await,
96 state: proxy.receive_state_changed().await,
97 connectivity: proxy.receive_connectivity_changed().await,
98 },
99 settings_proxy,
100 proxy,
101 }
102 }
103
104 async fn message(
105 &mut self,
106 input: Self::Input,
107 output_sender: &broadcast::Sender<Self::Output>,
108 ) {
109 match input {
110 Input::DeactivateConnection { active_connection } => {
111 azalea_log::debug!("Disconnecting: {:?}", active_connection);
112 drop(self.proxy.deactivate_connection(active_connection).await);
113 self.update_active_connections(output_sender).await;
114 }
115 Input::ListConnections => {
116 self.update_active_connections(output_sender).await;
117 }
118 Input::GetDevices => {
119 drop(output_sender.send(Output::Devices(
120 self.proxy.get_devices().await.unwrap_or_default(),
121 )));
122 }
123 Input::Update => {
124 drop(output_sender.send(Output::StateChanged(self.proxy.state().await.unwrap())));
125 drop(output_sender.send(Output::ConnectivityChanged(
126 self.proxy.connectivity().await.unwrap(),
127 )));
128 drop(output_sender.send(Output::Devices(
129 self.proxy.get_devices().await.unwrap_or_default(),
130 )));
131 self.update_active_connections(output_sender).await;
132 }
133 Input::Enable(on) => {
134 if let Err(e) = self.proxy.enable(on).await {
135 azalea_log::warning!("Failed to (dis)enable network: {}", e)
136 }
137 }
138 Input::ActivateConnection {
139 connection,
140 device,
141 specific_object,
142 } => {
143 let root_object_path = OwnedObjectPath::try_from("/").unwrap();
144
145 drop(
146 self.proxy
147 .activate_connection(
148 connection.unwrap_or(root_object_path.clone()),
149 device.unwrap_or(root_object_path.clone()),
150 specific_object.unwrap_or(root_object_path),
151 )
152 .await,
153 );
154
155 self.update_active_connections(output_sender).await;
156 }
157 }
158 }
159
160 async fn event_generator(&mut self) -> Self::Event {
161 loop {
162 tokio::select! {
163 Some(prop) = self.streams.enable.next() =>
164 return Event::NetworkingEnabledChanged(prop),
165 Some(prop) = self.streams.state.next() =>
166 return Event::StateChanged(prop),
167 Some(prop) = self.streams.connectivity.next() =>
168 return Event::ConnectivityChanged(prop),
169 else => continue,
170 }
171 }
172 }
173
174 async fn event_handler(
175 &mut self,
176 event: Self::Event,
177 output_sender: &tokio::sync::broadcast::Sender<Self::Output>,
178 ) -> azalea_service::Result<()> {
179 let output = match event {
180 Event::StateChanged(prop) => Output::StateChanged(prop.get().await?),
181 Event::ConnectivityChanged(prop) => Output::ConnectivityChanged(prop.get().await?),
182 Event::NetworkingEnabledChanged(prop) => {
183 Output::NetworkingEnabledChanged(prop.get().await?)
184 }
185 };
186 output_sender.send(output)?;
187 Ok(())
188 }
189}
190
191impl Service {
192 async fn get_active_connections(&self) -> HashMap<OwnedObjectPath, OwnedObjectPath> {
193 futures_lite::stream::iter(self.proxy.active_connections().await.unwrap_or_default())
194 .then(|v| async move {
195 let conn = zbus::Connection::system().await.unwrap();
196 let ca = NetworkManagerConnectionActiveProxy::new(&conn, v.clone())
197 .await
198 .unwrap();
199 (ca.connection().await.unwrap(), v)
200 })
201 .collect::<HashMap<OwnedObjectPath, OwnedObjectPath>>()
202 .await
203 }
204
205 async fn update_active_connections(&self, output_sender: &broadcast::Sender<Output>) {
206 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
207 let active_connections = self.get_active_connections().await;
208
209 drop(
210 output_sender.send(Output::Connections(
211 self.settings_proxy
212 .list_connections()
213 .await
214 .unwrap_or_default()
215 .into_iter()
216 .map(|v| (v.clone(), active_connections.get(&v).cloned()))
217 .collect(),
218 )),
219 );
220 }
221}