1#![warn(missing_docs)]
3
4pub mod accumulator;
5#[cfg(feature = "dfir_macro")]
6#[cfg_attr(docsrs, doc(cfg(feature = "dfir_macro")))]
7pub mod demux_enum;
8pub mod multiset;
9pub mod priority_stack;
10pub mod slot_vec;
11pub mod sparse_vec;
12pub mod unsync;
13
14pub mod simulation;
15
16mod monotonic;
17pub use monotonic::*;
18
19mod udp;
20#[cfg(not(target_arch = "wasm32"))]
21pub use udp::*;
22
23mod tcp;
24#[cfg(not(target_arch = "wasm32"))]
25pub use tcp::*;
26
27#[cfg(unix)]
28mod socket;
29use std::net::SocketAddr;
30use std::num::NonZeroUsize;
31use std::task::{Context, Poll};
32
33use futures::Stream;
34use serde::de::DeserializeOwned;
35use serde::ser::Serialize;
36#[cfg(unix)]
37pub use socket::*;
38
/// A tagged request wrapping a single value of type `T`.
///
/// NOTE(review): going by the variant names, a consumer is presumably expected to store the
/// payload on `Persist` and remove it on `Delete`. How a `Delete` payload is matched against
/// stored values is decided by the consumer, not by this type — confirm at the use site.
pub enum Persistence<T> {
    /// Carries a `T` that should be persisted.
    Persist(T),
    /// Carries a `T` that should be deleted.
    Delete(T),
}
46
/// A tagged request over key-value data with key type `K` and value type `V`.
///
/// NOTE(review): going by the variant names, a consumer is presumably expected to store the
/// pair on `Persist` and drop whatever is associated with the key on `Delete` — confirm at the
/// use site.
pub enum PersistenceKeyed<K, V> {
    /// Carries a key together with the value that should be persisted for it.
    Persist(K, V),
    /// Carries only the key whose data should be deleted.
    Delete(K),
}
54
55pub fn unbounded_channel<T>() -> (
57 tokio::sync::mpsc::UnboundedSender<T>,
58 tokio_stream::wrappers::UnboundedReceiverStream<T>,
59) {
60 let (send, recv) = tokio::sync::mpsc::unbounded_channel();
61 let recv = tokio_stream::wrappers::UnboundedReceiverStream::new(recv);
62 (send, recv)
63}
64
65pub fn unsync_channel<T>(
67 capacity: Option<NonZeroUsize>,
68) -> (unsync::mpsc::Sender<T>, unsync::mpsc::Receiver<T>) {
69 unsync::mpsc::channel(capacity)
70}
71
72pub fn ready_iter<S>(stream: S) -> impl Iterator<Item = S::Item>
74where
75 S: Stream,
76{
77 let mut stream = Box::pin(stream);
78 std::iter::from_fn(move || {
79 match stream
80 .as_mut()
81 .poll_next(&mut Context::from_waker(futures::task::noop_waker_ref()))
82 {
83 Poll::Ready(opt) => opt,
84 Poll::Pending => None,
85 }
86 })
87}
88
89pub fn collect_ready<C, S>(stream: S) -> C
94where
95 C: FromIterator<S::Item>,
96 S: Stream,
97{
98 assert!(
99 tokio::runtime::Handle::try_current().is_err(),
100 "Calling `collect_ready` from an async runtime may cause incorrect results, use `collect_ready_async` instead."
101 );
102 ready_iter(stream).collect()
103}
104
105pub async fn collect_ready_async<C, S>(stream: S) -> C
110where
111 C: Default + Extend<S::Item>,
112 S: Stream,
113{
114 use std::sync::atomic::Ordering;
115
116 tokio::task::yield_now().await;
118
119 let got_any_items = std::sync::atomic::AtomicBool::new(true);
120 let mut unfused_iter =
121 ready_iter(stream).inspect(|_| got_any_items.store(true, Ordering::Relaxed));
122 let mut out = C::default();
123 while got_any_items.swap(false, Ordering::Relaxed) {
124 out.extend(unfused_iter.by_ref());
125 tokio::task::yield_now().await;
128 }
129 out
130}
131
132pub fn serialize_to_bytes<T>(msg: T) -> bytes::Bytes
134where
135 T: Serialize,
136{
137 bytes::Bytes::from(bincode::serialize(&msg).unwrap())
138}
139
140pub fn deserialize_from_bytes<T>(msg: impl AsRef<[u8]>) -> bincode::Result<T>
142where
143 T: DeserializeOwned,
144{
145 bincode::deserialize(msg.as_ref())
146}
147
/// Resolves `addr` to a socket address and returns the first IPv4 result.
///
/// `addr` is resolved through `std::net::ToSocketAddrs`.
///
/// # Errors
///
/// * Any error from `to_socket_addrs` is propagated unchanged.
/// * If resolution succeeds but none of the results is IPv4, an `std::io::Error` with the
///   message `"Unable to resolve IPv4 address"` is returned.
pub fn ipv4_resolve(addr: &str) -> Result<SocketAddr, std::io::Error> {
    use std::net::ToSocketAddrs;

    addr.to_socket_addrs()?
        .find(SocketAddr::is_ipv4)
        .ok_or_else(|| std::io::Error::other("Unable to resolve IPv4 address"))
}
158
159#[cfg(not(target_arch = "wasm32"))]
162pub async fn bind_udp_bytes(addr: SocketAddr) -> (UdpSink, UdpStream, SocketAddr) {
163 let socket = tokio::net::UdpSocket::bind(addr).await.unwrap();
164 udp_bytes(socket)
165}
166
167#[cfg(not(target_arch = "wasm32"))]
170pub async fn bind_udp_lines(addr: SocketAddr) -> (UdpLinesSink, UdpLinesStream, SocketAddr) {
171 let socket = tokio::net::UdpSocket::bind(addr).await.unwrap();
172 udp_lines(socket)
173}
174
175#[cfg(not(target_arch = "wasm32"))]
182pub async fn bind_tcp_bytes(
183 addr: SocketAddr,
184) -> (
185 unsync::mpsc::Sender<(bytes::Bytes, SocketAddr)>,
186 unsync::mpsc::Receiver<Result<(bytes::BytesMut, SocketAddr), std::io::Error>>,
187 SocketAddr,
188) {
189 bind_tcp(addr, tokio_util::codec::LengthDelimitedCodec::new())
190 .await
191 .unwrap()
192}
193
194#[cfg(not(target_arch = "wasm32"))]
196pub async fn bind_tcp_lines(
197 addr: SocketAddr,
198) -> (
199 unsync::mpsc::Sender<(String, SocketAddr)>,
200 unsync::mpsc::Receiver<Result<(String, SocketAddr), tokio_util::codec::LinesCodecError>>,
201 SocketAddr,
202) {
203 bind_tcp(addr, tokio_util::codec::LinesCodec::new())
204 .await
205 .unwrap()
206}
207
208#[cfg(not(target_arch = "wasm32"))]
213pub fn connect_tcp_bytes() -> (
214 TcpFramedSink<bytes::Bytes>,
215 TcpFramedStream<tokio_util::codec::LengthDelimitedCodec>,
216) {
217 connect_tcp(tokio_util::codec::LengthDelimitedCodec::new())
218}
219
220#[cfg(not(target_arch = "wasm32"))]
222pub fn connect_tcp_lines() -> (
223 TcpFramedSink<String>,
224 TcpFramedStream<tokio_util::codec::LinesCodec>,
225) {
226 connect_tcp(tokio_util::codec::LinesCodec::new())
227}
228
/// Sorts `slice` in place (unstably) by a key that is borrowed from each element.
///
/// The higher-ranked bound on `f` lets the key be a reference into the element it was derived
/// from, so no key has to be cloned or otherwise owned. Elements are ordered by comparing
/// `f(a)` with `f(b)` via `Ord`. As with any unstable sort, the relative order of elements
/// with equal keys is unspecified.
pub fn sort_unstable_by_key_hrtb<T, F, K>(slice: &mut [T], f: F)
where
    F: for<'a> Fn(&'a T) -> &'a K,
    K: Ord,
{
    slice.sort_unstable_by(|lhs, rhs| {
        let left_key = f(lhs);
        let right_key = f(rhs);
        left_key.cmp(right_key)
    })
}
240
241pub fn iter_batches_stream<I>(
248 iter: I,
249 n: usize,
250) -> futures::stream::PollFn<impl FnMut(&mut Context<'_>) -> Poll<Option<I::Item>>>
251where
252 I: IntoIterator + Unpin,
253{
254 let mut count = 0;
255 let mut iter = iter.into_iter();
256 futures::stream::poll_fn(move |ctx| {
257 count += 1;
258 if n < count {
259 count = 0;
260 ctx.waker().wake_by_ref();
261 Poll::Pending
262 } else {
263 Poll::Ready(iter.next())
264 }
265 })
266}
267
#[cfg(test)]
mod test {
    use super::*;

    /// `collect_ready` returns all 1000 items that were queued before the call.
    #[test]
    pub fn test_collect_ready() {
        let (tx, mut rx) = unbounded_channel::<usize>();
        (0..1000).for_each(|value| tx.send(value).unwrap());

        let drained: Vec<usize> = collect_ready(&mut rx);
        assert_eq!(1000, drained.len());
    }

    /// `collect_ready_async` returns all 1000 queued items when run inside the async test
    /// harness.
    #[crate::test]
    pub async fn test_collect_ready_async() {
        let (tx, mut rx) = unbounded_channel::<usize>();
        (0..1000).for_each(|value| tx.send(value).unwrap());

        let drained: Vec<usize> = collect_ready_async(&mut rx).await;
        assert_eq!(1000, drained.len());
    }
}