quantum_link/
worker.rs

1// SPDX-FileCopyrightText: 2025 Foundation Devices, Inc. <hello@foundation.xyz>
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4use std::{future::Future, marker::PhantomData};
5
6use rkyv::bytecheck::CheckBytes;
7use server::{
8    xous_ipc::{XousDeserializer, XousValidator},
9    CheckedPermissions, MessageAllowed,
10};
11use worker::{StreamWatch, WorkerHandle};
12
13use crate::{messages::SubscribeConnectionStatus, ConnectionStatus};
14
15/// reactive handle to latest QuantumLink status
16#[derive(Clone)]
17pub struct QlStatus<P> {
18    watch: StreamWatch<ConnectionStatus>,
19    worker: WorkerHandle,
20    _phantom: PhantomData<fn() -> P>,
21}
22
23impl<P> QlStatus<P>
24where
25    P: CheckedPermissions + 'static,
26{
27    pub fn new(worker: WorkerHandle) -> Self
28    where
29        P: MessageAllowed<SubscribeConnectionStatus> + 'static,
30    {
31        let sub = worker.subscribe_scalar::<P, _>(SubscribeConnectionStatus);
32        let initial = ConnectionStatus { bt_connected: false, ql_paired: false, live: false };
33        let watch = worker.watch_stream(sub, initial);
34        Self { watch, worker, _phantom: Default::default() }
35    }
36
37    /// wait until bluetooth is connected, device is paired, and connection is confirmed as live
38    pub async fn ready(&self) {
39        self.watch.wait_until(|status| status.bt_connected && status.ql_paired && status.live).await
40    }
41
42    /// wait until bluetooth is connected
43    pub async fn bt_ready(&self) { self.watch.wait_until(|status| status.bt_connected).await }
44
45    /// check if fully connected (BT + paired)
46    pub fn is_connected(&self) -> bool {
47        let status = self.watch.borrow();
48        status.bt_connected && status.ql_paired
49    }
50
51    // send a ql archive, after waiting for a connection
52    pub fn send_ql_archive<M>(&self, msg: M) -> impl Future<Output = M::Response>
53    where
54        P: server::MessageAllowed<M>,
55        M: server::BlockingArchive + Send + 'static,
56        M::Response: Send,
57    {
58        let this = self.clone();
59        async move {
60            this.ready().await;
61            this.worker.async_archive::<P, _>(msg).await
62        }
63    }
64
65    // retry publishing the message indefinitely
66    pub fn send_ql_archive_retry<M, T, E>(
67        &self,
68        msg: M,
69        mut error: impl FnMut(E) + Send + 'static,
70    ) -> impl Future<Output = T>
71    where
72        P: server::CheckedPermissions + server::MessageAllowed<M>,
73        M: server::BlockingArchive<Response = Result<T, E>> + Send + Clone + 'static,
74        M::Response: Send,
75        T: server::ArchiveCodec + Send + 'static,
76        <T as rkyv::Archive>::Archived:
77            rkyv::Deserialize<T, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
78        E: server::ArchiveCodec + Send + 'static,
79        <E as rkyv::Archive>::Archived:
80            rkyv::Deserialize<E, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
81    {
82        let this = self.clone();
83        async move {
84            loop {
85                this.ready().await;
86                match this.worker.async_archive::<P, _>(msg.clone()).await {
87                    Ok(value) => {
88                        return value;
89                    }
90                    Err(e) => {
91                        error(e);
92                    }
93                }
94            }
95        }
96    }
97
98    pub fn into_inner(self) -> StreamWatch<ConnectionStatus> { self.watch }
99}
100
101impl<P> std::ops::Deref for QlStatus<P> {
102    type Target = StreamWatch<ConnectionStatus>;
103
104    fn deref(&self) -> &Self::Target { &self.watch }
105}
106
107impl<P> std::ops::DerefMut for QlStatus<P> {
108    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.watch }
109}