server/event/
archive.rs

1// SPDX-FileCopyrightText: 2023 Foundation Devices, Inc. <hello@foundation.xyz>
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4use std::{any::type_name, marker::PhantomData};
5
6use rkyv::bytecheck::CheckBytes;
7use whence::WhenceExt;
8use xous_ipc::{XousDeserializer, XousValidator};
9
10use crate::{utils, ArchiveCodec, Error, EventSubscriptionMessage, Owned, Server, ServerContext};
11
12/// Handle for a single event subscriber
13pub struct ArchiveEventSubscriber<M>
14where
15    M: ArchiveCodec,
16{
17    pid: xous::PID,
18    cid: xous::CID,
19    msg_id: xous::MessageId,
20    cancel_msg_id: xous::MessageId,
21    _phantom: PhantomData<M>,
22}
23
24impl<M> core::fmt::Debug for ArchiveEventSubscriber<M>
25where
26    M: ArchiveCodec,
27{
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("ArchiveEventSubscriber").field("pid", &self.pid).finish()
30    }
31}
32
33impl<M> ArchiveEventSubscriber<M>
34where
35    M: ArchiveCodec,
36{
37    /// Send the event to the subscriber.
38    ///
39    /// Warning: Cannot be used in an IRQ handler context.
40    pub fn send(&self, msg: &M) -> Result<xous::Result, xous::Error> {
41        xous_ipc::Buffer::into_buf(msg)
42            .map_err(|_| xous::Error::InternalError)?
43            .send(self.cid, self.msg_id as u32)
44    }
45
46    /// Send the event to the subscriber.
47    pub fn send_nowait(&self, msg: &M) -> Result<xous::Result, xous::Error> {
48        xous_ipc::Buffer::into_buf(msg)
49            .map_err(|_| xous::Error::InternalError)?
50            .send_nowait(self.cid, self.msg_id as u32)
51    }
52
53    pub fn pid(&self) -> xous::PID { self.pid }
54
55    pub fn cid(&self) -> xous::CID { self.cid }
56}
57
58impl<M> Drop for ArchiveEventSubscriber<M>
59where
60    M: ArchiveCodec,
61{
62    fn drop(&mut self) {
63        if let Err(e) =
64            xous::send_message(self.cid, super::cancellation_message(self.msg_id, self.cancel_msg_id))
65        {
66            log::debug!("Error sending cancellation message {self:?}: {e:?}")
67        }
68        if let Err(e) = xous::disconnect(self.cid) {
69            log::error!("Error disconnecting {self:?}: {e:?}")
70        }
71    }
72}
73
74/// A message which can be serialized and deserialized using rkyv.
75pub trait ArchiveEvent: ArchiveCodec
76where
77    <Self as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
78{
79}
80
81impl<M> ArchiveEvent for M
82where
83    M: ArchiveCodec,
84    <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
85{
86}
87
88pub trait ArchiveSubscription
89where
90    Self: crate::MessageId + ArchiveCodec,
91    <Self::Event as rkyv::Archive>::Archived:
92        rkyv::Deserialize<Self::Event, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
93    <Self::Error as rkyv::Archive>::Archived:
94        rkyv::Deserialize<Self::Error, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
95{
96    type Event: ArchiveEvent;
97    type Error: super::SubscriptionError;
98}
99
100pub trait ArchiveEventSubscriptionHandler<M>
101where
102    Self: Server,
103    M: ArchiveSubscription,
104{
105    fn handle(
106        &mut self,
107        msg: M,
108        subscriber: ArchiveEventSubscriber<M::Event>,
109        context: &mut ServerContext<Self>,
110    ) -> Result<(), M::Error>;
111}
112
113/// Handler for an incoming [`ArchiveEvent`]
114pub trait ArchiveEventHandler<M>
115where
116    Self: Server,
117    M: ArchiveEvent,
118    <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
119{
120    fn handle(&mut self, msg: Owned<M>, sender: xous::PID, context: &mut ServerContext<Self>);
121}
122
123/// Message handler, used by ServerMessages::messages()
124pub fn handle_archive_subscription<M, S>(
125    handler: &mut S,
126    raw: xous::MessageEnvelope,
127    context: &mut ServerContext<S>,
128) where
129    M: ArchiveSubscription + 'static,
130    S: ArchiveEventSubscriptionHandler<M>,
131    <M as rkyv::Archive>::Archived:
132        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
133{
134    let pid = raw.sender.pid().unwrap();
135    if let Err(e) = try_handle_archive_subscription(pid, handler, raw, context) {
136        log::warn!("archive sub handle error (PID {pid}) for {}: {e}", type_name::<M>());
137    }
138}
139
140/// Message handler, used by ServerMessages::messages()
141fn try_handle_archive_subscription<M, S>(
142    pid: xous::PID,
143    handler: &mut S,
144    mut raw: xous::MessageEnvelope,
145    context: &mut ServerContext<S>,
146) -> whence::Result<(), Error>
147where
148    M: ArchiveSubscription + 'static,
149    S: ArchiveEventSubscriptionHandler<M>,
150    <M as rkyv::Archive>::Archived:
151        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
152{
153    let mut buf = utils::extract_borrow_mut_message(&mut raw).whence()?;
154    let msg: EventSubscriptionMessage<M> = buf.to_original().whence()?;
155    let res = handler.handle(
156        msg.msg,
157        ArchiveEventSubscriber::<M::Event> {
158            pid,
159            msg_id: msg.msg_id,
160            cancel_msg_id: msg.cancel_msg_id,
161            cid: msg.cid,
162            _phantom: PhantomData,
163        },
164        context,
165    );
166    buf.replace(&res).whence()?;
167    Ok(())
168}
169
170pub fn decode_archive_event<M>(raw: xous::MessageEnvelope) -> M
171where
172    M: ArchiveEvent,
173    <M as rkyv::Archive>::Archived:
174        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
175{
176    try_decode_archive_event(raw).unwrap()
177}
178
179pub fn try_decode_archive_event<M>(mut raw: xous::MessageEnvelope) -> whence::Result<M, Error>
180where
181    M: ArchiveEvent,
182    <M as rkyv::Archive>::Archived:
183        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
184{
185    let buffer = utils::extract_move_message(&mut raw).whence()?;
186    buffer.to_original::<M>().whence()
187}
188
189pub(crate) fn archive_event_handler<M, S>(
190    handler: &mut S,
191    raw: xous::MessageEnvelope,
192    context: &mut ServerContext<S>,
193) where
194    M: ArchiveEvent + 'static,
195    S: ArchiveEventHandler<M>,
196    <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
197{
198    if let Err(e) = try_archive_event_handler(handler, raw, context) {
199        log::warn!("failed to handle archive event {e:?}")
200    }
201}
202
203fn try_archive_event_handler<M, S>(
204    handler: &mut S,
205    raw: xous::MessageEnvelope,
206    context: &mut ServerContext<S>,
207) -> whence::Result<(), Error>
208where
209    M: ArchiveEvent + 'static,
210    S: ArchiveEventHandler<M>,
211    <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
212{
213    let sender = raw.sender.pid().unwrap();
214    let msg = Owned::new_move(raw).whence()?;
215    handler.handle(msg, sender, context);
216    Ok(())
217}
218
219/// Subscribe to an [`ArchiveEvent`] event.
220///
221/// # Arguments
222///
223/// * `cid` - The connection ID to the event sending server.
224/// * `sid` - The server ID of the event receiving server.
225///
226/// # Returns
227///
228/// A tuple containing two unique message IDs (to this process) for the incoming events:
229/// - The first ID is for the event message.
230/// - The second ID is for the cancellation message.
231pub fn subscribe_archive<M>(cid: xous::CID, msg: M, sid: xous::SID) -> Result<(usize, usize), M::Error>
232where
233    M: ArchiveSubscription + 'static,
234{
235    try_subscribe_archive(cid, msg, sid).unwrap()
236}
237
238pub fn try_subscribe_archive<M>(
239    cid: xous::CID,
240    msg: M,
241    sid: xous::SID,
242) -> whence::Result<Result<(usize, usize), M::Error>, Error>
243where
244    M: ArchiveSubscription + 'static,
245{
246    let msg_id = crate::next_dynamic_message_id();
247    let cancel_msg_id = crate::next_dynamic_message_id();
248    let pid = xous::get_remote_pid(cid).whence()?;
249    let cid_remote = xous::connect_for_process(pid, sid).whence()?;
250    xous::allow_messages_on_connection(pid, cid_remote, msg_id..(cancel_msg_id + 1)).whence()?;
251    let msg = EventSubscriptionMessage { cid: cid_remote, msg_id, cancel_msg_id, msg };
252    let result = msg.send_archive(cid)?;
253    Ok(result.map(|_| (msg_id, cancel_msg_id)))
254}
255
256#[derive(Debug)]
257pub struct ArchiveSubList<T: ArchiveCodec> {
258    inner: Vec<ArchiveEventSubscriber<T>>,
259}
260
261impl<T: ArchiveCodec> Default for ArchiveSubList<T> {
262    fn default() -> Self { Self { inner: Default::default() } }
263}
264
265impl<T: ArchiveCodec> ArchiveSubList<T> {
266    pub fn push(&mut self, sub: ArchiveEventSubscriber<T>) { self.inner.push(sub); }
267
268    pub fn send(&mut self, msg: &T) { self.inner.retain(|sub| sub.send(msg).is_ok()) }
269
270    pub fn send_nowait(&mut self, msg: &T) {
271        self.inner.retain(|sub| match sub.send_nowait(msg) {
272            Ok(_) => true,
273            Err(xous::Error::ServerQueueFull) => {
274                log::warn!("archive event send_nowait error for pid {} {}", sub.pid(), type_name::<T>());
275                true
276            }
277            Err(_) => false,
278        })
279    }
280
281    pub fn remove_cid(&mut self, cid: xous::CID) { self.inner.retain(|s| s.cid() != cid) }
282}