azalea_shell/service/dbus/mpris/
mod.rs

1use std::collections::HashMap;
2
3use azalea_service::{ListenerHandle, StaticServiceManager};
4use futures_lite::stream::StreamExt;
5use tokio::sync::{broadcast, oneshot};
6pub use zbus_names::OwnedBusName;
7
8pub mod proxy;
9use proxy::{Metadata, PlaybackRate, PlaybackStatus, PlayerProxy};
10
11#[derive(azalea_derive::StaticServiceManager)]
12pub struct Service {
13    connection: zbus::Connection,
14    players: HashMap<OwnedBusName, PlayerProxy<'static>>,
15    _listener_handle: ListenerHandle,
16}
17
18#[derive(Default, Clone)]
19pub struct Init {
20    pub dbus_connection: Option<zbus::Connection>,
21}
22
23#[derive(Clone, Debug)]
24pub enum Action {
25    PlayPause(OwnedBusName),
26    Previous(OwnedBusName),
27    Next(OwnedBusName),
28}
29
30#[derive(Clone, Debug)]
31pub enum Input {
32    ObjectCreated(OwnedBusName),
33    ObjectDeleted(OwnedBusName),
34    UpdatePositionAndRate(OwnedBusName),
35    UpdateMetadata(OwnedBusName),
36    Action(Action),
37}
38
39#[derive(Clone, Debug)]
40pub enum Event {
41    Volume(f64),
42    Position(i64),
43    Metadata(Metadata),
44    PlaybackStatus(PlaybackStatus),
45    PlaybackRate(PlaybackRate),
46}
47
48#[derive(Clone, Debug)]
49pub struct Output {
50    pub name: OwnedBusName,
51    pub event: Event,
52}
53
54impl azalea_service::Service for Service {
55    type Init = Init;
56    type Input = Input;
57    type Event = ();
58    type Output = Output;
59    const DISABLE_EVENTS: bool = true;
60
61    fn handler(init: Self::Init) -> azalea_service::ServiceManager<Self> {
62        azalea_service::ServiceManager::new(init, 8, 8)
63    }
64
65    async fn new(
66        init: Self::Init,
67        input_sender: flume::Sender<Self::Input>,
68        output_sender: broadcast::Sender<Self::Output>,
69    ) -> Self {
70        let connection = init
71            .dbus_connection
72            .unwrap_or(zbus::Connection::session().await.unwrap());
73
74        let listener_handle =
75            super::discovery::Service::filtered_forward(input_sender.into(), |output| {
76                use super::discovery::Output;
77
78                match output {
79                    Output::ObjectCreated(owned_bus_name) => {
80                        if owned_bus_name.contains("org.mpris.MediaPlayer2") {
81                            return Some(Input::ObjectCreated(owned_bus_name));
82                        }
83                    }
84                    Output::ObjectDeleted(owned_bus_name) => {
85                        if owned_bus_name.contains("org.mpris.MediaPlayer2") {
86                            return Some(Input::ObjectDeleted(owned_bus_name));
87                        }
88                    }
89                };
90
91                None
92            });
93
94        let mut service = Self {
95            _listener_handle: listener_handle,
96            connection,
97            players: Default::default(),
98        };
99
100        let (tx, rx) = oneshot::channel();
101        super::discovery::Service::send(super::discovery::Input::QueryObjects(tx));
102        match rx.await {
103            Ok(names) => {
104                for name in names {
105                    if !name.contains("org.mpris.MediaPlayer2") {
106                        continue;
107                    }
108                    service
109                        .message(Input::ObjectCreated(name), &output_sender)
110                        .await;
111                }
112            }
113            Err(e) => azalea_log::debug!("Failed to send: {e}"),
114        }
115
116        service
117    }
118
119    async fn message(
120        &mut self,
121        input: Self::Input,
122        output_sender: &broadcast::Sender<Self::Output>,
123    ) {
124        match input {
125            Input::ObjectCreated(bus_name) => {
126                if self.players.contains_key(&bus_name) {
127                    return;
128                }
129                azalea_log::debug!(Self, "Object created: {}", bus_name);
130                let proxy = PlayerProxy::new(&self.connection, bus_name.clone())
131                    .await
132                    .unwrap();
133                let sender = output_sender.clone();
134                self.players.insert(bus_name.clone(), proxy.clone());
135                // TODO: Save handler and abort it when player disconnects (use ListenerHandle?)
136                relm4::spawn(async move {
137                    listen_to_player(bus_name, proxy, &sender).await;
138                });
139            }
140            Input::ObjectDeleted(bus_name) => {
141                azalea_log::debug!(Self, "Object deleted: {}", bus_name);
142                self.players.remove(&bus_name);
143            }
144            Input::UpdatePositionAndRate(bus_name) => {
145                let Some(player) = self.players.get(&bus_name) else {
146                    return;
147                };
148                drop(output_sender.send(Output {
149                    name: bus_name.clone(),
150                    event: Event::Position(player.position().await.unwrap_or(0)),
151                }));
152                drop(output_sender.send(Output {
153                    name: bus_name.clone(),
154                    event: Event::PlaybackRate(player.rate().await.unwrap_or(1.)),
155                }));
156            }
157            Input::UpdateMetadata(bus_name) => {
158                let Some(player) = self.players.get(&bus_name) else {
159                    return;
160                };
161                let Ok(metadata) = player.metadata().await else {
162                    return;
163                };
164                azalea_log::debug!(
165                    Self,
166                    "Metadata changed for object {}. {:#?}",
167                    bus_name,
168                    metadata
169                );
170                drop(output_sender.send(Output {
171                    name: bus_name,
172                    event: Event::Metadata(metadata),
173                }));
174            }
175            Input::Action(action) => {
176                azalea_log::debug!(Self, "Triggered action: {:?}", action);
177                // TODO: return anyhow error
178                match action {
179                    Action::PlayPause(bus_name) => {
180                        drop(
181                            self.players
182                                .get(&bus_name)
183                                .map(|p| p.play_pause())
184                                .unwrap()
185                                .await,
186                        );
187                    }
188                    Action::Previous(bus_name) => {
189                        drop(
190                            self.players
191                                .get(&bus_name)
192                                .map(|p| p.previous())
193                                .unwrap()
194                                .await,
195                        );
196                    }
197                    Action::Next(bus_name) => {
198                        drop(self.players.get(&bus_name).map(|p| p.next()).unwrap().await);
199                    }
200                }
201            }
202        }
203    }
204}
205
206async fn listen_to_player<'a>(
207    name: OwnedBusName,
208    player: PlayerProxy<'a>,
209    output_sender: &broadcast::Sender<<Service as azalea_service::Service>::Output>,
210) -> <Service as azalea_service::Service>::Event {
211    let mut volume = player.receive_volume_changed().await;
212    let mut metadata = player.receive_metadata_changed().await;
213    let mut playback_status = player.receive_playback_status_changed().await;
214    let mut playback_rate = player.receive_rate_changed().await;
215    let mut seeked = player.receive_seeked().await.unwrap();
216
217    loop {
218        tokio::select! {
219            Some(prop) = volume.next() => {
220                let Ok(value) = prop.get().await else { continue; };
221                azalea_log::debug!(Service, "Volume changed for object {}: {}", name, value);
222                drop(output_sender.send(Output {
223                    name: name.clone(),
224                    event: Event::Volume(value),
225                }));
226            },
227            Some(prop) = metadata.next() => {
228                let Ok(value) = prop.get().await else { continue; };
229                azalea_log::debug!(Service, "Metadata changed for object {}: {:#?}", name, value);
230                drop(output_sender.send(Output {
231                    name: name.clone(),
232                    event: Event::Metadata(value),
233                }));
234            },
235            Some(prop) = playback_status.next() => {
236                let Ok(value) = prop.get().await else { continue; };
237                azalea_log::debug!(Service, "PlaybackStatus changed for object {}: {:#?}", name, value);
238                drop(output_sender.send(Output {
239                    name: name.clone(),
240                    event: Event::PlaybackStatus(value),
241                }));
242                let Ok(position) = player.position().await else { continue; };
243                drop(output_sender.send(Output {
244                    name: name.clone(),
245                    event: Event::Position(position),
246                }));
247            },
248            Some(prop) = playback_rate.next() => {
249                let Ok(value) = prop.get().await else { continue; };
250                azalea_log::debug!(Service, "PlaybackRate changed for object {}: {:#?}", name, value);
251                drop(output_sender.send(Output {
252                    name: name.clone(),
253                    event: Event::PlaybackRate(value),
254                }));
255            },
256            Some(prop) = seeked.next() => {
257                let Ok(value) = prop.args() else { continue; };
258                azalea_log::debug!(Service, "Seeked position for object {}: {:#?}", name, value.position);
259                drop(output_sender.send(Output {
260                    name: name.clone(),
261                    event: Event::Position(value.position),
262                }));
263            },
264            else => continue
265        }
266    }
267}