server/
blocking_archive.rs

1// SPDX-FileCopyrightText: 2025 Foundation Devices, Inc. <hello@foundation.xyz>
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4//! archive message for server IPC
5
6use std::any::type_name;
7
8use rkyv::{
9    bytecheck::CheckBytes,
10    rancor::{self, Source as _},
11};
12use whence::WhenceExt;
13use xous_ipc::{SizeOfSerializer, XousDeserializer, XousSerializer, XousValidator};
14
15use crate::{utils, AsyncMessageInit, Error, Server, ServerContext, WrongMessageTypeError};
16
17// ==================== core ====================
18
19/// heap allocated message that expects a response
20pub trait BlockingArchive
21where
22    Self: ArchiveCodec,
23    Self: crate::MessageId,
24    <Self::Response as rkyv::Archive>::Archived:
25        rkyv::Deserialize<Self::Response, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
26{
27    /// response type for this message
28    type Response: ArchiveCodec;
29}
30
31/// serialization requirements for archive messages
32pub trait ArchiveCodec
33where
34    Self: Sized
35        + rkyv::Archive
36        + for<'a, 'b> rkyv::Serialize<XousSerializer<'a, 'b>>
37        + for<'a> rkyv::Serialize<SizeOfSerializer<'a>>,
38    Self::Archived: rkyv::Portable,
39{
40}
41
42impl<T> ArchiveCodec for T
43where
44    T: Sized
45        + rkyv::Archive
46        + for<'a, 'b> rkyv::Serialize<XousSerializer<'a, 'b>>
47        + for<'a> rkyv::Serialize<SizeOfSerializer<'a>>,
48    T::Archived: rkyv::Portable,
49{
50}
51
52// ==================== handler traits ====================
53
54/// handle archive messages synchronously
55pub trait BlockingArchiveHandler<M>
56where
57    M: BlockingArchive,
58    Self: Server,
59{
60    /// process message and return response immediately
61    fn handle(&mut self, msg: M, sender: xous::PID, context: &mut ServerContext<Self>) -> M::Response;
62}
63
64/// handle archive messages asynchronously (can defer response)
65pub trait BlockingArchiveAsyncHandler<M>
66where
67    M: BlockingArchive,
68    Self: Server,
69{
70    /// process message, response can be sent later
71    fn handle(&mut self, request: ArchiveRequest<M>, context: &mut ServerContext<Self>);
72    /// default response if handler drops without responding
73    fn default_response() -> M::Response;
74}
75
76// auto-convert sync handlers to async
77impl<T, M> BlockingArchiveAsyncHandler<M> for T
78where
79    M: BlockingArchive,
80    T: BlockingArchiveHandler<M>,
81{
82    fn handle(&mut self, request: ArchiveRequest<M>, context: &mut ServerContext<Self>) {
83        let ArchiveRequest { message, response: request } = request;
84        let response = <Self as BlockingArchiveHandler<M>>::handle(self, message, request.pid(), context);
85        if let Err(e) = request.respond(response) {
86            log::warn!("failed to respond archive {e:?}");
87        }
88    }
89
90    fn default_response() -> <M as BlockingArchive>::Response {
91        unreachable!("default value not required in sync handler")
92    }
93}
94
95/// handle async responses from other servers
96pub trait ArchiveResponseHandler<R>
97where
98    Self: Server,
99    R: ArchiveCodec,
100{
101    /// process received async response
102    fn handle_response(&mut self, response: R, sender: xous::PID, context: &mut ServerContext<Self>);
103}
104
105// ==================== types ====================
106
107/// archive request with deferred response capability
108#[derive(Debug)]
109pub struct ArchiveRequest<M: BlockingArchive> {
110    pub message: M,
111    pub response: ArchiveResponse<M::Response>,
112}
113
114/// deferred response that sends default on drop if not used
115#[derive(Debug)]
116pub struct ArchiveResponse<R: ArchiveCodec> {
117    responder: Option<Responder>,
118    pid: xous::PID,
119    default: fn() -> R,
120}
121
122impl<R: ArchiveCodec> ArchiveResponse<R> {
123    /// get sender's process ID
124    pub fn pid(&self) -> xous::PID { self.pid }
125
126    /// send response
127    pub fn respond(mut self, response: R) -> whence::Result<(), Error> {
128        let responder = self.responder.take().unwrap();
129        responder.respond(&response)
130    }
131
132    /// override default response function
133    /// will be sent on drop if [`Self::respond`] is not called
134    pub fn set_response(&mut self, f: fn() -> R) { self.default = f; }
135}
136
137// auto-send default response on drop
138impl<R: ArchiveCodec> Drop for ArchiveResponse<R> {
139    fn drop(&mut self) {
140        if let Some(responder) = self.responder.take() {
141            let default = (self.default)();
142            responder.respond(&default).ok();
143        }
144    }
145}
146
147// ==================== API ====================
148
149/// send archive message and block for response
150pub fn send_blocking_archive<M>(cid: xous::CID, msg: M) -> M::Response
151where
152    M: BlockingArchive,
153{
154    try_send_blocking_archive(cid, msg).unwrap()
155}
156
157/// send archive message, returns error instead of panic
158pub fn try_send_blocking_archive<M>(cid: xous::CID, msg: M) -> whence::Result<M::Response, Error>
159where
160    M: BlockingArchive,
161{
162    let mut buf = xous_ipc::Buffer::into_buf(&msg).whence()?;
163    buf.blocking_move(cid, M::ID as u32).whence()?;
164    buf.to_original().whence()
165}
166
167/// Message handler, used by ServerMessages::messages()
168pub fn handle_blocking_archive_message<M, S>(
169    handler: &mut S,
170    raw: xous::MessageEnvelope,
171    context: &mut ServerContext<S>,
172) where
173    M: BlockingArchive,
174    S: BlockingArchiveAsyncHandler<M>,
175    <M as rkyv::Archive>::Archived:
176        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
177{
178    let pid = raw.sender.pid().unwrap();
179    if let Err(e) = try_handle_blocking_archive_message(handler, raw, context) {
180        log::warn!("BlockingArchive handle error (PID {pid}) for {}: {e}", type_name::<M>());
181    }
182}
183
184fn try_handle_blocking_archive_message<M, S>(
185    handler: &mut S,
186    mut raw: xous::MessageEnvelope,
187    context: &mut ServerContext<S>,
188) -> whence::Result<(), Error>
189where
190    M: BlockingArchive,
191    S: BlockingArchiveAsyncHandler<M>,
192    <M as rkyv::Archive>::Archived:
193        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
194{
195    let pid = raw.sender.pid().unwrap();
196
197    match &mut raw.body {
198        xous::Message::BlockingMove(mem) => {
199            // sync case - extract message directly (no AsyncMessageInit wrapper)
200            let message: M = {
201                let buf = unsafe { xous_ipc::Buffer::from_memory_message_mut(mem) };
202                buf.to_original::<M>().whence()?
203            };
204            let request =
205                ArchiveResponse { responder: Some(Responder::Sync(raw)), pid, default: S::default_response };
206            let request = ArchiveRequest { message, response: request };
207            handler.handle(request, context);
208            Ok(())
209        }
210        xous::Message::Move(mem) => {
211            // async case - extract async wrapper
212            let buf = unsafe { xous_ipc::Buffer::from_memory_message(mem) };
213            let init: AsyncMessageInit<M> = buf.to_original().whence()?;
214            let request = ArchiveResponse {
215                responder: Some(Responder::Async { cid: init.cid, msg_id: init.msg_id }),
216                pid,
217                default: S::default_response,
218            };
219            let request = ArchiveRequest { message: init.msg, response: request };
220            handler.handle(request, context);
221            Ok(())
222        }
223        _ => Err(rancor::Error::new(WrongMessageTypeError)).whence(),
224    }
225}
226/// decode async response from raw envelope
227pub fn decode_archive_async_response<M>(raw: xous::MessageEnvelope) -> M
228where
229    M: ArchiveCodec,
230    <M as rkyv::Archive>::Archived:
231        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
232{
233    try_decode_archive_async_response(raw).unwrap()
234}
235
236pub fn try_decode_archive_async_response<M>(mut raw: xous::MessageEnvelope) -> whence::Result<M, Error>
237where
238    M: ArchiveCodec,
239    <M as rkyv::Archive>::Archived:
240        rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
241{
242    let buf = utils::extract_move_message(&mut raw).whence()?;
243    Ok(buf.to_original::<M>().whence()?)
244}
245
246// ==================== internal ====================
247
/// Reply channel for an archive request, one variant per delivery mechanism.
#[derive(Debug)]
enum Responder {
    /// response goes back in same buffer (blocking call);
    /// holds the envelope of the original blocking-move message
    Sync(xous::MessageEnvelope),
    /// response sent in new buffer (async call);
    /// `cid` is the connection to reply on, `msg_id` the message id to use
    Async { cid: xous::CID, msg_id: xous::MessageId },
}
255
256impl Responder {
257    fn respond<R>(self, response: &R) -> whence::Result<(), Error>
258    where
259        R: ArchiveCodec,
260    {
261        match self {
262            Responder::Sync(mut envelope) => {
263                let existing = envelope.body.memory_message_mut().expect("blocking move carries memory").buf;
264                let (reply, used) = xous_ipc::Buffer::serialize_reply(existing, response).whence()?;
265                let old = envelope.set_response(
266                    reply,
267                    xous::MemoryAddress::new(used),
268                    xous::MemorySize::new(reply.len()),
269                );
270                // Free whatever of the moved-in buffer the reply doesn't cover: the tail
271                // when we reused the front, or all of it when we allocated a new buffer.
272                if reply.as_ptr() == old.as_ptr() {
273                    let tail_len = old.len() - reply.len();
274                    if tail_len > 0 {
275                        let tail =
276                            unsafe { xous::MemoryRange::new(old.as_ptr() as usize + reply.len(), tail_len) }
277                                .whence()?;
278                        xous::unmap_memory(tail).whence()?;
279                    }
280                } else {
281                    xous::unmap_memory(old).whence()?;
282                }
283                Ok(())
284            }
285            Responder::Async { cid, msg_id } => {
286                let _disconnect = defer::defer(|| {
287                    xous::disconnect(cid).ok();
288                });
289                xous_ipc::Buffer::into_buf(response).whence()?.send_nowait(cid, msg_id as u32).whence()?;
290                Ok(())
291            }
292        }
293    }
294}