1use std::{future::Future, marker::PhantomData};
5
6use rkyv::bytecheck::CheckBytes;
7use server::{
8 xous_ipc::{XousDeserializer, XousValidator},
9 CheckedPermissions, MessageAllowed,
10};
11use worker::{StreamWatch, WorkerHandle};
12
13use crate::{messages::SubscribeConnectionStatus, ConnectionStatus};
14
15#[derive(Clone)]
17pub struct QlStatus<P> {
18 watch: StreamWatch<ConnectionStatus>,
19 worker: WorkerHandle,
20 _phantom: PhantomData<fn() -> P>,
21}
22
23impl<P> QlStatus<P>
24where
25 P: CheckedPermissions + 'static,
26{
27 pub fn new(worker: WorkerHandle) -> Self
28 where
29 P: MessageAllowed<SubscribeConnectionStatus> + 'static,
30 {
31 let sub = worker.subscribe_scalar::<P, _>(SubscribeConnectionStatus);
32 let initial = ConnectionStatus { bt_connected: false, ql_paired: false, live: false };
33 let watch = worker.watch_stream(sub, initial);
34 Self { watch, worker, _phantom: Default::default() }
35 }
36
37 pub async fn ready(&self) {
39 self.watch.wait_until(|status| status.bt_connected && status.ql_paired && status.live).await
40 }
41
42 pub async fn bt_ready(&self) { self.watch.wait_until(|status| status.bt_connected).await }
44
45 pub fn is_connected(&self) -> bool {
47 let status = self.watch.borrow();
48 status.bt_connected && status.ql_paired
49 }
50
51 pub fn send_ql_archive<M>(&self, msg: M) -> impl Future<Output = M::Response>
53 where
54 P: server::MessageAllowed<M>,
55 M: server::BlockingArchive + Send + 'static,
56 M::Response: Send,
57 {
58 let this = self.clone();
59 async move {
60 this.ready().await;
61 this.worker.async_archive::<P, _>(msg).await
62 }
63 }
64
65 pub fn send_ql_archive_retry<M, T, E>(
67 &self,
68 msg: M,
69 mut error: impl FnMut(E) + Send + 'static,
70 ) -> impl Future<Output = T>
71 where
72 P: server::CheckedPermissions + server::MessageAllowed<M>,
73 M: server::BlockingArchive<Response = Result<T, E>> + Send + Clone + 'static,
74 M::Response: Send,
75 T: server::ArchiveCodec + Send + 'static,
76 <T as rkyv::Archive>::Archived:
77 rkyv::Deserialize<T, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
78 E: server::ArchiveCodec + Send + 'static,
79 <E as rkyv::Archive>::Archived:
80 rkyv::Deserialize<E, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
81 {
82 let this = self.clone();
83 async move {
84 loop {
85 this.ready().await;
86 match this.worker.async_archive::<P, _>(msg.clone()).await {
87 Ok(value) => {
88 return value;
89 }
90 Err(e) => {
91 error(e);
92 }
93 }
94 }
95 }
96 }
97
98 pub fn into_inner(self) -> StreamWatch<ConnectionStatus> { self.watch }
99}
100
101impl<P> std::ops::Deref for QlStatus<P> {
102 type Target = StreamWatch<ConnectionStatus>;
103
104 fn deref(&self) -> &Self::Target { &self.watch }
105}
106
107impl<P> std::ops::DerefMut for QlStatus<P> {
108 fn deref_mut(&mut self) -> &mut Self::Target { &mut self.watch }
109}