Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::nondet::{NonDet, nondet};
31use crate::prelude::manual_proof;
32use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
33
34pub mod networking;
35
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The [`MinOrder`] supertrait bounds state, at the type level, that taking the minimum of an
/// ordering with itself or with [`TotalOrder`] yields that same ordering, while taking the
/// minimum with [`NoOrder`] always yields [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
44
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an empty enum: it has no values and is only used as a type-level marker.
pub enum TotalOrder {}

// Maps the type-level marker to its `StreamOrder` counterpart.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
54
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an empty enum: it has no values and is only used as a type-level marker.
pub enum NoOrder {}

// Maps the type-level marker to its `StreamOrder` counterpart.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
66
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
///
/// The relation is reflexive: because every [`Ordering`] satisfies
/// `MinOrder<Self, Min = Self>`, each ordering is also "weaker than" itself.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than `O2` exactly when the minimum of `O` and `O2` is `O`.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
74
/// Helper trait for determining the weakest of two orderings.
///
/// `<A as MinOrder<B>>::Min` is [`TotalOrder`] only when both `A` and `B` are
/// [`TotalOrder`]; if either is [`NoOrder`], the result is [`NoOrder`].
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest ordering, so the minimum is whatever the other side is.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest ordering, so it always wins the minimum.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
91
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The [`MinRetries`] supertrait bounds state, at the type level, that taking the minimum of a
/// retry guarantee with itself or with [`ExactlyOnce`] yields that same guarantee, while
/// taking the minimum with [`AtLeastOnce`] always yields [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
102
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an empty enum: it has no values and is only used as a type-level marker.
pub enum ExactlyOnce {}

// Maps the type-level marker to its `StreamRetry` counterpart.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
111
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an empty enum: it has no values and is only used as a type-level marker.
pub enum AtLeastOnce {}

// Maps the type-level marker to its `StreamRetry` counterpart.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
120
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
///
/// The relation is reflexive: because every [`Retries`] satisfies
/// `MinRetries<Self, Min = Self>`, each retry guarantee is also "weaker than" itself.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than `R2` exactly when the minimum of `R` and `R2` is `R`.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
128
/// Helper trait for determining the weakest of two retry guarantees.
///
/// `<A as MinRetries<B>>::Min` is [`ExactlyOnce`] only when both `A` and `B` are
/// [`ExactlyOnce`]; if either is [`AtLeastOnce`], the result is [`AtLeastOnce`].
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The bounds require the result to be weaker than (or equal to) both inputs.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum is whatever the other side is.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so it always wins the minimum.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
///
/// Use it as a bound on APIs that are only valid for totally-ordered streams. When the bound
/// is not satisfied, the `on_unimplemented` attribute above supplies the compiler error text.
pub trait IsOrdered: Ordering {}

// `TotalOrder` is the only implementor; `do_not_recommend` asks the compiler not to
// mention this impl in its diagnostics.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
158
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
///
/// Use it as a bound on APIs that are only valid for exactly-once streams. When the bound
/// is not satisfied, the `on_unimplemented` attribute above supplies the compiler error text.
pub trait IsExactlyOnce: Retries {}

// `ExactlyOnce` is the only implementor; `do_not_recommend` asks the compiler not to
// mention this impl in its diagnostics.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
171
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is being materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream. It lives in a [`RefCell`] so that it can be
    /// replaced through a shared reference (the `Clone` impl swaps it for a `Tee` node).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker that records the type parameters without storing values of them.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
203
204impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
205    for Stream<T, L, Unbounded, O, R>
206where
207    L: Location<'a>,
208{
209    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
210        let new_meta = stream
211            .location
212            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
213
214        Stream {
215            location: stream.location,
216            ir_node: RefCell::new(HydroNode::Cast {
217                inner: Box::new(stream.ir_node.into_inner()),
218                metadata: new_meta,
219            }),
220            _phantom: PhantomData,
221        }
222    }
223}
224
225impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
226    for Stream<T, L, B, NoOrder, R>
227where
228    L: Location<'a>,
229{
230    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
231        stream.weaken_ordering()
232    }
233}
234
235impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
236    for Stream<T, L, B, O, AtLeastOnce>
237where
238    L: Location<'a>,
239{
240    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
241        stream.weaken_retries()
242    }
243}
244
/// Exposes tick deferral for bounded, in-tick streams through the [`DeferTick`] trait.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): written as a path call rather than `self.defer_tick()`; presumably
        // this forwards to an inherent `Stream::defer_tick` defined outside this view —
        // confirm before changing the call form.
        Stream::defer_tick(self)
    }
}
253
254impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
255    for Stream<T, Tick<L>, Bounded, O, R>
256where
257    L: Location<'a>,
258{
259    type Location = Tick<L>;
260
261    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262        Stream::new(
263            location.clone(),
264            HydroNode::CycleSource {
265                cycle_id,
266                metadata: location.new_node_metadata(Self::collection_kind()),
267            },
268        )
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
280        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
281            location.clone(),
282            HydroNode::DeferTick {
283                input: Box::new(HydroNode::CycleSource {
284                    cycle_id,
285                    metadata: location.new_node_metadata(Self::collection_kind()),
286                }),
287                metadata: location.new_node_metadata(Self::collection_kind()),
288            },
289        );
290
291        from_previous_tick.chain(initial.filter_if_some(location.optional_first_tick(q!(()))))
292    }
293}
294
295impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
296    for Stream<T, Tick<L>, Bounded, O, R>
297where
298    L: Location<'a>,
299{
300    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
301        assert_eq!(
302            Location::id(&self.location),
303            expected_location,
304            "locations do not match"
305        );
306        self.location
307            .flow_state()
308            .borrow_mut()
309            .push_root(HydroRoot::CycleSink {
310                cycle_id,
311                input: Box::new(self.ir_node.into_inner()),
312                op_metadata: HydroIrOpMetadata::new(),
313            });
314    }
315}
316
317impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
318    for Stream<T, L, B, O, R>
319where
320    L: Location<'a> + NoTick,
321{
322    type Location = L;
323
324    fn create_source(cycle_id: CycleId, location: L) -> Self {
325        Stream::new(
326            location.clone(),
327            HydroNode::CycleSource {
328                cycle_id,
329                metadata: location.new_node_metadata(Self::collection_kind()),
330            },
331        )
332    }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
336    for Stream<T, L, B, O, R>
337where
338    L: Location<'a> + NoTick,
339{
340    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
341        assert_eq!(
342            Location::id(&self.location),
343            expected_location,
344            "locations do not match"
345        );
346        self.location
347            .flow_state()
348            .borrow_mut()
349            .push_root(HydroRoot::CycleSink {
350                cycle_id,
351                input: Box::new(self.ir_node.into_inner()),
352                op_metadata: HydroIrOpMetadata::new(),
353            });
354    }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
358where
359    T: Clone,
360    L: Location<'a>,
361{
362    fn clone(&self) -> Self {
363        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
364            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
365            *self.ir_node.borrow_mut() = HydroNode::Tee {
366                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
367                metadata: self.location.new_node_metadata(Self::collection_kind()),
368            };
369        }
370
371        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
372            Stream {
373                location: self.location.clone(),
374                ir_node: HydroNode::Tee {
375                    inner: TeeNode(inner.0.clone()),
376                    metadata: metadata.clone(),
377                }
378                .into(),
379                _phantom: PhantomData,
380            }
381        } else {
382            unreachable!()
383        }
384    }
385}
386
387impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
388where
389    L: Location<'a>,
390{
391    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
392        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
393        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
394
395        Stream {
396            location,
397            ir_node: RefCell::new(ir_node),
398            _phantom: PhantomData,
399        }
400    }
401
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// The location is returned by reference; clone it if an owned value is needed.
    pub fn location(&self) -> &L {
        &self.location
    }
406
407    pub(crate) fn collection_kind() -> CollectionKind {
408        CollectionKind::Stream {
409            bound: B::BOUND_KIND,
410            order: O::ORDERING_KIND,
411            retry: R::RETRIES_KIND,
412            element_type: quote_type::<T>().into(),
413        }
414    }
415
416    /// Produces a stream based on invoking `f` on each element.
417    /// If you do not want to modify the stream and instead only want to view
418    /// each item use [`Stream::inspect`] instead.
419    ///
420    /// # Example
421    /// ```rust
422    /// # #[cfg(feature = "deploy")] {
423    /// # use hydro_lang::prelude::*;
424    /// # use futures::StreamExt;
425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
426    /// let words = process.source_iter(q!(vec!["hello", "world"]));
427    /// words.map(q!(|x| x.to_uppercase()))
428    /// # }, |mut stream| async move {
429    /// # for w in vec!["HELLO", "WORLD"] {
430    /// #     assert_eq!(stream.next().await.unwrap(), w);
431    /// # }
432    /// # }));
433    /// # }
434    /// ```
435    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
436    where
437        F: Fn(T) -> U + 'a,
438    {
439        let f = f.splice_fn1_ctx(&self.location).into();
440        Stream::new(
441            self.location.clone(),
442            HydroNode::Map {
443                f,
444                input: Box::new(self.ir_node.into_inner()),
445                metadata: self
446                    .location
447                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
448            },
449        )
450    }
451
452    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
453    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
454    /// for the output type `U` must produce items in a **deterministic** order.
455    ///
456    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
457    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// process
466    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
467    ///     .flat_map_ordered(q!(|x| x))
468    /// # }, |mut stream| async move {
469    /// // 1, 2, 3, 4
470    /// # for w in (1..5) {
471    /// #     assert_eq!(stream.next().await.unwrap(), w);
472    /// # }
473    /// # }));
474    /// # }
475    /// ```
476    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
477    where
478        I: IntoIterator<Item = U>,
479        F: Fn(T) -> I + 'a,
480    {
481        let f = f.splice_fn1_ctx(&self.location).into();
482        Stream::new(
483            self.location.clone(),
484            HydroNode::FlatMap {
485                f,
486                input: Box::new(self.ir_node.into_inner()),
487                metadata: self
488                    .location
489                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
490            },
491        )
492    }
493
494    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
495    /// for the output type `U` to produce items in any order.
496    ///
497    /// # Example
498    /// ```rust
499    /// # #[cfg(feature = "deploy")] {
500    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
501    /// # use futures::StreamExt;
502    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
503    /// process
504    ///     .source_iter(q!(vec![
505    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
506    ///         std::collections::HashSet::from_iter(vec![3, 4]),
507    ///     ]))
508    ///     .flat_map_unordered(q!(|x| x))
509    /// # }, |mut stream| async move {
510    /// // 1, 2, 3, 4, but in no particular order
511    /// # let mut results = Vec::new();
512    /// # for w in (1..5) {
513    /// #     results.push(stream.next().await.unwrap());
514    /// # }
515    /// # results.sort();
516    /// # assert_eq!(results, vec![1, 2, 3, 4]);
517    /// # }));
518    /// # }
519    /// ```
520    pub fn flat_map_unordered<U, I, F>(
521        self,
522        f: impl IntoQuotedMut<'a, F, L>,
523    ) -> Stream<U, L, B, NoOrder, R>
524    where
525        I: IntoIterator<Item = U>,
526        F: Fn(T) -> I + 'a,
527    {
528        let f = f.splice_fn1_ctx(&self.location).into();
529        Stream::new(
530            self.location.clone(),
531            HydroNode::FlatMap {
532                f,
533                input: Box::new(self.ir_node.into_inner()),
534                metadata: self
535                    .location
536                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
537            },
538        )
539    }
540
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// This is shorthand for [`Stream::flat_map_ordered`] with an identity closure.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
569
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// This is shorthand for [`Stream::flat_map_unordered`] with an identity closure, so the
    /// resulting stream is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
602
603    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
604    /// `f`, preserving the order of the elements.
605    ///
606    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
607    /// not modify or take ownership of the values. If you need to modify the values while filtering
608    /// use [`Stream::filter_map`] instead.
609    ///
610    /// # Example
611    /// ```rust
612    /// # #[cfg(feature = "deploy")] {
613    /// # use hydro_lang::prelude::*;
614    /// # use futures::StreamExt;
615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616    /// process
617    ///     .source_iter(q!(vec![1, 2, 3, 4]))
618    ///     .filter(q!(|&x| x > 2))
619    /// # }, |mut stream| async move {
620    /// // 3, 4
621    /// # for w in (3..5) {
622    /// #     assert_eq!(stream.next().await.unwrap(), w);
623    /// # }
624    /// # }));
625    /// # }
626    /// ```
627    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
628    where
629        F: Fn(&T) -> bool + 'a,
630    {
631        let f = f.splice_fn1_borrow_ctx(&self.location).into();
632        Stream::new(
633            self.location.clone(),
634            HydroNode::Filter {
635                f,
636                input: Box::new(self.ir_node.into_inner()),
637                metadata: self.location.new_node_metadata(Self::collection_kind()),
638            },
639        )
640    }
641
642    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
643    ///
644    /// # Example
645    /// ```rust
646    /// # #[cfg(feature = "deploy")] {
647    /// # use hydro_lang::prelude::*;
648    /// # use futures::StreamExt;
649    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
650    /// process
651    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
652    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
653    /// # }, |mut stream| async move {
654    /// // 1, 2
655    /// # for w in (1..3) {
656    /// #     assert_eq!(stream.next().await.unwrap(), w);
657    /// # }
658    /// # }));
659    /// # }
660    /// ```
661    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
662    where
663        F: Fn(T) -> Option<U> + 'a,
664    {
665        let f = f.splice_fn1_ctx(&self.location).into();
666        Stream::new(
667            self.location.clone(),
668            HydroNode::FilterMap {
669                f,
670                input: Box::new(self.ir_node.into_inner()),
671                metadata: self
672                    .location
673                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
674            },
675        )
676    }
677
678    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
679    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
680    /// If `other` is an empty [`Optional`], no values will be produced.
681    ///
682    /// # Example
683    /// ```rust
684    /// # #[cfg(feature = "deploy")] {
685    /// # use hydro_lang::prelude::*;
686    /// # use futures::StreamExt;
687    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
688    /// let tick = process.tick();
689    /// let batch = process
690    ///   .source_iter(q!(vec![1, 2, 3, 4]))
691    ///   .batch(&tick, nondet!(/** test */));
692    /// let count = batch.clone().count(); // `count()` returns a singleton
693    /// batch.cross_singleton(count).all_ticks()
694    /// # }, |mut stream| async move {
695    /// // (1, 4), (2, 4), (3, 4), (4, 4)
696    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
697    /// #     assert_eq!(stream.next().await.unwrap(), w);
698    /// # }
699    /// # }));
700    /// # }
701    /// ```
702    pub fn cross_singleton<O2>(
703        self,
704        other: impl Into<Optional<O2, L, Bounded>>,
705    ) -> Stream<(T, O2), L, B, O, R>
706    where
707        O2: Clone,
708    {
709        let other: Optional<O2, L, Bounded> = other.into();
710        check_matching_location(&self.location, &other.location);
711
712        Stream::new(
713            self.location.clone(),
714            HydroNode::CrossSingleton {
715                left: Box::new(self.ir_node.into_inner()),
716                right: Box::new(other.ir_node.into_inner()),
717                metadata: self
718                    .location
719                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
720            },
721        )
722    }
723
724    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
725    ///
726    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
727    /// leader of a cluster.
728    ///
729    /// # Example
730    /// ```rust
731    /// # #[cfg(feature = "deploy")] {
732    /// # use hydro_lang::prelude::*;
733    /// # use futures::StreamExt;
734    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
735    /// let tick = process.tick();
736    /// // ticks are lazy by default, forces the second tick to run
737    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
738    ///
739    /// let batch_first_tick = process
740    ///   .source_iter(q!(vec![1, 2, 3, 4]))
741    ///   .batch(&tick, nondet!(/** test */));
742    /// let batch_second_tick = process
743    ///   .source_iter(q!(vec![5, 6, 7, 8]))
744    ///   .batch(&tick, nondet!(/** test */))
745    ///   .defer_tick(); // appears on the second tick
746    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
747    /// batch_first_tick.chain(batch_second_tick)
748    ///   .filter_if_some(some_on_first_tick)
749    ///   .all_ticks()
750    /// # }, |mut stream| async move {
751    /// // [1, 2, 3, 4]
752    /// # for w in vec![1, 2, 3, 4] {
753    /// #     assert_eq!(stream.next().await.unwrap(), w);
754    /// # }
755    /// # }));
756    /// # }
757    /// ```
758    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
759        self.cross_singleton(signal.map(q!(|_u| ())))
760            .map(q!(|(d, _signal)| d))
761    }
762
763    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
764    ///
765    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
766    /// some local state.
767    ///
768    /// # Example
769    /// ```rust
770    /// # #[cfg(feature = "deploy")] {
771    /// # use hydro_lang::prelude::*;
772    /// # use futures::StreamExt;
773    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
774    /// let tick = process.tick();
775    /// // ticks are lazy by default, forces the second tick to run
776    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
777    ///
778    /// let batch_first_tick = process
779    ///   .source_iter(q!(vec![1, 2, 3, 4]))
780    ///   .batch(&tick, nondet!(/** test */));
781    /// let batch_second_tick = process
782    ///   .source_iter(q!(vec![5, 6, 7, 8]))
783    ///   .batch(&tick, nondet!(/** test */))
784    ///   .defer_tick(); // appears on the second tick
785    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
786    /// batch_first_tick.chain(batch_second_tick)
787    ///   .filter_if_none(some_on_first_tick)
788    ///   .all_ticks()
789    /// # }, |mut stream| async move {
790    /// // [5, 6, 7, 8]
791    /// # for w in vec![5, 6, 7, 8] {
792    /// #     assert_eq!(stream.next().await.unwrap(), w);
793    /// # }
794    /// # }));
795    /// # }
796    /// ```
797    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
798        self.filter_if_some(
799            other
800                .map(q!(|_| ()))
801                .into_singleton()
802                .filter(q!(|o| o.is_none())),
803        )
804    }
805
806    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
807    /// tupled pairs in a non-deterministic order.
808    ///
809    /// # Example
810    /// ```rust
811    /// # #[cfg(feature = "deploy")] {
812    /// # use hydro_lang::prelude::*;
813    /// # use std::collections::HashSet;
814    /// # use futures::StreamExt;
815    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816    /// let tick = process.tick();
817    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
818    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
819    /// stream1.cross_product(stream2)
820    /// # }, |mut stream| async move {
821    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
822    /// # stream.map(|i| assert!(expected.contains(&i)));
823    /// # }));
824    /// # }
825    /// ```
826    pub fn cross_product<T2, O2: Ordering>(
827        self,
828        other: Stream<T2, L, B, O2, R>,
829    ) -> Stream<(T, T2), L, B, NoOrder, R>
830    where
831        T: Clone,
832        T2: Clone,
833    {
834        check_matching_location(&self.location, &other.location);
835
836        Stream::new(
837            self.location.clone(),
838            HydroNode::CrossProduct {
839                left: Box::new(self.ir_node.into_inner()),
840                right: Box::new(other.ir_node.into_inner()),
841                metadata: self
842                    .location
843                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
844            },
845        )
846    }
847
848    /// Takes one stream as input and filters out any duplicate occurrences. The output
849    /// contains all unique values from the input.
850    ///
851    /// # Example
852    /// ```rust
853    /// # #[cfg(feature = "deploy")] {
854    /// # use hydro_lang::prelude::*;
855    /// # use futures::StreamExt;
856    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
857    /// let tick = process.tick();
858    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
859    /// # }, |mut stream| async move {
860    /// # for w in vec![1, 2, 3, 4] {
861    /// #     assert_eq!(stream.next().await.unwrap(), w);
862    /// # }
863    /// # }));
864    /// # }
865    /// ```
866    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
867    where
868        T: Eq + Hash,
869    {
870        Stream::new(
871            self.location.clone(),
872            HydroNode::Unique {
873                input: Box::new(self.ir_node.into_inner()),
874                metadata: self
875                    .location
876                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
877            },
878        )
879    }
880
881    /// Outputs everything in this stream that is *not* contained in the `other` stream.
882    ///
883    /// The `other` stream must be [`Bounded`], since this function will wait until
884    /// all its elements are available before producing any output.
885    /// # Example
886    /// ```rust
887    /// # #[cfg(feature = "deploy")] {
888    /// # use hydro_lang::prelude::*;
889    /// # use futures::StreamExt;
890    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
891    /// let tick = process.tick();
892    /// let stream = process
893    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
894    ///   .batch(&tick, nondet!(/** test */));
895    /// let batch = process
896    ///   .source_iter(q!(vec![1, 2]))
897    ///   .batch(&tick, nondet!(/** test */));
898    /// stream.filter_not_in(batch).all_ticks()
899    /// # }, |mut stream| async move {
900    /// # for w in vec![3, 4] {
901    /// #     assert_eq!(stream.next().await.unwrap(), w);
902    /// # }
903    /// # }));
904    /// # }
905    /// ```
906    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
907    where
908        T: Eq + Hash,
909        B2: IsBounded,
910    {
911        check_matching_location(&self.location, &other.location);
912
913        Stream::new(
914            self.location.clone(),
915            HydroNode::Difference {
916                pos: Box::new(self.ir_node.into_inner()),
917                neg: Box::new(other.ir_node.into_inner()),
918                metadata: self
919                    .location
920                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
921            },
922        )
923    }
924
925    /// An operator which allows you to "inspect" each element of a stream without
926    /// modifying it. The closure `f` is called on a reference to each item. This is
927    /// mainly useful for debugging, and should not be used to generate side-effects.
928    ///
929    /// # Example
930    /// ```rust
931    /// # #[cfg(feature = "deploy")] {
932    /// # use hydro_lang::prelude::*;
933    /// # use futures::StreamExt;
934    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
935    /// let nums = process.source_iter(q!(vec![1, 2]));
936    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
937    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
938    /// # }, |mut stream| async move {
939    /// # for w in vec![1, 2] {
940    /// #     assert_eq!(stream.next().await.unwrap(), w);
941    /// # }
942    /// # }));
943    /// # }
944    /// ```
945    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
946    where
947        F: Fn(&T) + 'a,
948    {
949        let f = f.splice_fn1_borrow_ctx(&self.location).into();
950
951        Stream::new(
952            self.location.clone(),
953            HydroNode::Inspect {
954                f,
955                input: Box::new(self.ir_node.into_inner()),
956                metadata: self.location.new_node_metadata(Self::collection_kind()),
957            },
958        )
959    }
960
961    /// Executes the provided closure for every element in this stream.
962    ///
963    /// Because the closure may have side effects, the stream must have deterministic order
964    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
965    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
966    /// [`Stream::assume_retries`] with an explanation for why this is the case.
967    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
968    where
969        O: IsOrdered,
970        R: IsExactlyOnce,
971    {
972        let f = f.splice_fn1_ctx(&self.location).into();
973        self.location
974            .flow_state()
975            .borrow_mut()
976            .push_root(HydroRoot::ForEach {
977                input: Box::new(self.ir_node.into_inner()),
978                f,
979                op_metadata: HydroIrOpMetadata::new(),
980            });
981    }
982
983    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
984    /// TCP socket to some other server. You should _not_ use this API for interacting with
985    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
986    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
987    /// interaction with asynchronous sinks.
988    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
989    where
990        O: IsOrdered,
991        R: IsExactlyOnce,
992        S: 'a + futures::Sink<T> + Unpin,
993    {
994        self.location
995            .flow_state()
996            .borrow_mut()
997            .push_root(HydroRoot::DestSink {
998                sink: sink.splice_typed_ctx(&self.location).into(),
999                input: Box::new(self.ir_node.into_inner()),
1000                op_metadata: HydroIrOpMetadata::new(),
1001            });
1002    }
1003
1004    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1005    ///
1006    /// # Example
1007    /// ```rust
1008    /// # #[cfg(feature = "deploy")] {
1009    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1010    /// # use futures::StreamExt;
1011    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1012    /// let tick = process.tick();
1013    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1014    /// numbers.enumerate()
1015    /// # }, |mut stream| async move {
1016    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1017    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1018    /// #     assert_eq!(stream.next().await.unwrap(), w);
1019    /// # }
1020    /// # }));
1021    /// # }
1022    /// ```
1023    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1024    where
1025        O: IsOrdered,
1026        R: IsExactlyOnce,
1027    {
1028        Stream::new(
1029            self.location.clone(),
1030            HydroNode::Enumerate {
1031                input: Box::new(self.ir_node.into_inner()),
1032                metadata: self.location.new_node_metadata(Stream::<
1033                    (usize, T),
1034                    L,
1035                    B,
1036                    TotalOrder,
1037                    ExactlyOnce,
1038                >::collection_kind()),
1039            },
1040        )
1041    }
1042
1043    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1044    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1045    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1046    ///
1047    /// Depending on the input stream guarantees, the closure may need to be commutative
1048    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1049    ///
1050    /// # Example
1051    /// ```rust
1052    /// # #[cfg(feature = "deploy")] {
1053    /// # use hydro_lang::prelude::*;
1054    /// # use futures::StreamExt;
1055    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1056    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1057    /// words
1058    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1059    ///     .into_stream()
1060    /// # }, |mut stream| async move {
1061    /// // "HELLOWORLD"
1062    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1063    /// # }));
1064    /// # }
1065    /// ```
1066    pub fn fold<A, I, F, C, Idemp>(
1067        self,
1068        init: impl IntoQuotedMut<'a, I, L>,
1069        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1070    ) -> Singleton<A, L, B>
1071    where
1072        I: Fn() -> A + 'a,
1073        F: Fn(&mut A, T),
1074        C: ValidCommutativityFor<O>,
1075        Idemp: ValidIdempotenceFor<R>,
1076    {
1077        let init = init.splice_fn0_ctx(&self.location).into();
1078        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1079        proof.register_proof(&comb);
1080
1081        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1082        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1083
1084        let core = HydroNode::Fold {
1085            init,
1086            acc: comb.into(),
1087            input: Box::new(ordered_etc.ir_node.into_inner()),
1088            metadata: ordered_etc
1089                .location
1090                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1091        };
1092
1093        Singleton::new(ordered_etc.location, core)
1094    }
1095
1096    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1097    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1098    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1099    /// reference, so that it can be modified in place.
1100    ///
1101    /// Depending on the input stream guarantees, the closure may need to be commutative
1102    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # #[cfg(feature = "deploy")] {
1107    /// # use hydro_lang::prelude::*;
1108    /// # use futures::StreamExt;
1109    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110    /// let bools = process.source_iter(q!(vec![false, true, false]));
1111    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1112    /// # }, |mut stream| async move {
1113    /// // true
1114    /// # assert_eq!(stream.next().await.unwrap(), true);
1115    /// # }));
1116    /// # }
1117    /// ```
1118    pub fn reduce<F, C, Idemp>(
1119        self,
1120        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1121    ) -> Optional<T, L, B>
1122    where
1123        F: Fn(&mut T, T) + 'a,
1124        C: ValidCommutativityFor<O>,
1125        Idemp: ValidIdempotenceFor<R>,
1126    {
1127        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1128        proof.register_proof(&f);
1129
1130        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1131        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1132
1133        let core = HydroNode::Reduce {
1134            f: f.into(),
1135            input: Box::new(ordered_etc.ir_node.into_inner()),
1136            metadata: ordered_etc
1137                .location
1138                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1139        };
1140
1141        Optional::new(ordered_etc.location, core)
1142    }
1143
1144    /// Computes the maximum element in the stream as an [`Optional`], which
1145    /// will be empty until the first element in the input arrives.
1146    ///
1147    /// # Example
1148    /// ```rust
1149    /// # #[cfg(feature = "deploy")] {
1150    /// # use hydro_lang::prelude::*;
1151    /// # use futures::StreamExt;
1152    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1153    /// let tick = process.tick();
1154    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1155    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1156    /// batch.max().all_ticks()
1157    /// # }, |mut stream| async move {
1158    /// // 4
1159    /// # assert_eq!(stream.next().await.unwrap(), 4);
1160    /// # }));
1161    /// # }
1162    /// ```
1163    pub fn max(self) -> Optional<T, L, B>
1164    where
1165        T: Ord,
1166    {
1167        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1168            .assume_ordering_trusted_bounded::<TotalOrder>(
1169                nondet!(/** max is commutative, but order affects intermediates */),
1170            )
1171            .reduce(q!(|curr, new| {
1172                if new > *curr {
1173                    *curr = new;
1174                }
1175            }))
1176    }
1177
1178    /// Computes the minimum element in the stream as an [`Optional`], which
1179    /// will be empty until the first element in the input arrives.
1180    ///
1181    /// # Example
1182    /// ```rust
1183    /// # #[cfg(feature = "deploy")] {
1184    /// # use hydro_lang::prelude::*;
1185    /// # use futures::StreamExt;
1186    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1187    /// let tick = process.tick();
1188    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1189    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1190    /// batch.min().all_ticks()
1191    /// # }, |mut stream| async move {
1192    /// // 1
1193    /// # assert_eq!(stream.next().await.unwrap(), 1);
1194    /// # }));
1195    /// # }
1196    /// ```
1197    pub fn min(self) -> Optional<T, L, B>
1198    where
1199        T: Ord,
1200    {
1201        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1202            .assume_ordering_trusted_bounded::<TotalOrder>(
1203                nondet!(/** max is commutative, but order affects intermediates */),
1204            )
1205            .reduce(q!(|curr, new| {
1206                if new < *curr {
1207                    *curr = new;
1208                }
1209            }))
1210    }
1211
1212    /// Computes the first element in the stream as an [`Optional`], which
1213    /// will be empty until the first element in the input arrives.
1214    ///
1215    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1216    /// re-ordering of elements may cause the first element to change.
1217    ///
1218    /// # Example
1219    /// ```rust
1220    /// # #[cfg(feature = "deploy")] {
1221    /// # use hydro_lang::prelude::*;
1222    /// # use futures::StreamExt;
1223    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224    /// let tick = process.tick();
1225    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1226    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1227    /// batch.first().all_ticks()
1228    /// # }, |mut stream| async move {
1229    /// // 1
1230    /// # assert_eq!(stream.next().await.unwrap(), 1);
1231    /// # }));
1232    /// # }
1233    /// ```
1234    pub fn first(self) -> Optional<T, L, B>
1235    where
1236        O: IsOrdered,
1237    {
1238        self.make_totally_ordered()
1239            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1240            .reduce(q!(|_, _| {}))
1241    }
1242
1243    /// Computes the last element in the stream as an [`Optional`], which
1244    /// will be empty until an element in the input arrives.
1245    ///
1246    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1247    /// re-ordering of elements may cause the last element to change.
1248    ///
1249    /// # Example
1250    /// ```rust
1251    /// # #[cfg(feature = "deploy")] {
1252    /// # use hydro_lang::prelude::*;
1253    /// # use futures::StreamExt;
1254    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1255    /// let tick = process.tick();
1256    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1257    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1258    /// batch.last().all_ticks()
1259    /// # }, |mut stream| async move {
1260    /// // 4
1261    /// # assert_eq!(stream.next().await.unwrap(), 4);
1262    /// # }));
1263    /// # }
1264    /// ```
1265    pub fn last(self) -> Optional<T, L, B>
1266    where
1267        O: IsOrdered,
1268    {
1269        self.make_totally_ordered()
1270            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1271            .reduce(q!(|curr, new| *curr = new))
1272    }
1273
1274    /// Collects all the elements of this stream into a single [`Vec`] element.
1275    ///
1276    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1277    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1278    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1279    /// the vector at an arbitrary point in time.
1280    ///
1281    /// # Example
1282    /// ```rust
1283    /// # #[cfg(feature = "deploy")] {
1284    /// # use hydro_lang::prelude::*;
1285    /// # use futures::StreamExt;
1286    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1287    /// let tick = process.tick();
1288    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1289    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1290    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1291    /// # }, |mut stream| async move {
1292    /// // [ vec![1, 2, 3, 4] ]
1293    /// # for w in vec![vec![1, 2, 3, 4]] {
1294    /// #     assert_eq!(stream.next().await.unwrap(), w);
1295    /// # }
1296    /// # }));
1297    /// # }
1298    /// ```
1299    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1300    where
1301        O: IsOrdered,
1302        R: IsExactlyOnce,
1303    {
1304        self.make_totally_ordered().make_exactly_once().fold(
1305            q!(|| vec![]),
1306            q!(|acc, v| {
1307                acc.push(v);
1308            }),
1309        )
1310    }
1311
1312    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1313    /// and emitting each intermediate result.
1314    ///
1315    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1316    /// containing all intermediate accumulated values. The scan operation can also terminate early
1317    /// by returning `None`.
1318    ///
1319    /// The function takes a mutable reference to the accumulator and the current element, and returns
1320    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1321    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1322    ///
1323    /// # Examples
1324    ///
1325    /// Basic usage - running sum:
1326    /// ```rust
1327    /// # #[cfg(feature = "deploy")] {
1328    /// # use hydro_lang::prelude::*;
1329    /// # use futures::StreamExt;
1330    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1331    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1332    ///     q!(|| 0),
1333    ///     q!(|acc, x| {
1334    ///         *acc += x;
1335    ///         Some(*acc)
1336    ///     }),
1337    /// )
1338    /// # }, |mut stream| async move {
1339    /// // Output: 1, 3, 6, 10
1340    /// # for w in vec![1, 3, 6, 10] {
1341    /// #     assert_eq!(stream.next().await.unwrap(), w);
1342    /// # }
1343    /// # }));
1344    /// # }
1345    /// ```
1346    ///
1347    /// Early termination example:
1348    /// ```rust
1349    /// # #[cfg(feature = "deploy")] {
1350    /// # use hydro_lang::prelude::*;
1351    /// # use futures::StreamExt;
1352    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1353    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1354    ///     q!(|| 1),
1355    ///     q!(|state, x| {
1356    ///         *state = *state * x;
1357    ///         if *state > 6 {
1358    ///             None // Terminate the stream
1359    ///         } else {
1360    ///             Some(-*state)
1361    ///         }
1362    ///     }),
1363    /// )
1364    /// # }, |mut stream| async move {
1365    /// // Output: -1, -2, -6
1366    /// # for w in vec![-1, -2, -6] {
1367    /// #     assert_eq!(stream.next().await.unwrap(), w);
1368    /// # }
1369    /// # }));
1370    /// # }
1371    /// ```
1372    pub fn scan<A, U, I, F>(
1373        self,
1374        init: impl IntoQuotedMut<'a, I, L>,
1375        f: impl IntoQuotedMut<'a, F, L>,
1376    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1377    where
1378        O: IsOrdered,
1379        R: IsExactlyOnce,
1380        I: Fn() -> A + 'a,
1381        F: Fn(&mut A, T) -> Option<U> + 'a,
1382    {
1383        let init = init.splice_fn0_ctx(&self.location).into();
1384        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1385
1386        Stream::new(
1387            self.location.clone(),
1388            HydroNode::Scan {
1389                init,
1390                acc: f,
1391                input: Box::new(self.ir_node.into_inner()),
1392                metadata: self.location.new_node_metadata(
1393                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1394                ),
1395            },
1396        )
1397    }
1398
1399    /// Given a time interval, returns a stream corresponding to samples taken from the
1400    /// stream roughly at that interval. The output will have elements in the same order
1401    /// as the input, but with arbitrary elements skipped between samples. There is also
1402    /// no guarantee on the exact timing of the samples.
1403    ///
1404    /// # Non-Determinism
1405    /// The output stream is non-deterministic in which elements are sampled, since this
1406    /// is controlled by a clock.
1407    pub fn sample_every(
1408        self,
1409        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1410        nondet: NonDet,
1411    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1412    where
1413        L: NoTick + NoAtomic,
1414    {
1415        let samples = self.location.source_interval(interval, nondet);
1416
1417        let tick = self.location.tick();
1418        self.batch(&tick, nondet)
1419            .filter_if_some(samples.batch(&tick, nondet).first())
1420            .all_ticks()
1421            .weaken_retries()
1422    }
1423
1424    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1425    /// stream has not emitted a value since that duration.
1426    ///
1427    /// # Non-Determinism
1428    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1429    /// samples take place, timeouts may be non-deterministically generated or missed,
1430    /// and the notification of the timeout may be delayed as well. There is also no
1431    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1432    /// detected based on when the next sample is taken.
1433    pub fn timeout(
1434        self,
1435        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1436        nondet: NonDet,
1437    ) -> Optional<(), L, Unbounded>
1438    where
1439        L: NoTick + NoAtomic,
1440    {
1441        let tick = self.location.tick();
1442
1443        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1444            q!(|| None),
1445            q!(
1446                |latest, _| {
1447                    *latest = Some(Instant::now());
1448                },
1449                commutative = manual_proof!(/** TODO */)
1450            ),
1451        );
1452
1453        latest_received
1454            .snapshot(&tick, nondet)
1455            .filter_map(q!(move |latest_received| {
1456                if let Some(latest_received) = latest_received {
1457                    if Instant::now().duration_since(latest_received) > duration {
1458                        Some(())
1459                    } else {
1460                        None
1461                    }
1462                } else {
1463                    Some(())
1464                }
1465            }))
1466            .latest()
1467    }
1468
1469    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1470    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1471    ///
1472    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1473    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1474    /// argument that declares where the stream will be atomically processed. Batching a stream into
1475    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1476    /// [`Tick`] will introduce asynchrony.
1477    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1478        let out_location = Atomic { tick: tick.clone() };
1479        Stream::new(
1480            out_location.clone(),
1481            HydroNode::BeginAtomic {
1482                inner: Box::new(self.ir_node.into_inner()),
1483                metadata: out_location
1484                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1485            },
1486        )
1487    }
1488
1489    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1490    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1491    /// the order of the input. The output stream will execute in the [`Tick`] that was
1492    /// used to create the atomic section.
1493    ///
1494    /// # Non-Determinism
1495    /// The batch boundaries are non-deterministic and may change across executions.
1496    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1497        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1498        Stream::new(
1499            tick.clone(),
1500            HydroNode::Batch {
1501                inner: Box::new(self.ir_node.into_inner()),
1502                metadata: tick
1503                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1504            },
1505        )
1506    }
1507
1508    /// An operator which allows you to "name" a `HydroNode`.
1509    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1510    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1511        {
1512            let mut node = self.ir_node.borrow_mut();
1513            let metadata = node.metadata_mut();
1514            metadata.tag = Some(name.to_owned());
1515        }
1516        self
1517    }
1518
1519    /// Explicitly "casts" the stream to a type with a different ordering
1520    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1521    /// by the type-system.
1522    ///
1523    /// # Non-Determinism
1524    /// This function is used as an escape hatch, and any mistakes in the
1525    /// provided ordering guarantee will propagate into the guarantees
1526    /// for the rest of the program.
1527    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1528        if O::ORDERING_KIND == O2::ORDERING_KIND {
1529            Stream::new(self.location, self.ir_node.into_inner())
1530        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1531            // We can always weaken the ordering guarantee
1532            Stream::new(
1533                self.location.clone(),
1534                HydroNode::Cast {
1535                    inner: Box::new(self.ir_node.into_inner()),
1536                    metadata: self
1537                        .location
1538                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1539                },
1540            )
1541        } else {
1542            Stream::new(
1543                self.location.clone(),
1544                HydroNode::ObserveNonDet {
1545                    inner: Box::new(self.ir_node.into_inner()),
1546                    trusted: false,
1547                    metadata: self
1548                        .location
1549                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1550                },
1551            )
1552        }
1553    }
1554
1555    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1556    // intermediate states will not be revealed
1557    fn assume_ordering_trusted_bounded<O2: Ordering>(
1558        self,
1559        nondet: NonDet,
1560    ) -> Stream<T, L, B, O2, R> {
1561        if B::BOUNDED {
1562            self.assume_ordering_trusted(nondet)
1563        } else {
1564            self.assume_ordering(nondet)
1565        }
1566    }
1567
1568    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1569    // is not observable
1570    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1571        self,
1572        _nondet: NonDet,
1573    ) -> Stream<T, L, B, O2, R> {
1574        if O::ORDERING_KIND == O2::ORDERING_KIND {
1575            Stream::new(self.location, self.ir_node.into_inner())
1576        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1577            // We can always weaken the ordering guarantee
1578            Stream::new(
1579                self.location.clone(),
1580                HydroNode::Cast {
1581                    inner: Box::new(self.ir_node.into_inner()),
1582                    metadata: self
1583                        .location
1584                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1585                },
1586            )
1587        } else {
1588            Stream::new(
1589                self.location.clone(),
1590                HydroNode::ObserveNonDet {
1591                    inner: Box::new(self.ir_node.into_inner()),
1592                    trusted: true,
1593                    metadata: self
1594                        .location
1595                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1596                },
1597            )
1598        }
1599    }
1600
1601    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1602    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1603    /// which is always safe because that is the weakest possible guarantee.
1604    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1605        self.weaken_ordering::<NoOrder>()
1606    }
1607
1608    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1609    /// enforcing that `O2` is weaker than the input ordering guarantee.
1610    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1611        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1612        self.assume_ordering::<O2>(nondet)
1613    }
1614
1615    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1616    /// implies that `O == TotalOrder`.
1617    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1618    where
1619        O: IsOrdered,
1620    {
1621        self.assume_ordering(nondet!(/** no-op */))
1622    }
1623
1624    /// Explicitly "casts" the stream to a type with a different retries
1625    /// guarantee. Useful in unsafe code where the lack of retries cannot
1626    /// be proven by the type-system.
1627    ///
1628    /// # Non-Determinism
1629    /// This function is used as an escape hatch, and any mistakes in the
1630    /// provided retries guarantee will propagate into the guarantees
1631    /// for the rest of the program.
1632    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1633        if R::RETRIES_KIND == R2::RETRIES_KIND {
1634            Stream::new(self.location, self.ir_node.into_inner())
1635        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1636            // We can always weaken the retries guarantee
1637            Stream::new(
1638                self.location.clone(),
1639                HydroNode::Cast {
1640                    inner: Box::new(self.ir_node.into_inner()),
1641                    metadata: self
1642                        .location
1643                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1644                },
1645            )
1646        } else {
1647            Stream::new(
1648                self.location.clone(),
1649                HydroNode::ObserveNonDet {
1650                    inner: Box::new(self.ir_node.into_inner()),
1651                    trusted: false,
1652                    metadata: self
1653                        .location
1654                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1655                },
1656            )
1657        }
1658    }
1659
1660    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1661    // is not observable
1662    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1663        if R::RETRIES_KIND == R2::RETRIES_KIND {
1664            Stream::new(self.location, self.ir_node.into_inner())
1665        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1666            // We can always weaken the retries guarantee
1667            Stream::new(
1668                self.location.clone(),
1669                HydroNode::Cast {
1670                    inner: Box::new(self.ir_node.into_inner()),
1671                    metadata: self
1672                        .location
1673                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1674                },
1675            )
1676        } else {
1677            Stream::new(
1678                self.location.clone(),
1679                HydroNode::ObserveNonDet {
1680                    inner: Box::new(self.ir_node.into_inner()),
1681                    trusted: true,
1682                    metadata: self
1683                        .location
1684                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1685                },
1686            )
1687        }
1688    }
1689
1690    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1691    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1692    /// which is always safe because that is the weakest possible guarantee.
1693    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1694        self.weaken_retries::<AtLeastOnce>()
1695    }
1696
1697    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1698    /// enforcing that `R2` is weaker than the input retries guarantee.
1699    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1700        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1701        self.assume_retries::<R2>(nondet)
1702    }
1703
1704    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1705    /// implies that `R == ExactlyOnce`.
1706    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1707    where
1708        R: IsExactlyOnce,
1709    {
1710        self.assume_retries(nondet!(/** no-op */))
1711    }
1712
1713    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1714    /// implies that `B == Bounded`.
1715    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1716    where
1717        B: IsBounded,
1718    {
1719        Stream::new(self.location, self.ir_node.into_inner())
1720    }
1721}
1722
1723impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1724where
1725    L: Location<'a>,
1726{
1727    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1728    ///
1729    /// # Example
1730    /// ```rust
1731    /// # #[cfg(feature = "deploy")] {
1732    /// # use hydro_lang::prelude::*;
1733    /// # use futures::StreamExt;
1734    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1735    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1736    /// # }, |mut stream| async move {
1737    /// // 1, 2, 3
1738    /// # for w in vec![1, 2, 3] {
1739    /// #     assert_eq!(stream.next().await.unwrap(), w);
1740    /// # }
1741    /// # }));
1742    /// # }
1743    /// ```
1744    pub fn cloned(self) -> Stream<T, L, B, O, R>
1745    where
1746        T: Clone,
1747    {
1748        self.map(q!(|d| d.clone()))
1749    }
1750}
1751
1752impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1753where
1754    L: Location<'a>,
1755{
1756    /// Computes the number of elements in the stream as a [`Singleton`].
1757    ///
1758    /// # Example
1759    /// ```rust
1760    /// # #[cfg(feature = "deploy")] {
1761    /// # use hydro_lang::prelude::*;
1762    /// # use futures::StreamExt;
1763    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1764    /// let tick = process.tick();
1765    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1766    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1767    /// batch.count().all_ticks()
1768    /// # }, |mut stream| async move {
1769    /// // 4
1770    /// # assert_eq!(stream.next().await.unwrap(), 4);
1771    /// # }));
1772    /// # }
1773    /// ```
1774    pub fn count(self) -> Singleton<usize, L, B> {
1775        self.assume_ordering_trusted::<TotalOrder>(nondet!(
1776            /// Order does not affect eventual count, and also does not affect intermediate states.
1777        ))
1778        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1779    }
1780}
1781
1782impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1783    /// Produces a new stream that interleaves the elements of the two input streams.
1784    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1785    ///
1786    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1787    /// [`Bounded`], you can use [`Stream::chain`] instead.
1788    ///
1789    /// # Example
1790    /// ```rust
1791    /// # #[cfg(feature = "deploy")] {
1792    /// # use hydro_lang::prelude::*;
1793    /// # use futures::StreamExt;
1794    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1795    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1796    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1797    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1798    /// # }, |mut stream| async move {
1799    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1800    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1801    /// #     assert_eq!(stream.next().await.unwrap(), w);
1802    /// # }
1803    /// # }));
1804    /// # }
1805    /// ```
1806    pub fn interleave<O2: Ordering, R2: Retries>(
1807        self,
1808        other: Stream<T, L, Unbounded, O2, R2>,
1809    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1810    where
1811        R: MinRetries<R2>,
1812    {
1813        Stream::new(
1814            self.location.clone(),
1815            HydroNode::Chain {
1816                first: Box::new(self.ir_node.into_inner()),
1817                second: Box::new(other.ir_node.into_inner()),
1818                metadata: self.location.new_node_metadata(Stream::<
1819                    T,
1820                    L,
1821                    Unbounded,
1822                    NoOrder,
1823                    <R as MinRetries<R2>>::Min,
1824                >::collection_kind()),
1825            },
1826        )
1827    }
1828}
1829
1830impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1831    /// Produces a new stream that combines the elements of the two input streams,
1832    /// preserving the relative order of elements within each input.
1833    ///
1834    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1835    /// [`Bounded`], you can use [`Stream::chain`] instead.
1836    ///
1837    /// # Non-Determinism
1838    /// The order in which elements *across* the two streams will be interleaved is
1839    /// non-deterministic, so the order of elements will vary across runs. If the output order
1840    /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1841    /// unordered stream.
1842    ///
1843    /// # Example
1844    /// ```rust
1845    /// # #[cfg(feature = "deploy")] {
1846    /// # use hydro_lang::prelude::*;
1847    /// # use futures::StreamExt;
1848    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1849    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1850    /// # process.source_iter(q!(vec![1, 3])).into();
1851    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1852    /// # }, |mut stream| async move {
1853    /// // 1, 3 and 2, 4 in some order, preserving the original local order
1854    /// # for w in vec![1, 3, 2, 4] {
1855    /// #     assert_eq!(stream.next().await.unwrap(), w);
1856    /// # }
1857    /// # }));
1858    /// # }
1859    /// ```
1860    pub fn merge_ordered<R2: Retries>(
1861        self,
1862        other: Stream<T, L, Unbounded, TotalOrder, R2>,
1863        _nondet: NonDet,
1864    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1865    where
1866        R: MinRetries<R2>,
1867    {
1868        Stream::new(
1869            self.location.clone(),
1870            HydroNode::Chain {
1871                first: Box::new(self.ir_node.into_inner()),
1872                second: Box::new(other.ir_node.into_inner()),
1873                metadata: self.location.new_node_metadata(Stream::<
1874                    T,
1875                    L,
1876                    Unbounded,
1877                    TotalOrder,
1878                    <R as MinRetries<R2>>::Min,
1879                >::collection_kind()),
1880            },
1881        )
1882    }
1883}
1884
1885impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1886where
1887    L: Location<'a>,
1888{
1889    /// Produces a new stream that emits the input elements in sorted order.
1890    ///
1891    /// The input stream can have any ordering guarantee, but the output stream
1892    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1893    /// elements in the input stream are available, so it requires the input stream
1894    /// to be [`Bounded`].
1895    ///
1896    /// # Example
1897    /// ```rust
1898    /// # #[cfg(feature = "deploy")] {
1899    /// # use hydro_lang::prelude::*;
1900    /// # use futures::StreamExt;
1901    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1902    /// let tick = process.tick();
1903    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1904    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1905    /// batch.sort().all_ticks()
1906    /// # }, |mut stream| async move {
1907    /// // 1, 2, 3, 4
1908    /// # for w in (1..5) {
1909    /// #     assert_eq!(stream.next().await.unwrap(), w);
1910    /// # }
1911    /// # }));
1912    /// # }
1913    /// ```
1914    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1915    where
1916        B: IsBounded,
1917        T: Ord,
1918    {
1919        let this = self.make_bounded();
1920        Stream::new(
1921            this.location.clone(),
1922            HydroNode::Sort {
1923                input: Box::new(this.ir_node.into_inner()),
1924                metadata: this
1925                    .location
1926                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1927            },
1928        )
1929    }
1930
1931    /// Produces a new stream that first emits the elements of the `self` stream,
1932    /// and then emits the elements of the `other` stream. The output stream has
1933    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1934    /// [`TotalOrder`] guarantee.
1935    ///
1936    /// Currently, both input streams must be [`Bounded`]. This operator will block
1937    /// on the first stream until all its elements are available. In a future version,
1938    /// we will relax the requirement on the `other` stream.
1939    ///
1940    /// # Example
1941    /// ```rust
1942    /// # #[cfg(feature = "deploy")] {
1943    /// # use hydro_lang::prelude::*;
1944    /// # use futures::StreamExt;
1945    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1946    /// let tick = process.tick();
1947    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1948    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1949    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1950    /// # }, |mut stream| async move {
1951    /// // 2, 3, 4, 5, 1, 2, 3, 4
1952    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1953    /// #     assert_eq!(stream.next().await.unwrap(), w);
1954    /// # }
1955    /// # }));
1956    /// # }
1957    /// ```
1958    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1959        self,
1960        other: Stream<T, L, B2, O2, R2>,
1961    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1962    where
1963        B: IsBounded,
1964        O: MinOrder<O2>,
1965        R: MinRetries<R2>,
1966    {
1967        check_matching_location(&self.location, &other.location);
1968
1969        Stream::new(
1970            self.location.clone(),
1971            HydroNode::Chain {
1972                first: Box::new(self.ir_node.into_inner()),
1973                second: Box::new(other.ir_node.into_inner()),
1974                metadata: self.location.new_node_metadata(Stream::<
1975                    T,
1976                    L,
1977                    B2,
1978                    <O as MinOrder<O2>>::Min,
1979                    <R as MinRetries<R2>>::Min,
1980                >::collection_kind()),
1981            },
1982        )
1983    }
1984
1985    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1986    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1987    /// because this is compiled into a nested loop.
1988    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1989        self,
1990        other: Stream<T2, L, Bounded, O2, R>,
1991    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1992    where
1993        B: IsBounded,
1994        T: Clone,
1995        T2: Clone,
1996    {
1997        let this = self.make_bounded();
1998        check_matching_location(&this.location, &other.location);
1999
2000        Stream::new(
2001            this.location.clone(),
2002            HydroNode::CrossProduct {
2003                left: Box::new(this.ir_node.into_inner()),
2004                right: Box::new(other.ir_node.into_inner()),
2005                metadata: this.location.new_node_metadata(Stream::<
2006                    (T, T2),
2007                    L,
2008                    Bounded,
2009                    <O2 as MinOrder<O>>::Min,
2010                    R,
2011                >::collection_kind()),
2012            },
2013        )
2014    }
2015
2016    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2017    /// `self` used as the values for *each* key.
2018    ///
2019    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2020    /// values. For example, it can be used to send the same set of elements to several cluster
2021    /// members, if the membership information is available as a [`KeyedSingleton`].
2022    ///
2023    /// # Example
2024    /// ```rust
2025    /// # #[cfg(feature = "deploy")] {
2026    /// # use hydro_lang::prelude::*;
2027    /// # use futures::StreamExt;
2028    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2029    /// # let tick = process.tick();
2030    /// let keyed_singleton = // { 1: (), 2: () }
2031    /// # process
2032    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2033    /// #     .into_keyed()
2034    /// #     .batch(&tick, nondet!(/** test */))
2035    /// #     .first();
2036    /// let stream = // [ "a", "b" ]
2037    /// # process
2038    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2039    /// #     .batch(&tick, nondet!(/** test */));
2040    /// stream.repeat_with_keys(keyed_singleton)
2041    /// # .entries().all_ticks()
2042    /// # }, |mut stream| async move {
2043    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2044    /// # let mut results = Vec::new();
2045    /// # for _ in 0..4 {
2046    /// #     results.push(stream.next().await.unwrap());
2047    /// # }
2048    /// # results.sort();
2049    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2050    /// # }));
2051    /// # }
2052    /// ```
2053    pub fn repeat_with_keys<K, V2>(
2054        self,
2055        keys: KeyedSingleton<K, V2, L, Bounded>,
2056    ) -> KeyedStream<K, T, L, Bounded, O, R>
2057    where
2058        B: IsBounded,
2059        K: Clone,
2060        T: Clone,
2061    {
2062        keys.keys()
2063            .weaken_retries()
2064            .assume_ordering_trusted::<TotalOrder>(
2065                nondet!(/** keyed stream does not depend on ordering of keys */),
2066            )
2067            .cross_product_nested_loop(self.make_bounded())
2068            .into_keyed()
2069    }
2070}
2071
2072impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2073where
2074    L: Location<'a>,
2075{
2076    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2077    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2078    /// by equi-joining the two streams on the key attribute `K`.
2079    ///
2080    /// # Example
2081    /// ```rust
2082    /// # #[cfg(feature = "deploy")] {
2083    /// # use hydro_lang::prelude::*;
2084    /// # use std::collections::HashSet;
2085    /// # use futures::StreamExt;
2086    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2087    /// let tick = process.tick();
2088    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2089    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2090    /// stream1.join(stream2)
2091    /// # }, |mut stream| async move {
2092    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2093    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2094    /// # stream.map(|i| assert!(expected.contains(&i)));
2095    /// # }));
2096    /// # }
2097    pub fn join<V2, O2: Ordering, R2: Retries>(
2098        self,
2099        n: Stream<(K, V2), L, B, O2, R2>,
2100    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2101    where
2102        K: Eq + Hash,
2103        R: MinRetries<R2>,
2104    {
2105        check_matching_location(&self.location, &n.location);
2106
2107        Stream::new(
2108            self.location.clone(),
2109            HydroNode::Join {
2110                left: Box::new(self.ir_node.into_inner()),
2111                right: Box::new(n.ir_node.into_inner()),
2112                metadata: self.location.new_node_metadata(Stream::<
2113                    (K, (V1, V2)),
2114                    L,
2115                    B,
2116                    NoOrder,
2117                    <R as MinRetries<R2>>::Min,
2118                >::collection_kind()),
2119            },
2120        )
2121    }
2122
2123    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2124    /// computes the anti-join of the items in the input -- i.e. returns
2125    /// unique items in the first input that do not have a matching key
2126    /// in the second input.
2127    ///
2128    /// # Example
2129    /// ```rust
2130    /// # #[cfg(feature = "deploy")] {
2131    /// # use hydro_lang::prelude::*;
2132    /// # use futures::StreamExt;
2133    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2134    /// let tick = process.tick();
2135    /// let stream = process
2136    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2137    ///   .batch(&tick, nondet!(/** test */));
2138    /// let batch = process
2139    ///   .source_iter(q!(vec![1, 2]))
2140    ///   .batch(&tick, nondet!(/** test */));
2141    /// stream.anti_join(batch).all_ticks()
2142    /// # }, |mut stream| async move {
2143    /// # for w in vec![(3, 'c'), (4, 'd')] {
2144    /// #     assert_eq!(stream.next().await.unwrap(), w);
2145    /// # }
2146    /// # }));
2147    /// # }
2148    pub fn anti_join<O2: Ordering, R2: Retries>(
2149        self,
2150        n: Stream<K, L, Bounded, O2, R2>,
2151    ) -> Stream<(K, V1), L, B, O, R>
2152    where
2153        K: Eq + Hash,
2154    {
2155        check_matching_location(&self.location, &n.location);
2156
2157        Stream::new(
2158            self.location.clone(),
2159            HydroNode::AntiJoin {
2160                pos: Box::new(self.ir_node.into_inner()),
2161                neg: Box::new(n.ir_node.into_inner()),
2162                metadata: self
2163                    .location
2164                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2165            },
2166        )
2167    }
2168}
2169
2170impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2171    Stream<(K, V), L, B, O, R>
2172{
2173    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2174    /// is used as the key and the second element is added to the entries associated with that key.
2175    ///
2176    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2177    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2178    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2179    /// total ordering _within_ each group but no ordering _across_ groups.
2180    ///
2181    /// # Example
2182    /// ```rust
2183    /// # #[cfg(feature = "deploy")] {
2184    /// # use hydro_lang::prelude::*;
2185    /// # use futures::StreamExt;
2186    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2187    /// process
2188    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2189    ///     .into_keyed()
2190    /// #   .entries()
2191    /// # }, |mut stream| async move {
2192    /// // { 1: [2, 3], 2: [4] }
2193    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2194    /// #     assert_eq!(stream.next().await.unwrap(), w);
2195    /// # }
2196    /// # }));
2197    /// # }
2198    /// ```
2199    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2200        KeyedStream::new(
2201            self.location.clone(),
2202            HydroNode::Cast {
2203                inner: Box::new(self.ir_node.into_inner()),
2204                metadata: self
2205                    .location
2206                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2207            },
2208        )
2209    }
2210}
2211
2212impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2213where
2214    K: Eq + Hash,
2215    L: Location<'a>,
2216{
2217    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2218    /// # Example
2219    /// ```rust
2220    /// # #[cfg(feature = "deploy")] {
2221    /// # use hydro_lang::prelude::*;
2222    /// # use futures::StreamExt;
2223    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2224    /// let tick = process.tick();
2225    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2226    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2227    /// batch.keys().all_ticks()
2228    /// # }, |mut stream| async move {
2229    /// // 1, 2
2230    /// # assert_eq!(stream.next().await.unwrap(), 1);
2231    /// # assert_eq!(stream.next().await.unwrap(), 2);
2232    /// # }));
2233    /// # }
2234    /// ```
2235    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2236        self.into_keyed()
2237            .fold(
2238                q!(|| ()),
2239                q!(
2240                    |_, _| {},
2241                    commutative = manual_proof!(/** values are ignored */),
2242                    idempotent = manual_proof!(/** values are ignored */)
2243                ),
2244            )
2245            .keys()
2246    }
2247}
2248
2249impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2250where
2251    L: Location<'a> + NoTick,
2252{
2253    /// Returns a stream corresponding to the latest batch of elements being atomically
2254    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2255    /// the order of the input.
2256    ///
2257    /// # Non-Determinism
2258    /// The batch boundaries are non-deterministic and may change across executions.
2259    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2260        Stream::new(
2261            self.location.clone().tick,
2262            HydroNode::Batch {
2263                inner: Box::new(self.ir_node.into_inner()),
2264                metadata: self
2265                    .location
2266                    .tick
2267                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2268            },
2269        )
2270    }
2271
2272    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2273    /// See [`Stream::atomic`] for more details.
2274    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2275        Stream::new(
2276            self.location.tick.l.clone(),
2277            HydroNode::EndAtomic {
2278                inner: Box::new(self.ir_node.into_inner()),
2279                metadata: self
2280                    .location
2281                    .tick
2282                    .l
2283                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2284            },
2285        )
2286    }
2287}
2288
2289impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2290where
2291    L: Location<'a> + NoTick + NoAtomic,
2292    F: Future<Output = T>,
2293{
2294    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2295    /// Future outputs are produced as available, regardless of input arrival order.
2296    ///
2297    /// # Example
2298    /// ```rust
2299    /// # #[cfg(feature = "deploy")] {
2300    /// # use std::collections::HashSet;
2301    /// # use futures::StreamExt;
2302    /// # use hydro_lang::prelude::*;
2303    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2304    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2305    ///     .map(q!(|x| async move {
2306    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2307    ///         x
2308    ///     }))
2309    ///     .resolve_futures()
2310    /// #   },
2311    /// #   |mut stream| async move {
2312    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2313    /// #       let mut output = HashSet::new();
2314    /// #       for _ in 1..10 {
2315    /// #           output.insert(stream.next().await.unwrap());
2316    /// #       }
2317    /// #       assert_eq!(
2318    /// #           output,
2319    /// #           HashSet::<i32>::from_iter(1..10)
2320    /// #       );
2321    /// #   },
2322    /// # ));
2323    /// # }
2324    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2325        Stream::new(
2326            self.location.clone(),
2327            HydroNode::ResolveFutures {
2328                input: Box::new(self.ir_node.into_inner()),
2329                metadata: self
2330                    .location
2331                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2332            },
2333        )
2334    }
2335
2336    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2337    /// Future outputs are produced in the same order as the input stream.
2338    ///
2339    /// # Example
2340    /// ```rust
2341    /// # #[cfg(feature = "deploy")] {
2342    /// # use std::collections::HashSet;
2343    /// # use futures::StreamExt;
2344    /// # use hydro_lang::prelude::*;
2345    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2346    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2347    ///     .map(q!(|x| async move {
2348    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2349    ///         x
2350    ///     }))
2351    ///     .resolve_futures_ordered()
2352    /// #   },
2353    /// #   |mut stream| async move {
2354    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2355    /// #       let mut output = Vec::new();
2356    /// #       for _ in 1..10 {
2357    /// #           output.push(stream.next().await.unwrap());
2358    /// #       }
2359    /// #       assert_eq!(
2360    /// #           output,
2361    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2362    /// #       );
2363    /// #   },
2364    /// # ));
2365    /// # }
2366    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2367        Stream::new(
2368            self.location.clone(),
2369            HydroNode::ResolveFuturesOrdered {
2370                input: Box::new(self.ir_node.into_inner()),
2371                metadata: self
2372                    .location
2373                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2374            },
2375        )
2376    }
2377}
2378
2379impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2380where
2381    L: Location<'a>,
2382{
2383    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2384    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2385    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2386        Stream::new(
2387            self.location.outer().clone(),
2388            HydroNode::YieldConcat {
2389                inner: Box::new(self.ir_node.into_inner()),
2390                metadata: self
2391                    .location
2392                    .outer()
2393                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2394            },
2395        )
2396    }
2397
2398    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2399    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2400    ///
2401    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2402    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2403    /// stream's [`Tick`] context.
2404    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2405        let out_location = Atomic {
2406            tick: self.location.clone(),
2407        };
2408
2409        Stream::new(
2410            out_location.clone(),
2411            HydroNode::YieldConcat {
2412                inner: Box::new(self.ir_node.into_inner()),
2413                metadata: out_location
2414                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2415            },
2416        )
2417    }
2418
2419    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2420    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2421    /// input.
2422    ///
2423    /// This API is particularly useful for stateful computation on batches of data, such as
2424    /// maintaining an accumulated state that is up to date with the current batch.
2425    ///
2426    /// # Example
2427    /// ```rust
2428    /// # #[cfg(feature = "deploy")] {
2429    /// # use hydro_lang::prelude::*;
2430    /// # use futures::StreamExt;
2431    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2432    /// let tick = process.tick();
2433    /// # // ticks are lazy by default, forces the second tick to run
2434    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2435    /// # let batch_first_tick = process
2436    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2437    /// #  .batch(&tick, nondet!(/** test */));
2438    /// # let batch_second_tick = process
2439    /// #   .source_iter(q!(vec![5, 6, 7]))
2440    /// #   .batch(&tick, nondet!(/** test */))
2441    /// #   .defer_tick(); // appears on the second tick
2442    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2443    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2444    ///
2445    /// input.batch(&tick, nondet!(/** test */))
2446    ///     .across_ticks(|s| s.count()).all_ticks()
2447    /// # }, |mut stream| async move {
2448    /// // [4, 7]
2449    /// assert_eq!(stream.next().await.unwrap(), 4);
2450    /// assert_eq!(stream.next().await.unwrap(), 7);
2451    /// # }));
2452    /// # }
2453    /// ```
2454    pub fn across_ticks<Out: BatchAtomic>(
2455        self,
2456        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2457    ) -> Out::Batched {
2458        thunk(self.all_ticks_atomic()).batched_atomic()
2459    }
2460
2461    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2462    /// always has the elements of `self` at tick `T - 1`.
2463    ///
2464    /// At tick `0`, the output stream is empty, since there is no previous tick.
2465    ///
2466    /// This operator enables stateful iterative processing with ticks, by sending data from one
2467    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2468    ///
2469    /// # Example
2470    /// ```rust
2471    /// # #[cfg(feature = "deploy")] {
2472    /// # use hydro_lang::prelude::*;
2473    /// # use futures::StreamExt;
2474    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2475    /// let tick = process.tick();
2476    /// // ticks are lazy by default, forces the second tick to run
2477    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2478    ///
2479    /// let batch_first_tick = process
2480    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2481    ///   .batch(&tick, nondet!(/** test */));
2482    /// let batch_second_tick = process
2483    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2484    ///   .batch(&tick, nondet!(/** test */))
2485    ///   .defer_tick(); // appears on the second tick
2486    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2487    ///
2488    /// changes_across_ticks.clone().filter_not_in(
2489    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2490    /// ).all_ticks()
2491    /// # }, |mut stream| async move {
2492    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2493    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2494    /// #     assert_eq!(stream.next().await.unwrap(), w);
2495    /// # }
2496    /// # }));
2497    /// # }
2498    /// ```
2499    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2500        Stream::new(
2501            self.location.clone(),
2502            HydroNode::DeferTick {
2503                input: Box::new(self.ir_node.into_inner()),
2504                metadata: self
2505                    .location
2506                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2507            },
2508        )
2509    }
2510}
2511
2512#[cfg(test)]
2513mod tests {
2514    #[cfg(feature = "deploy")]
2515    use futures::{SinkExt, StreamExt};
2516    #[cfg(feature = "deploy")]
2517    use hydro_deploy::Deployment;
2518    #[cfg(feature = "deploy")]
2519    use serde::{Deserialize, Serialize};
2520    #[cfg(any(feature = "deploy", feature = "sim"))]
2521    use stageleft::q;
2522
2523    #[cfg(any(feature = "deploy", feature = "sim"))]
2524    use crate::compile::builder::FlowBuilder;
2525    #[cfg(feature = "deploy")]
2526    use crate::live_collections::sliced::sliced;
2527    #[cfg(feature = "deploy")]
2528    use crate::live_collections::stream::ExactlyOnce;
2529    #[cfg(feature = "sim")]
2530    use crate::live_collections::stream::NoOrder;
2531    #[cfg(any(feature = "deploy", feature = "sim"))]
2532    use crate::live_collections::stream::TotalOrder;
2533    #[cfg(any(feature = "deploy", feature = "sim"))]
2534    use crate::location::Location;
2535    #[cfg(any(feature = "deploy", feature = "sim"))]
2536    use crate::nondet::nondet;
2537
2538    mod backtrace_chained_ops;
2539
    /// Marker type used as the process tag for the first process in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Marker type used as the tag for the second process and for the external client in
    /// `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P2 {}
2544
    /// Serializable payload sent between processes in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        /// The number carried by this message.
        n: u32,
    }
2550
2551    #[cfg(feature = "deploy")]
2552    #[tokio::test]
2553    async fn first_ten_distributed() {
2554        use crate::networking::TCP;
2555
2556        let mut deployment = Deployment::new();
2557
2558        let mut flow = FlowBuilder::new();
2559        let first_node = flow.process::<P1>();
2560        let second_node = flow.process::<P2>();
2561        let external = flow.external::<P2>();
2562
2563        let numbers = first_node.source_iter(q!(0..10));
2564        let out_port = numbers
2565            .map(q!(|n| SendOverNetwork { n }))
2566            .send(&second_node, TCP.fail_stop().bincode())
2567            .send_bincode_external(&external);
2568
2569        let nodes = flow
2570            .with_process(&first_node, deployment.Localhost())
2571            .with_process(&second_node, deployment.Localhost())
2572            .with_external(&external, deployment.Localhost())
2573            .deploy(&mut deployment);
2574
2575        deployment.deploy().await.unwrap();
2576
2577        let mut external_out = nodes.connect(out_port).await;
2578
2579        deployment.start().await.unwrap();
2580
2581        for i in 0..10 {
2582            assert_eq!(external_out.next().await.unwrap().n, i);
2583        }
2584    }
2585
2586    #[cfg(feature = "deploy")]
2587    #[tokio::test]
2588    async fn first_cardinality() {
2589        let mut deployment = Deployment::new();
2590
2591        let mut flow = FlowBuilder::new();
2592        let node = flow.process::<()>();
2593        let external = flow.external::<()>();
2594
2595        let node_tick = node.tick();
2596        let count = node_tick
2597            .singleton(q!([1, 2, 3]))
2598            .into_stream()
2599            .flatten_ordered()
2600            .first()
2601            .into_stream()
2602            .count()
2603            .all_ticks()
2604            .send_bincode_external(&external);
2605
2606        let nodes = flow
2607            .with_process(&node, deployment.Localhost())
2608            .with_external(&external, deployment.Localhost())
2609            .deploy(&mut deployment);
2610
2611        deployment.deploy().await.unwrap();
2612
2613        let mut external_out = nodes.connect(count).await;
2614
2615        deployment.start().await.unwrap();
2616
2617        assert_eq!(external_out.next().await.unwrap(), 1);
2618    }
2619
2620    #[cfg(feature = "deploy")]
2621    #[tokio::test]
2622    async fn unbounded_reduce_remembers_state() {
2623        let mut deployment = Deployment::new();
2624
2625        let mut flow = FlowBuilder::new();
2626        let node = flow.process::<()>();
2627        let external = flow.external::<()>();
2628
2629        let (input_port, input) = node.source_external_bincode(&external);
2630        let out = input
2631            .reduce(q!(|acc, v| *acc += v))
2632            .sample_eager(nondet!(/** test */))
2633            .send_bincode_external(&external);
2634
2635        let nodes = flow
2636            .with_process(&node, deployment.Localhost())
2637            .with_external(&external, deployment.Localhost())
2638            .deploy(&mut deployment);
2639
2640        deployment.deploy().await.unwrap();
2641
2642        let mut external_in = nodes.connect(input_port).await;
2643        let mut external_out = nodes.connect(out).await;
2644
2645        deployment.start().await.unwrap();
2646
2647        external_in.send(1).await.unwrap();
2648        assert_eq!(external_out.next().await.unwrap(), 1);
2649
2650        external_in.send(2).await.unwrap();
2651        assert_eq!(external_out.next().await.unwrap(), 3);
2652    }
2653
2654    #[cfg(feature = "deploy")]
2655    #[tokio::test]
2656    async fn top_level_bounded_cross_singleton() {
2657        let mut deployment = Deployment::new();
2658
2659        let mut flow = FlowBuilder::new();
2660        let node = flow.process::<()>();
2661        let external = flow.external::<()>();
2662
2663        let (input_port, input) =
2664            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2665
2666        let out = input
2667            .cross_singleton(
2668                node.source_iter(q!(vec![1, 2, 3]))
2669                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
2670            )
2671            .send_bincode_external(&external);
2672
2673        let nodes = flow
2674            .with_process(&node, deployment.Localhost())
2675            .with_external(&external, deployment.Localhost())
2676            .deploy(&mut deployment);
2677
2678        deployment.deploy().await.unwrap();
2679
2680        let mut external_in = nodes.connect(input_port).await;
2681        let mut external_out = nodes.connect(out).await;
2682
2683        deployment.start().await.unwrap();
2684
2685        external_in.send(1).await.unwrap();
2686        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2687
2688        external_in.send(2).await.unwrap();
2689        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2690    }
2691
2692    #[cfg(feature = "deploy")]
2693    #[tokio::test]
2694    async fn top_level_bounded_reduce_cardinality() {
2695        let mut deployment = Deployment::new();
2696
2697        let mut flow = FlowBuilder::new();
2698        let node = flow.process::<()>();
2699        let external = flow.external::<()>();
2700
2701        let (input_port, input) =
2702            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2703
2704        let out = sliced! {
2705            let input = use(input, nondet!(/** test */));
2706            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
2707            input.cross_singleton(v.into_stream().count())
2708        }
2709        .send_bincode_external(&external);
2710
2711        let nodes = flow
2712            .with_process(&node, deployment.Localhost())
2713            .with_external(&external, deployment.Localhost())
2714            .deploy(&mut deployment);
2715
2716        deployment.deploy().await.unwrap();
2717
2718        let mut external_in = nodes.connect(input_port).await;
2719        let mut external_out = nodes.connect(out).await;
2720
2721        deployment.start().await.unwrap();
2722
2723        external_in.send(1).await.unwrap();
2724        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2725
2726        external_in.send(2).await.unwrap();
2727        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2728    }
2729
2730    #[cfg(feature = "deploy")]
2731    #[tokio::test]
2732    async fn top_level_bounded_into_singleton_cardinality() {
2733        let mut deployment = Deployment::new();
2734
2735        let mut flow = FlowBuilder::new();
2736        let node = flow.process::<()>();
2737        let external = flow.external::<()>();
2738
2739        let (input_port, input) =
2740            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2741
2742        let out = sliced! {
2743            let input = use(input, nondet!(/** test */));
2744            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
2745            input.cross_singleton(v.into_stream().count())
2746        }
2747        .send_bincode_external(&external);
2748
2749        let nodes = flow
2750            .with_process(&node, deployment.Localhost())
2751            .with_external(&external, deployment.Localhost())
2752            .deploy(&mut deployment);
2753
2754        deployment.deploy().await.unwrap();
2755
2756        let mut external_in = nodes.connect(input_port).await;
2757        let mut external_out = nodes.connect(out).await;
2758
2759        deployment.start().await.unwrap();
2760
2761        external_in.send(1).await.unwrap();
2762        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2763
2764        external_in.send(2).await.unwrap();
2765        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2766    }
2767
2768    #[cfg(feature = "deploy")]
2769    #[tokio::test]
2770    async fn atomic_fold_replays_each_tick() {
2771        let mut deployment = Deployment::new();
2772
2773        let mut flow = FlowBuilder::new();
2774        let node = flow.process::<()>();
2775        let external = flow.external::<()>();
2776
2777        let (input_port, input) =
2778            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2779        let tick = node.tick();
2780
2781        let out = input
2782            .batch(&tick, nondet!(/** test */))
2783            .cross_singleton(
2784                node.source_iter(q!(vec![1, 2, 3]))
2785                    .atomic(&tick)
2786                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2787                    .snapshot_atomic(nondet!(/** test */)),
2788            )
2789            .all_ticks()
2790            .send_bincode_external(&external);
2791
2792        let nodes = flow
2793            .with_process(&node, deployment.Localhost())
2794            .with_external(&external, deployment.Localhost())
2795            .deploy(&mut deployment);
2796
2797        deployment.deploy().await.unwrap();
2798
2799        let mut external_in = nodes.connect(input_port).await;
2800        let mut external_out = nodes.connect(out).await;
2801
2802        deployment.start().await.unwrap();
2803
2804        external_in.send(1).await.unwrap();
2805        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2806
2807        external_in.send(2).await.unwrap();
2808        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2809    }
2810
2811    #[cfg(feature = "deploy")]
2812    #[tokio::test]
2813    async fn unbounded_scan_remembers_state() {
2814        let mut deployment = Deployment::new();
2815
2816        let mut flow = FlowBuilder::new();
2817        let node = flow.process::<()>();
2818        let external = flow.external::<()>();
2819
2820        let (input_port, input) = node.source_external_bincode(&external);
2821        let out = input
2822            .scan(
2823                q!(|| 0),
2824                q!(|acc, v| {
2825                    *acc += v;
2826                    Some(*acc)
2827                }),
2828            )
2829            .send_bincode_external(&external);
2830
2831        let nodes = flow
2832            .with_process(&node, deployment.Localhost())
2833            .with_external(&external, deployment.Localhost())
2834            .deploy(&mut deployment);
2835
2836        deployment.deploy().await.unwrap();
2837
2838        let mut external_in = nodes.connect(input_port).await;
2839        let mut external_out = nodes.connect(out).await;
2840
2841        deployment.start().await.unwrap();
2842
2843        external_in.send(1).await.unwrap();
2844        assert_eq!(external_out.next().await.unwrap(), 1);
2845
2846        external_in.send(2).await.unwrap();
2847        assert_eq!(external_out.next().await.unwrap(), 3);
2848    }
2849
2850    #[cfg(feature = "deploy")]
2851    #[tokio::test]
2852    async fn unbounded_enumerate_remembers_state() {
2853        let mut deployment = Deployment::new();
2854
2855        let mut flow = FlowBuilder::new();
2856        let node = flow.process::<()>();
2857        let external = flow.external::<()>();
2858
2859        let (input_port, input) = node.source_external_bincode(&external);
2860        let out = input.enumerate().send_bincode_external(&external);
2861
2862        let nodes = flow
2863            .with_process(&node, deployment.Localhost())
2864            .with_external(&external, deployment.Localhost())
2865            .deploy(&mut deployment);
2866
2867        deployment.deploy().await.unwrap();
2868
2869        let mut external_in = nodes.connect(input_port).await;
2870        let mut external_out = nodes.connect(out).await;
2871
2872        deployment.start().await.unwrap();
2873
2874        external_in.send(1).await.unwrap();
2875        assert_eq!(external_out.next().await.unwrap(), (0, 1));
2876
2877        external_in.send(2).await.unwrap();
2878        assert_eq!(external_out.next().await.unwrap(), (1, 2));
2879    }
2880
2881    #[cfg(feature = "deploy")]
2882    #[tokio::test]
2883    async fn unbounded_unique_remembers_state() {
2884        let mut deployment = Deployment::new();
2885
2886        let mut flow = FlowBuilder::new();
2887        let node = flow.process::<()>();
2888        let external = flow.external::<()>();
2889
2890        let (input_port, input) =
2891            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2892        let out = input.unique().send_bincode_external(&external);
2893
2894        let nodes = flow
2895            .with_process(&node, deployment.Localhost())
2896            .with_external(&external, deployment.Localhost())
2897            .deploy(&mut deployment);
2898
2899        deployment.deploy().await.unwrap();
2900
2901        let mut external_in = nodes.connect(input_port).await;
2902        let mut external_out = nodes.connect(out).await;
2903
2904        deployment.start().await.unwrap();
2905
2906        external_in.send(1).await.unwrap();
2907        assert_eq!(external_out.next().await.unwrap(), 1);
2908
2909        external_in.send(2).await.unwrap();
2910        assert_eq!(external_out.next().await.unwrap(), 2);
2911
2912        external_in.send(1).await.unwrap();
2913        external_in.send(3).await.unwrap();
2914        assert_eq!(external_out.next().await.unwrap(), 3);
2915    }
2916
2917    #[cfg(feature = "sim")]
2918    #[test]
2919    #[should_panic]
2920    fn sim_batch_nondet_size() {
2921        let mut flow = FlowBuilder::new();
2922        let node = flow.process::<()>();
2923
2924        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2925
2926        let tick = node.tick();
2927        let out_recv = input
2928            .batch(&tick, nondet!(/** test */))
2929            .count()
2930            .all_ticks()
2931            .sim_output();
2932
2933        flow.sim().exhaustive(async || {
2934            in_send.send(());
2935            in_send.send(());
2936            in_send.send(());
2937
2938            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
2939        });
2940    }
2941
2942    #[cfg(feature = "sim")]
2943    #[test]
2944    fn sim_batch_preserves_order() {
2945        let mut flow = FlowBuilder::new();
2946        let node = flow.process::<()>();
2947
2948        let (in_send, input) = node.sim_input();
2949
2950        let tick = node.tick();
2951        let out_recv = input
2952            .batch(&tick, nondet!(/** test */))
2953            .all_ticks()
2954            .sim_output();
2955
2956        flow.sim().exhaustive(async || {
2957            in_send.send(1);
2958            in_send.send(2);
2959            in_send.send(3);
2960
2961            out_recv.assert_yields_only([1, 2, 3]).await;
2962        });
2963    }
2964
2965    #[cfg(feature = "sim")]
2966    #[test]
2967    #[should_panic]
2968    fn sim_batch_unordered_shuffles() {
2969        let mut flow = FlowBuilder::new();
2970        let node = flow.process::<()>();
2971
2972        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2973
2974        let tick = node.tick();
2975        let batch = input.batch(&tick, nondet!(/** test */));
2976        let out_recv = batch
2977            .clone()
2978            .min()
2979            .zip(batch.max())
2980            .all_ticks()
2981            .sim_output();
2982
2983        flow.sim().exhaustive(async || {
2984            in_send.send_many_unordered([1, 2, 3]);
2985
2986            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
2987                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
2988            }
2989        });
2990    }
2991
2992    #[cfg(feature = "sim")]
2993    #[test]
2994    fn sim_batch_unordered_shuffles_count() {
2995        let mut flow = FlowBuilder::new();
2996        let node = flow.process::<()>();
2997
2998        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2999
3000        let tick = node.tick();
3001        let batch = input.batch(&tick, nondet!(/** test */));
3002        let out_recv = batch.all_ticks().sim_output();
3003
3004        let instance_count = flow.sim().exhaustive(async || {
3005            in_send.send_many_unordered([1, 2, 3, 4]);
3006            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3007        });
3008
3009        assert_eq!(
3010            instance_count,
3011            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3012        )
3013    }
3014
3015    #[cfg(feature = "sim")]
3016    #[test]
3017    #[should_panic]
3018    fn sim_observe_order_batched() {
3019        let mut flow = FlowBuilder::new();
3020        let node = flow.process::<()>();
3021
3022        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3023
3024        let tick = node.tick();
3025        let batch = input.batch(&tick, nondet!(/** test */));
3026        let out_recv = batch
3027            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3028            .all_ticks()
3029            .sim_output();
3030
3031        flow.sim().exhaustive(async || {
3032            in_send.send_many_unordered([1, 2, 3, 4]);
3033            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3034        });
3035    }
3036
3037    #[cfg(feature = "sim")]
3038    #[test]
3039    fn sim_observe_order_batched_count() {
3040        let mut flow = FlowBuilder::new();
3041        let node = flow.process::<()>();
3042
3043        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3044
3045        let tick = node.tick();
3046        let batch = input.batch(&tick, nondet!(/** test */));
3047        let out_recv = batch
3048            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3049            .all_ticks()
3050            .sim_output();
3051
3052        let instance_count = flow.sim().exhaustive(async || {
3053            in_send.send_many_unordered([1, 2, 3, 4]);
3054            let _ = out_recv.collect::<Vec<_>>().await;
3055        });
3056
3057        assert_eq!(
3058            instance_count,
3059            192 // 4! * 2^{4 - 1}
3060        )
3061    }
3062
3063    #[cfg(feature = "sim")]
3064    #[test]
3065    fn sim_unordered_count_instance_count() {
3066        let mut flow = FlowBuilder::new();
3067        let node = flow.process::<()>();
3068
3069        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3070
3071        let tick = node.tick();
3072        let out_recv = input
3073            .count()
3074            .snapshot(&tick, nondet!(/** test */))
3075            .all_ticks()
3076            .sim_output();
3077
3078        let instance_count = flow.sim().exhaustive(async || {
3079            in_send.send_many_unordered([1, 2, 3, 4]);
3080            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3081        });
3082
3083        assert_eq!(
3084            instance_count,
3085            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3086        )
3087    }
3088
3089    #[cfg(feature = "sim")]
3090    #[test]
3091    fn sim_top_level_assume_ordering() {
3092        let mut flow = FlowBuilder::new();
3093        let node = flow.process::<()>();
3094
3095        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3096
3097        let out_recv = input
3098            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3099            .sim_output();
3100
3101        let instance_count = flow.sim().exhaustive(async || {
3102            in_send.send_many_unordered([1, 2, 3]);
3103            let mut out = out_recv.collect::<Vec<_>>().await;
3104            out.sort();
3105            assert_eq!(out, vec![1, 2, 3]);
3106        });
3107
3108        assert_eq!(instance_count, 6)
3109    }
3110
3111    #[cfg(feature = "sim")]
3112    #[test]
3113    fn sim_top_level_assume_ordering_cycle_back() {
3114        let mut flow = FlowBuilder::new();
3115        let node = flow.process::<()>();
3116
3117        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3118
3119        let (complete_cycle_back, cycle_back) =
3120            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3121        let ordered = input
3122            .interleave(cycle_back)
3123            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3124        complete_cycle_back.complete(
3125            ordered
3126                .clone()
3127                .map(q!(|v| v + 1))
3128                .filter(q!(|v| v % 2 == 1)),
3129        );
3130
3131        let out_recv = ordered.sim_output();
3132
3133        let mut saw = false;
3134        let instance_count = flow.sim().exhaustive(async || {
3135            in_send.send_many_unordered([0, 2]);
3136            let out = out_recv.collect::<Vec<_>>().await;
3137
3138            if out.starts_with(&[0, 1, 2]) {
3139                saw = true;
3140            }
3141        });
3142
3143        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3144        assert_eq!(instance_count, 6)
3145    }
3146
3147    #[cfg(feature = "sim")]
3148    #[test]
3149    fn sim_top_level_assume_ordering_cycle_back_tick() {
3150        let mut flow = FlowBuilder::new();
3151        let node = flow.process::<()>();
3152
3153        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3154
3155        let (complete_cycle_back, cycle_back) =
3156            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3157        let ordered = input
3158            .interleave(cycle_back)
3159            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3160        complete_cycle_back.complete(
3161            ordered
3162                .clone()
3163                .batch(&node.tick(), nondet!(/** test */))
3164                .all_ticks()
3165                .map(q!(|v| v + 1))
3166                .filter(q!(|v| v % 2 == 1)),
3167        );
3168
3169        let out_recv = ordered.sim_output();
3170
3171        let mut saw = false;
3172        let instance_count = flow.sim().exhaustive(async || {
3173            in_send.send_many_unordered([0, 2]);
3174            let out = out_recv.collect::<Vec<_>>().await;
3175
3176            if out.starts_with(&[0, 1, 2]) {
3177                saw = true;
3178            }
3179        });
3180
3181        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3182        assert_eq!(instance_count, 58)
3183    }
3184
3185    #[cfg(feature = "sim")]
3186    #[test]
3187    fn sim_top_level_assume_ordering_multiple() {
3188        let mut flow = FlowBuilder::new();
3189        let node = flow.process::<()>();
3190
3191        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3192        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3193
3194        let (complete_cycle_back, cycle_back) =
3195            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3196        let input1_ordered = input
3197            .clone()
3198            .interleave(cycle_back)
3199            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3200        let foo = input1_ordered
3201            .clone()
3202            .map(q!(|v| v + 3))
3203            .weaken_ordering::<NoOrder>()
3204            .interleave(input2)
3205            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3206
3207        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3208
3209        let out_recv = input1_ordered.sim_output();
3210
3211        let mut saw = false;
3212        let instance_count = flow.sim().exhaustive(async || {
3213            in_send.send_many_unordered([0, 1]);
3214            let out = out_recv.collect::<Vec<_>>().await;
3215
3216            if out.starts_with(&[0, 3, 1]) {
3217                saw = true;
3218            }
3219        });
3220
3221        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3222        assert_eq!(instance_count, 24)
3223    }
3224
3225    #[cfg(feature = "sim")]
3226    #[test]
3227    fn sim_atomic_assume_ordering_cycle_back() {
3228        let mut flow = FlowBuilder::new();
3229        let node = flow.process::<()>();
3230
3231        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3232
3233        let (complete_cycle_back, cycle_back) =
3234            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3235        let ordered = input
3236            .interleave(cycle_back)
3237            .atomic(&node.tick())
3238            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3239            .end_atomic();
3240        complete_cycle_back.complete(
3241            ordered
3242                .clone()
3243                .map(q!(|v| v + 1))
3244                .filter(q!(|v| v % 2 == 1)),
3245        );
3246
3247        let out_recv = ordered.sim_output();
3248
3249        let instance_count = flow.sim().exhaustive(async || {
3250            in_send.send_many_unordered([0, 2]);
3251            let out = out_recv.collect::<Vec<_>>().await;
3252            assert_eq!(out.len(), 4);
3253        });
3254
3255        assert_eq!(instance_count, 22)
3256    }
3257}