1use std::{any::type_name, marker::PhantomData};
5
6use rkyv::bytecheck::CheckBytes;
7use whence::WhenceExt;
8use xous_ipc::{XousDeserializer, XousValidator};
9
10use crate::{utils, ArchiveCodec, Error, EventSubscriptionMessage, Owned, Server, ServerContext};
11
12pub struct ArchiveEventSubscriber<M>
14where
15 M: ArchiveCodec,
16{
17 pid: xous::PID,
18 cid: xous::CID,
19 msg_id: xous::MessageId,
20 cancel_msg_id: xous::MessageId,
21 _phantom: PhantomData<M>,
22}
23
24impl<M> core::fmt::Debug for ArchiveEventSubscriber<M>
25where
26 M: ArchiveCodec,
27{
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("ArchiveEventSubscriber").field("pid", &self.pid).finish()
30 }
31}
32
33impl<M> ArchiveEventSubscriber<M>
34where
35 M: ArchiveCodec,
36{
37 pub fn send(&self, msg: &M) -> Result<xous::Result, xous::Error> {
41 xous_ipc::Buffer::into_buf(msg)
42 .map_err(|_| xous::Error::InternalError)?
43 .send(self.cid, self.msg_id as u32)
44 }
45
46 pub fn send_nowait(&self, msg: &M) -> Result<xous::Result, xous::Error> {
48 xous_ipc::Buffer::into_buf(msg)
49 .map_err(|_| xous::Error::InternalError)?
50 .send_nowait(self.cid, self.msg_id as u32)
51 }
52
53 pub fn pid(&self) -> xous::PID { self.pid }
54
55 pub fn cid(&self) -> xous::CID { self.cid }
56}
57
58impl<M> Drop for ArchiveEventSubscriber<M>
59where
60 M: ArchiveCodec,
61{
62 fn drop(&mut self) {
63 if let Err(e) =
64 xous::send_message(self.cid, super::cancellation_message(self.msg_id, self.cancel_msg_id))
65 {
66 log::debug!("Error sending cancellation message {self:?}: {e:?}")
67 }
68 if let Err(e) = xous::disconnect(self.cid) {
69 log::error!("Error disconnecting {self:?}: {e:?}")
70 }
71 }
72}
73
74pub trait ArchiveEvent: ArchiveCodec
76where
77 <Self as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
78{
79}
80
81impl<M> ArchiveEvent for M
82where
83 M: ArchiveCodec,
84 <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
85{
86}
87
88pub trait ArchiveSubscription
89where
90 Self: crate::MessageId + ArchiveCodec,
91 <Self::Event as rkyv::Archive>::Archived:
92 rkyv::Deserialize<Self::Event, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
93 <Self::Error as rkyv::Archive>::Archived:
94 rkyv::Deserialize<Self::Error, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
95{
96 type Event: ArchiveEvent;
97 type Error: super::SubscriptionError;
98}
99
100pub trait ArchiveEventSubscriptionHandler<M>
101where
102 Self: Server,
103 M: ArchiveSubscription,
104{
105 fn handle(
106 &mut self,
107 msg: M,
108 subscriber: ArchiveEventSubscriber<M::Event>,
109 context: &mut ServerContext<Self>,
110 ) -> Result<(), M::Error>;
111}
112
113pub trait ArchiveEventHandler<M>
115where
116 Self: Server,
117 M: ArchiveEvent,
118 <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
119{
120 fn handle(&mut self, msg: Owned<M>, sender: xous::PID, context: &mut ServerContext<Self>);
121}
122
123pub fn handle_archive_subscription<M, S>(
125 handler: &mut S,
126 raw: xous::MessageEnvelope,
127 context: &mut ServerContext<S>,
128) where
129 M: ArchiveSubscription + 'static,
130 S: ArchiveEventSubscriptionHandler<M>,
131 <M as rkyv::Archive>::Archived:
132 rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
133{
134 let pid = raw.sender.pid().unwrap();
135 if let Err(e) = try_handle_archive_subscription(pid, handler, raw, context) {
136 log::warn!("archive sub handle error (PID {pid}) for {}: {e}", type_name::<M>());
137 }
138}
139
140fn try_handle_archive_subscription<M, S>(
142 pid: xous::PID,
143 handler: &mut S,
144 mut raw: xous::MessageEnvelope,
145 context: &mut ServerContext<S>,
146) -> whence::Result<(), Error>
147where
148 M: ArchiveSubscription + 'static,
149 S: ArchiveEventSubscriptionHandler<M>,
150 <M as rkyv::Archive>::Archived:
151 rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
152{
153 let mut buf = utils::extract_borrow_mut_message(&mut raw).whence()?;
154 let msg: EventSubscriptionMessage<M> = buf.to_original().whence()?;
155 let res = handler.handle(
156 msg.msg,
157 ArchiveEventSubscriber::<M::Event> {
158 pid,
159 msg_id: msg.msg_id,
160 cancel_msg_id: msg.cancel_msg_id,
161 cid: msg.cid,
162 _phantom: PhantomData,
163 },
164 context,
165 );
166 buf.replace(&res).whence()?;
167 Ok(())
168}
169
170pub fn decode_archive_event<M>(raw: xous::MessageEnvelope) -> M
171where
172 M: ArchiveEvent,
173 <M as rkyv::Archive>::Archived:
174 rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
175{
176 try_decode_archive_event(raw).unwrap()
177}
178
179pub fn try_decode_archive_event<M>(mut raw: xous::MessageEnvelope) -> whence::Result<M, Error>
180where
181 M: ArchiveEvent,
182 <M as rkyv::Archive>::Archived:
183 rkyv::Deserialize<M, XousDeserializer> + for<'a> CheckBytes<XousValidator<'a>>,
184{
185 let buffer = utils::extract_move_message(&mut raw).whence()?;
186 buffer.to_original::<M>().whence()
187}
188
189pub(crate) fn archive_event_handler<M, S>(
190 handler: &mut S,
191 raw: xous::MessageEnvelope,
192 context: &mut ServerContext<S>,
193) where
194 M: ArchiveEvent + 'static,
195 S: ArchiveEventHandler<M>,
196 <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
197{
198 if let Err(e) = try_archive_event_handler(handler, raw, context) {
199 log::warn!("failed to handle archive event {e:?}")
200 }
201}
202
203fn try_archive_event_handler<M, S>(
204 handler: &mut S,
205 raw: xous::MessageEnvelope,
206 context: &mut ServerContext<S>,
207) -> whence::Result<(), Error>
208where
209 M: ArchiveEvent + 'static,
210 S: ArchiveEventHandler<M>,
211 <M as rkyv::Archive>::Archived: for<'a> CheckBytes<XousValidator<'a>>,
212{
213 let sender = raw.sender.pid().unwrap();
214 let msg = Owned::new_move(raw).whence()?;
215 handler.handle(msg, sender, context);
216 Ok(())
217}
218
219pub fn subscribe_archive<M>(cid: xous::CID, msg: M, sid: xous::SID) -> Result<(usize, usize), M::Error>
232where
233 M: ArchiveSubscription + 'static,
234{
235 try_subscribe_archive(cid, msg, sid).unwrap()
236}
237
238pub fn try_subscribe_archive<M>(
239 cid: xous::CID,
240 msg: M,
241 sid: xous::SID,
242) -> whence::Result<Result<(usize, usize), M::Error>, Error>
243where
244 M: ArchiveSubscription + 'static,
245{
246 let msg_id = crate::next_dynamic_message_id();
247 let cancel_msg_id = crate::next_dynamic_message_id();
248 let pid = xous::get_remote_pid(cid).whence()?;
249 let cid_remote = xous::connect_for_process(pid, sid).whence()?;
250 xous::allow_messages_on_connection(pid, cid_remote, msg_id..(cancel_msg_id + 1)).whence()?;
251 let msg = EventSubscriptionMessage { cid: cid_remote, msg_id, cancel_msg_id, msg };
252 let result = msg.send_archive(cid)?;
253 Ok(result.map(|_| (msg_id, cancel_msg_id)))
254}
255
256#[derive(Debug)]
257pub struct ArchiveSubList<T: ArchiveCodec> {
258 inner: Vec<ArchiveEventSubscriber<T>>,
259}
260
261impl<T: ArchiveCodec> Default for ArchiveSubList<T> {
262 fn default() -> Self { Self { inner: Default::default() } }
263}
264
265impl<T: ArchiveCodec> ArchiveSubList<T> {
266 pub fn push(&mut self, sub: ArchiveEventSubscriber<T>) { self.inner.push(sub); }
267
268 pub fn send(&mut self, msg: &T) { self.inner.retain(|sub| sub.send(msg).is_ok()) }
269
270 pub fn send_nowait(&mut self, msg: &T) {
271 self.inner.retain(|sub| match sub.send_nowait(msg) {
272 Ok(_) => true,
273 Err(xous::Error::ServerQueueFull) => {
274 log::warn!("archive event send_nowait error for pid {} {}", sub.pid(), type_name::<T>());
275 true
276 }
277 Err(_) => false,
278 })
279 }
280
281 pub fn remove_cid(&mut self, cid: xous::CID) { self.inner.retain(|s| s.cid() != cid) }
282}