Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A *nullable* Rust value that can asynchronously change over time.
26///
27/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
28/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
29/// asynchronously change over time, including becoming present of uninhabited.
30///
31/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
32/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this optional (when it is not null)
36/// - `Loc`: the [`Location`] where the optional is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
38pub struct Optional<Type, Loc, Bound: Boundedness> {
39    pub(crate) location: Loc,
40    pub(crate) ir_node: RefCell<HydroNode>,
41
42    _phantom: PhantomData<(Type, Loc, Bound)>,
43}
44
45impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
46where
47    T: Clone,
48    L: Location<'a> + NoTick,
49{
50    fn from(value: Optional<T, L, Bounded>) -> Self {
51        let tick = value.location().tick();
52        value.clone_into_tick(&tick).latest()
53    }
54}
55
56impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
57where
58    L: Location<'a>,
59{
60    fn defer_tick(self) -> Self {
61        Optional::defer_tick(self)
62    }
63}
64
65impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
66where
67    L: Location<'a>,
68{
69    type Location = Tick<L>;
70
71    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
72        Optional::new(
73            location.clone(),
74            HydroNode::CycleSource {
75                cycle_id,
76                metadata: location.new_node_metadata(Self::collection_kind()),
77            },
78        )
79    }
80}
81
82impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
83where
84    L: Location<'a>,
85{
86    type Location = Tick<L>;
87
88    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
89        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
90            location.clone(),
91            HydroNode::DeferTick {
92                input: Box::new(HydroNode::CycleSource {
93                    cycle_id,
94                    metadata: location.new_node_metadata(Self::collection_kind()),
95                }),
96                metadata: location
97                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
98            },
99        );
100
101        from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
102    }
103}
104
105impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
106where
107    L: Location<'a>,
108{
109    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
110        assert_eq!(
111            Location::id(&self.location),
112            expected_location,
113            "locations do not match"
114        );
115        self.location
116            .flow_state()
117            .borrow_mut()
118            .push_root(HydroRoot::CycleSink {
119                cycle_id,
120                input: Box::new(self.ir_node.into_inner()),
121                op_metadata: HydroIrOpMetadata::new(),
122            });
123    }
124}
125
126impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
127where
128    L: Location<'a>,
129{
130    type Location = Tick<L>;
131
132    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
133        Optional::new(
134            location.clone(),
135            HydroNode::CycleSource {
136                cycle_id,
137                metadata: location.new_node_metadata(Self::collection_kind()),
138            },
139        )
140    }
141}
142
143impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
144where
145    L: Location<'a>,
146{
147    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
148        assert_eq!(
149            Location::id(&self.location),
150            expected_location,
151            "locations do not match"
152        );
153        self.location
154            .flow_state()
155            .borrow_mut()
156            .push_root(HydroRoot::CycleSink {
157                cycle_id,
158                input: Box::new(self.ir_node.into_inner()),
159                op_metadata: HydroIrOpMetadata::new(),
160            });
161    }
162}
163
164impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
165where
166    L: Location<'a> + NoTick,
167{
168    type Location = L;
169
170    fn create_source(cycle_id: CycleId, location: L) -> Self {
171        Optional::new(
172            location.clone(),
173            HydroNode::CycleSource {
174                cycle_id,
175                metadata: location.new_node_metadata(Self::collection_kind()),
176            },
177        )
178    }
179}
180
181impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
182where
183    L: Location<'a> + NoTick,
184{
185    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
186        assert_eq!(
187            Location::id(&self.location),
188            expected_location,
189            "locations do not match"
190        );
191        self.location
192            .flow_state()
193            .borrow_mut()
194            .push_root(HydroRoot::CycleSink {
195                cycle_id,
196                input: Box::new(self.ir_node.into_inner()),
197                op_metadata: HydroIrOpMetadata::new(),
198            });
199    }
200}
201
202impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
203where
204    L: Location<'a>,
205{
206    fn from(singleton: Singleton<T, L, B>) -> Self {
207        Optional::new(
208            singleton.location.clone(),
209            HydroNode::Cast {
210                inner: Box::new(singleton.ir_node.into_inner()),
211                metadata: singleton
212                    .location
213                    .new_node_metadata(Self::collection_kind()),
214            },
215        )
216    }
217}
218
219#[cfg(stageleft_runtime)]
220pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
221    me: Optional<T, L, B>,
222    other: Optional<O, L, B>,
223) -> Optional<(T, O), L, B> {
224    check_matching_location(&me.location, &other.location);
225
226    Optional::new(
227        me.location.clone(),
228        HydroNode::CrossSingleton {
229            left: Box::new(me.ir_node.into_inner()),
230            right: Box::new(other.ir_node.into_inner()),
231            metadata: me
232                .location
233                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
234        },
235    )
236}
237
238#[cfg(stageleft_runtime)]
239fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
240    me: Optional<T, L, B>,
241    other: Optional<T, L, B>,
242) -> Optional<T, L, B> {
243    check_matching_location(&me.location, &other.location);
244
245    Optional::new(
246        me.location.clone(),
247        HydroNode::ChainFirst {
248            first: Box::new(me.ir_node.into_inner()),
249            second: Box::new(other.ir_node.into_inner()),
250            metadata: me
251                .location
252                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
253        },
254    )
255}
256
257impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
258where
259    T: Clone,
260    L: Location<'a>,
261{
262    fn clone(&self) -> Self {
263        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
264            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
265            *self.ir_node.borrow_mut() = HydroNode::Tee {
266                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
267                metadata: self.location.new_node_metadata(Self::collection_kind()),
268            };
269        }
270
271        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
272            Optional {
273                location: self.location.clone(),
274                ir_node: HydroNode::Tee {
275                    inner: TeeNode(inner.0.clone()),
276                    metadata: metadata.clone(),
277                }
278                .into(),
279                _phantom: PhantomData,
280            }
281        } else {
282            unreachable!()
283        }
284    }
285}
286
287impl<'a, T, L, B: Boundedness> Optional<T, L, B>
288where
289    L: Location<'a>,
290{
291    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
292        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
293        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
294        Optional {
295            location,
296            ir_node: RefCell::new(ir_node),
297            _phantom: PhantomData,
298        }
299    }
300
301    pub(crate) fn collection_kind() -> CollectionKind {
302        CollectionKind::Optional {
303            bound: B::BOUND_KIND,
304            element_type: stageleft::quote_type::<T>().into(),
305        }
306    }
307
308    /// Returns the [`Location`] where this optional is being materialized.
309    pub fn location(&self) -> &L {
310        &self.location
311    }
312
313    /// Transforms the optional value by applying a function `f` to it,
314    /// continuously as the input is updated.
315    ///
316    /// Whenever the optional is empty, the output optional is also empty.
317    ///
318    /// # Example
319    /// ```rust
320    /// # #[cfg(feature = "deploy")] {
321    /// # use hydro_lang::prelude::*;
322    /// # use futures::StreamExt;
323    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
324    /// let tick = process.tick();
325    /// let optional = tick.optional_first_tick(q!(1));
326    /// optional.map(q!(|v| v + 1)).all_ticks()
327    /// # }, |mut stream| async move {
328    /// // 2
329    /// # assert_eq!(stream.next().await.unwrap(), 2);
330    /// # }));
331    /// # }
332    /// ```
333    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
334    where
335        F: Fn(T) -> U + 'a,
336    {
337        let f = f.splice_fn1_ctx(&self.location).into();
338        Optional::new(
339            self.location.clone(),
340            HydroNode::Map {
341                f,
342                input: Box::new(self.ir_node.into_inner()),
343                metadata: self
344                    .location
345                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
346            },
347        )
348    }
349
350    /// Transforms the optional value by applying a function `f` to it and then flattening
351    /// the result into a stream, preserving the order of elements.
352    ///
353    /// If the optional is empty, the output stream is also empty. If the optional contains
354    /// a value, `f` is applied to produce an iterator, and all items from that iterator
355    /// are emitted in the output stream in deterministic order.
356    ///
357    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
358    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
359    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
360    ///
361    /// # Example
362    /// ```rust
363    /// # #[cfg(feature = "deploy")] {
364    /// # use hydro_lang::prelude::*;
365    /// # use futures::StreamExt;
366    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367    /// let tick = process.tick();
368    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
369    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
370    /// # }, |mut stream| async move {
371    /// // 1, 2, 3
372    /// # for w in vec![1, 2, 3] {
373    /// #     assert_eq!(stream.next().await.unwrap(), w);
374    /// # }
375    /// # }));
376    /// # }
377    /// ```
378    pub fn flat_map_ordered<U, I, F>(
379        self,
380        f: impl IntoQuotedMut<'a, F, L>,
381    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
382    where
383        I: IntoIterator<Item = U>,
384        F: Fn(T) -> I + 'a,
385    {
386        let f = f.splice_fn1_ctx(&self.location).into();
387        Stream::new(
388            self.location.clone(),
389            HydroNode::FlatMap {
390                f,
391                input: Box::new(self.ir_node.into_inner()),
392                metadata: self.location.new_node_metadata(
393                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
394                ),
395            },
396        )
397    }
398
399    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
400    /// for the output type `I` to produce items in any order.
401    ///
402    /// If the optional is empty, the output stream is also empty. If the optional contains
403    /// a value, `f` is applied to produce an iterator, and all items from that iterator
404    /// are emitted in the output stream in non-deterministic order.
405    ///
406    /// # Example
407    /// ```rust
408    /// # #[cfg(feature = "deploy")] {
409    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
410    /// # use futures::StreamExt;
411    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
412    /// let tick = process.tick();
413    /// let optional = tick.optional_first_tick(q!(
414    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
415    /// ));
416    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
417    /// # }, |mut stream| async move {
418    /// // 1, 2, 3, but in no particular order
419    /// # let mut results = Vec::new();
420    /// # for _ in 0..3 {
421    /// #     results.push(stream.next().await.unwrap());
422    /// # }
423    /// # results.sort();
424    /// # assert_eq!(results, vec![1, 2, 3]);
425    /// # }));
426    /// # }
427    /// ```
428    pub fn flat_map_unordered<U, I, F>(
429        self,
430        f: impl IntoQuotedMut<'a, F, L>,
431    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
432    where
433        I: IntoIterator<Item = U>,
434        F: Fn(T) -> I + 'a,
435    {
436        let f = f.splice_fn1_ctx(&self.location).into();
437        Stream::new(
438            self.location.clone(),
439            HydroNode::FlatMap {
440                f,
441                input: Box::new(self.ir_node.into_inner()),
442                metadata: self
443                    .location
444                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
445            },
446        )
447    }
448
449    /// Flattens the optional value into a stream, preserving the order of elements.
450    ///
451    /// If the optional is empty, the output stream is also empty. If the optional contains
452    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
453    /// in the output stream in deterministic order.
454    ///
455    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
456    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
457    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// let tick = process.tick();
466    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
467    /// optional.flatten_ordered().all_ticks()
468    /// # }, |mut stream| async move {
469    /// // 1, 2, 3
470    /// # for w in vec![1, 2, 3] {
471    /// #     assert_eq!(stream.next().await.unwrap(), w);
472    /// # }
473    /// # }));
474    /// # }
475    /// ```
476    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
477    where
478        T: IntoIterator<Item = U>,
479    {
480        self.flat_map_ordered(q!(|v| v))
481    }
482
483    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
484    /// for the element type `T` to produce items in any order.
485    ///
486    /// If the optional is empty, the output stream is also empty. If the optional contains
487    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
488    /// in the output stream in non-deterministic order.
489    ///
490    /// # Example
491    /// ```rust
492    /// # #[cfg(feature = "deploy")] {
493    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
494    /// # use futures::StreamExt;
495    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
496    /// let tick = process.tick();
497    /// let optional = tick.optional_first_tick(q!(
498    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
499    /// ));
500    /// optional.flatten_unordered().all_ticks()
501    /// # }, |mut stream| async move {
502    /// // 1, 2, 3, but in no particular order
503    /// # let mut results = Vec::new();
504    /// # for _ in 0..3 {
505    /// #     results.push(stream.next().await.unwrap());
506    /// # }
507    /// # results.sort();
508    /// # assert_eq!(results, vec![1, 2, 3]);
509    /// # }));
510    /// # }
511    /// ```
512    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
513    where
514        T: IntoIterator<Item = U>,
515    {
516        self.flat_map_unordered(q!(|v| v))
517    }
518
519    /// Creates an optional containing only the value if it satisfies a predicate `f`.
520    ///
521    /// If the optional is empty, the output optional is also empty. If the optional contains
522    /// a value and the predicate returns `true`, the output optional contains the same value.
523    /// If the predicate returns `false`, the output optional is empty.
524    ///
525    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
526    /// not modify or take ownership of the value. If you need to modify the value while filtering
527    /// use [`Optional::filter_map`] instead.
528    ///
529    /// # Example
530    /// ```rust
531    /// # #[cfg(feature = "deploy")] {
532    /// # use hydro_lang::prelude::*;
533    /// # use futures::StreamExt;
534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535    /// let tick = process.tick();
536    /// let optional = tick.optional_first_tick(q!(5));
537    /// optional.filter(q!(|&x| x > 3)).all_ticks()
538    /// # }, |mut stream| async move {
539    /// // 5
540    /// # assert_eq!(stream.next().await.unwrap(), 5);
541    /// # }));
542    /// # }
543    /// ```
544    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
545    where
546        F: Fn(&T) -> bool + 'a,
547    {
548        let f = f.splice_fn1_borrow_ctx(&self.location).into();
549        Optional::new(
550            self.location.clone(),
551            HydroNode::Filter {
552                f,
553                input: Box::new(self.ir_node.into_inner()),
554                metadata: self.location.new_node_metadata(Self::collection_kind()),
555            },
556        )
557    }
558
559    /// An operator that both filters and maps. It yields only the value if the supplied
560    /// closure `f` returns `Some(value)`.
561    ///
562    /// If the optional is empty, the output optional is also empty. If the optional contains
563    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
564    /// If the closure returns `None`, the output optional is empty.
565    ///
566    /// # Example
567    /// ```rust
568    /// # #[cfg(feature = "deploy")] {
569    /// # use hydro_lang::prelude::*;
570    /// # use futures::StreamExt;
571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572    /// let tick = process.tick();
573    /// let optional = tick.optional_first_tick(q!("42"));
574    /// optional
575    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
576    ///     .all_ticks()
577    /// # }, |mut stream| async move {
578    /// // 42
579    /// # assert_eq!(stream.next().await.unwrap(), 42);
580    /// # }));
581    /// # }
582    /// ```
583    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
584    where
585        F: Fn(T) -> Option<U> + 'a,
586    {
587        let f = f.splice_fn1_ctx(&self.location).into();
588        Optional::new(
589            self.location.clone(),
590            HydroNode::FilterMap {
591                f,
592                input: Box::new(self.ir_node.into_inner()),
593                metadata: self
594                    .location
595                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
596            },
597        )
598    }
599
600    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
601    ///
602    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
603    /// non-null. This is useful for combining several pieces of state together.
604    ///
605    /// # Example
606    /// ```rust
607    /// # #[cfg(feature = "deploy")] {
608    /// # use hydro_lang::prelude::*;
609    /// # use futures::StreamExt;
610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611    /// let tick = process.tick();
612    /// let numbers = process
613    ///   .source_iter(q!(vec![123, 456, 789]))
614    ///   .batch(&tick, nondet!(/** test */));
615    /// let min = numbers.clone().min(); // Optional
616    /// let max = numbers.max(); // Optional
617    /// min.zip(max).all_ticks()
618    /// # }, |mut stream| async move {
619    /// // [(123, 789)]
620    /// # for w in vec![(123, 789)] {
621    /// #     assert_eq!(stream.next().await.unwrap(), w);
622    /// # }
623    /// # }));
624    /// # }
625    /// ```
626    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
627    where
628        B: IsBounded,
629    {
630        let other: Optional<O, L, B> = other.into();
631        check_matching_location(&self.location, &other.location);
632
633        if L::is_top_level()
634            && let Some(tick) = self.location.try_tick()
635        {
636            let out = zip_inside_tick(
637                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
638                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639            )
640            .latest();
641
642            Optional::new(out.location, out.ir_node.into_inner())
643        } else {
644            zip_inside_tick(self, other)
645        }
646    }
647
648    /// Passes through `self` when it has a value, otherwise passes through `other`.
649    ///
650    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
651    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
652    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
653    ///
654    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
655    /// of the inputs change (including to/from null states).
656    ///
657    /// # Example
658    /// ```rust
659    /// # #[cfg(feature = "deploy")] {
660    /// # use hydro_lang::prelude::*;
661    /// # use futures::StreamExt;
662    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
663    /// let tick = process.tick();
664    /// // ticks are lazy by default, forces the second tick to run
665    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
666    ///
667    /// let some_first_tick = tick.optional_first_tick(q!(123));
668    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
669    /// some_first_tick.or(some_second_tick).all_ticks()
670    /// # }, |mut stream| async move {
671    /// // [123 /* first tick */, 456 /* second tick */]
672    /// # for w in vec![123, 456] {
673    /// #     assert_eq!(stream.next().await.unwrap(), w);
674    /// # }
675    /// # }));
676    /// # }
677    /// ```
678    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
679        check_matching_location(&self.location, &other.location);
680
681        if L::is_top_level()
682            && !B::BOUNDED // only if unbounded we need to use a tick
683            && let Some(tick) = self.location.try_tick()
684        {
685            let out = or_inside_tick(
686                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
687                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
688            )
689            .latest();
690
691            Optional::new(out.location, out.ir_node.into_inner())
692        } else {
693            Optional::new(
694                self.location.clone(),
695                HydroNode::ChainFirst {
696                    first: Box::new(self.ir_node.into_inner()),
697                    second: Box::new(other.ir_node.into_inner()),
698                    metadata: self.location.new_node_metadata(Self::collection_kind()),
699                },
700            )
701        }
702    }
703
704    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
705    ///
706    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
707    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
708    ///
709    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
710    /// of the inputs change (including to/from null states).
711    ///
712    /// # Example
713    /// ```rust
714    /// # #[cfg(feature = "deploy")] {
715    /// # use hydro_lang::prelude::*;
716    /// # use futures::StreamExt;
717    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718    /// let tick = process.tick();
719    /// // ticks are lazy by default, forces the later ticks to run
720    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
721    ///
722    /// let some_first_tick = tick.optional_first_tick(q!(123));
723    /// some_first_tick
724    ///     .unwrap_or(tick.singleton(q!(456)))
725    ///     .all_ticks()
726    /// # }, |mut stream| async move {
727    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
728    /// # for w in vec![123, 456, 456, 456] {
729    /// #     assert_eq!(stream.next().await.unwrap(), w);
730    /// # }
731    /// # }));
732    /// # }
733    /// ```
734    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
735        let res_option = self.or(other.into());
736        Singleton::new(
737            res_option.location.clone(),
738            HydroNode::Cast {
739                inner: Box::new(res_option.ir_node.into_inner()),
740                metadata: res_option
741                    .location
742                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
743            },
744        )
745    }
746
747    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
748    ///
749    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
750    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
751    /// so that Hydro can skip any computation on null values.
752    ///
753    /// # Example
754    /// ```rust
755    /// # #[cfg(feature = "deploy")] {
756    /// # use hydro_lang::prelude::*;
757    /// # use futures::StreamExt;
758    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
759    /// let tick = process.tick();
760    /// // ticks are lazy by default, forces the later ticks to run
761    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
762    ///
763    /// let some_first_tick = tick.optional_first_tick(q!(123));
764    /// some_first_tick.into_singleton().all_ticks()
765    /// # }, |mut stream| async move {
766    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
767    /// # for w in vec![Some(123), None, None, None] {
768    /// #     assert_eq!(stream.next().await.unwrap(), w);
769    /// # }
770    /// # }));
771    /// # }
772    /// ```
773    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
774    where
775        T: Clone,
776    {
777        let none: syn::Expr = parse_quote!(::std::option::Option::None);
778
779        let none_singleton = Singleton::new(
780            self.location.clone(),
781            HydroNode::SingletonSource {
782                value: none.into(),
783                first_tick_only: false,
784                metadata: self
785                    .location
786                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
787            },
788        );
789
790        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
791    }
792
793    /// An operator which allows you to "name" a `HydroNode`.
794    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
795    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
796        {
797            let mut node = self.ir_node.borrow_mut();
798            let metadata = node.metadata_mut();
799            metadata.tag = Some(name.to_owned());
800        }
801        self
802    }
803
804    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
805    /// implies that `B == Bounded`.
806    pub fn make_bounded(self) -> Optional<T, L, Bounded>
807    where
808        B: IsBounded,
809    {
810        Optional::new(self.location, self.ir_node.into_inner())
811    }
812
813    /// Clones this bounded optional into a tick, returning a optional that has the
814    /// same value as the outer optional. Because the outer optional is bounded, this
815    /// is deterministic because there is only a single immutable version.
816    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
817    where
818        B: IsBounded,
819        T: Clone,
820    {
821        // TODO(shadaj): avoid printing simulator logs for this snapshot
822        self.snapshot(
823            tick,
824            nondet!(/** bounded top-level optional so deterministic */),
825        )
826    }
827
828    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
829    /// non-null. Otherwise, the stream is empty.
830    ///
831    /// # Example
832    /// ```rust
833    /// # #[cfg(feature = "deploy")] {
834    /// # use hydro_lang::prelude::*;
835    /// # use futures::StreamExt;
836    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
837    /// # let tick = process.tick();
838    /// # // ticks are lazy by default, forces the second tick to run
839    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
840    /// # let batch_first_tick = process
841    /// #   .source_iter(q!(vec![]))
842    /// #   .batch(&tick, nondet!(/** test */));
843    /// # let batch_second_tick = process
844    /// #   .source_iter(q!(vec![123, 456]))
845    /// #   .batch(&tick, nondet!(/** test */))
846    /// #   .defer_tick(); // appears on the second tick
847    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
848    /// input_batch // first tick: [], second tick: [123, 456]
849    ///     .clone()
850    ///     .max()
851    ///     .into_stream()
852    ///     .chain(input_batch)
853    ///     .all_ticks()
854    /// # }, |mut stream| async move {
855    /// // [456, 123, 456]
856    /// # for w in vec![456, 123, 456] {
857    /// #     assert_eq!(stream.next().await.unwrap(), w);
858    /// # }
859    /// # }));
860    /// # }
861    /// ```
862    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
863    where
864        B: IsBounded,
865    {
866        Stream::new(
867            self.location.clone(),
868            HydroNode::Cast {
869                inner: Box::new(self.ir_node.into_inner()),
870                metadata: self.location.new_node_metadata(Stream::<
871                    T,
872                    Tick<L>,
873                    Bounded,
874                    TotalOrder,
875                    ExactlyOnce,
876                >::collection_kind()),
877            },
878        )
879    }
880
881    /// Filters this optional, passing through the optional value if it is non-null **and** the
882    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
883    ///
884    /// Useful for conditionally processing, such as only emitting an optional's value outside
885    /// a tick if some other condition is satisfied.
886    ///
887    /// # Example
888    /// ```rust
889    /// # #[cfg(feature = "deploy")] {
890    /// # use hydro_lang::prelude::*;
891    /// # use futures::StreamExt;
892    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
893    /// let tick = process.tick();
894    /// // ticks are lazy by default, forces the second tick to run
895    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
896    ///
897    /// let batch_first_tick = process
898    ///   .source_iter(q!(vec![]))
899    ///   .batch(&tick, nondet!(/** test */));
900    /// let batch_second_tick = process
901    ///   .source_iter(q!(vec![456]))
902    ///   .batch(&tick, nondet!(/** test */))
903    ///   .defer_tick(); // appears on the second tick
904    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
905    /// batch_first_tick.chain(batch_second_tick).first()
906    ///   .filter_if_some(some_on_first_tick)
907    ///   .unwrap_or(tick.singleton(q!(789)))
908    ///   .all_ticks()
909    /// # }, |mut stream| async move {
910    /// // [789, 789]
911    /// # for w in vec![789, 789] {
912    /// #     assert_eq!(stream.next().await.unwrap(), w);
913    /// # }
914    /// # }));
915    /// # }
916    /// ```
917    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
918    where
919        B: IsBounded,
920    {
921        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
922    }
923
924    /// Filters this optional, passing through the optional value if it is non-null **and** the
925    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
926    ///
927    /// Useful for conditionally processing, such as only emitting an optional's value outside
928    /// a tick if some other condition is satisfied.
929    ///
930    /// # Example
931    /// ```rust
932    /// # #[cfg(feature = "deploy")] {
933    /// # use hydro_lang::prelude::*;
934    /// # use futures::StreamExt;
935    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
936    /// let tick = process.tick();
937    /// // ticks are lazy by default, forces the second tick to run
938    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
939    ///
940    /// let batch_first_tick = process
941    ///   .source_iter(q!(vec![]))
942    ///   .batch(&tick, nondet!(/** test */));
943    /// let batch_second_tick = process
944    ///   .source_iter(q!(vec![456]))
945    ///   .batch(&tick, nondet!(/** test */))
946    ///   .defer_tick(); // appears on the second tick
947    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
948    /// batch_first_tick.chain(batch_second_tick).first()
949    ///   .filter_if_none(some_on_first_tick)
950    ///   .unwrap_or(tick.singleton(q!(789)))
951    ///   .all_ticks()
952    /// # }, |mut stream| async move {
953    /// // [789, 789]
954    /// # for w in vec![789, 456] {
955    /// #     assert_eq!(stream.next().await.unwrap(), w);
956    /// # }
957    /// # }));
958    /// # }
959    /// ```
960    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
961    where
962        B: IsBounded,
963    {
964        self.filter_if_some(
965            other
966                .map(q!(|_| ()))
967                .into_singleton()
968                .filter(q!(|o| o.is_none())),
969        )
970    }
971
972    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
973    ///
974    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
975    /// having a value, such as only releasing a piece of state if the node is the leader.
976    ///
977    /// # Example
978    /// ```rust
979    /// # #[cfg(feature = "deploy")] {
980    /// # use hydro_lang::prelude::*;
981    /// # use futures::StreamExt;
982    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
983    /// let tick = process.tick();
984    /// // ticks are lazy by default, forces the second tick to run
985    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
986    ///
987    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
988    /// some_on_first_tick
989    ///     .if_some_then(tick.singleton(q!(456)))
990    ///     .unwrap_or(tick.singleton(q!(123)))
991    /// # .all_ticks()
992    /// # }, |mut stream| async move {
993    /// // 456 (first tick) ~> 123 (second tick onwards)
994    /// # for w in vec![456, 123, 123] {
995    /// #     assert_eq!(stream.next().await.unwrap(), w);
996    /// # }
997    /// # }));
998    /// # }
999    /// ```
1000    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1001    where
1002        B: IsBounded,
1003    {
1004        value.filter_if_some(self)
1005    }
1006}
1007
1008impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1009where
1010    L: Location<'a> + NoTick,
1011{
1012    /// Returns an optional value corresponding to the latest snapshot of the optional
1013    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1014    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1015    /// all snapshots of this optional into the atomic-associated tick will observe the
1016    /// same value each tick.
1017    ///
1018    /// # Non-Determinism
1019    /// Because this picks a snapshot of a optional whose value is continuously changing,
1020    /// the output optional has a non-deterministic value since the snapshot can be at an
1021    /// arbitrary point in time.
1022    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1023        Optional::new(
1024            self.location.clone().tick,
1025            HydroNode::Batch {
1026                inner: Box::new(self.ir_node.into_inner()),
1027                metadata: self
1028                    .location
1029                    .tick
1030                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1031            },
1032        )
1033    }
1034
1035    /// Returns this optional back into a top-level, asynchronous execution context where updates
1036    /// to the value will be asynchronously propagated.
1037    pub fn end_atomic(self) -> Optional<T, L, B> {
1038        Optional::new(
1039            self.location.tick.l.clone(),
1040            HydroNode::EndAtomic {
1041                inner: Box::new(self.ir_node.into_inner()),
1042                metadata: self
1043                    .location
1044                    .tick
1045                    .l
1046                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1047            },
1048        )
1049    }
1050}
1051
1052impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1053where
1054    L: Location<'a>,
1055{
1056    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1057    /// will observe the same version of the value and will be executed synchronously before any
1058    /// outputs are yielded (in [`Optional::end_atomic`]).
1059    ///
1060    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1061    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1062    /// a different version).
1063    ///
1064    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1065    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1066    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1067    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1068        let out_location = Atomic { tick: tick.clone() };
1069        Optional::new(
1070            out_location.clone(),
1071            HydroNode::BeginAtomic {
1072                inner: Box::new(self.ir_node.into_inner()),
1073                metadata: out_location
1074                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1075            },
1076        )
1077    }
1078
1079    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1080    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1081    /// relevant data that contributed to the snapshot at tick `t`.
1082    ///
1083    /// # Non-Determinism
1084    /// Because this picks a snapshot of a optional whose value is continuously changing,
1085    /// the output optional has a non-deterministic value since the snapshot can be at an
1086    /// arbitrary point in time.
1087    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1088        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1089        Optional::new(
1090            tick.clone(),
1091            HydroNode::Batch {
1092                inner: Box::new(self.ir_node.into_inner()),
1093                metadata: tick
1094                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1095            },
1096        )
1097    }
1098
1099    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1100    /// with order corresponding to increasing prefixes of data contributing to the optional.
1101    ///
1102    /// # Non-Determinism
1103    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1104    /// to non-deterministic batching and arrival of inputs, the output stream is
1105    /// non-deterministic.
1106    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1107    where
1108        L: NoTick,
1109    {
1110        let tick = self.location.tick();
1111        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1112    }
1113
1114    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1115    /// value taken at various points in time. Because the input optional may be
1116    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1117    /// represent the value of the optional given some prefix of the streams leading up to
1118    /// it.
1119    ///
1120    /// # Non-Determinism
1121    /// The output stream is non-deterministic in which elements are sampled, since this
1122    /// is controlled by a clock.
1123    pub fn sample_every(
1124        self,
1125        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1126        nondet: NonDet,
1127    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1128    where
1129        L: NoTick + NoAtomic,
1130    {
1131        let samples = self.location.source_interval(interval, nondet);
1132        let tick = self.location.tick();
1133
1134        self.snapshot(&tick, nondet)
1135            .filter_if_some(samples.batch(&tick, nondet).first())
1136            .all_ticks()
1137            .weaken_retries()
1138    }
1139}
1140
1141impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1142where
1143    L: Location<'a>,
1144{
1145    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1146    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1147    /// null values).
1148    ///
1149    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1150    /// producing one element in the output for each (non-null) tick. This is useful for batched
1151    /// computations, where the results from each tick must be combined together.
1152    ///
1153    /// # Example
1154    /// ```rust
1155    /// # #[cfg(feature = "deploy")] {
1156    /// # use hydro_lang::prelude::*;
1157    /// # use futures::StreamExt;
1158    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1159    /// # let tick = process.tick();
1160    /// # // ticks are lazy by default, forces the second tick to run
1161    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1162    /// # let batch_first_tick = process
1163    /// #   .source_iter(q!(vec![]))
1164    /// #   .batch(&tick, nondet!(/** test */));
1165    /// # let batch_second_tick = process
1166    /// #   .source_iter(q!(vec![1, 2, 3]))
1167    /// #   .batch(&tick, nondet!(/** test */))
1168    /// #   .defer_tick(); // appears on the second tick
1169    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1170    /// input_batch // first tick: [], second tick: [1, 2, 3]
1171    ///     .max()
1172    ///     .all_ticks()
1173    /// # }, |mut stream| async move {
1174    /// // [3]
1175    /// # for w in vec![3] {
1176    /// #     assert_eq!(stream.next().await.unwrap(), w);
1177    /// # }
1178    /// # }));
1179    /// # }
1180    /// ```
1181    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1182        self.into_stream().all_ticks()
1183    }
1184
1185    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1186    /// which will stream the value computed in _each_ tick as a separate stream element.
1187    ///
1188    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1189    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1190    /// optional's [`Tick`] context.
1191    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1192        self.into_stream().all_ticks_atomic()
1193    }
1194
1195    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1196    /// be asynchronously updated with the latest value of the optional inside the tick, including
1197    /// whether the optional is null or not.
1198    ///
1199    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1200    /// tick that tracks the inner value. This is useful for getting the value as of the
1201    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1202    ///
1203    /// # Example
1204    /// ```rust
1205    /// # #[cfg(feature = "deploy")] {
1206    /// # use hydro_lang::prelude::*;
1207    /// # use futures::StreamExt;
1208    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1209    /// # let tick = process.tick();
1210    /// # // ticks are lazy by default, forces the second tick to run
1211    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1212    /// # let batch_first_tick = process
1213    /// #   .source_iter(q!(vec![]))
1214    /// #   .batch(&tick, nondet!(/** test */));
1215    /// # let batch_second_tick = process
1216    /// #   .source_iter(q!(vec![1, 2, 3]))
1217    /// #   .batch(&tick, nondet!(/** test */))
1218    /// #   .defer_tick(); // appears on the second tick
1219    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1220    /// input_batch // first tick: [], second tick: [1, 2, 3]
1221    ///     .max()
1222    ///     .latest()
1223    /// # .into_singleton()
1224    /// # .sample_eager(nondet!(/** test */))
1225    /// # }, |mut stream| async move {
1226    /// // asynchronously changes from None ~> 3
1227    /// # for w in vec![None, Some(3)] {
1228    /// #     assert_eq!(stream.next().await.unwrap(), w);
1229    /// # }
1230    /// # }));
1231    /// # }
1232    /// ```
1233    pub fn latest(self) -> Optional<T, L, Unbounded> {
1234        Optional::new(
1235            self.location.outer().clone(),
1236            HydroNode::YieldConcat {
1237                inner: Box::new(self.ir_node.into_inner()),
1238                metadata: self
1239                    .location
1240                    .outer()
1241                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1242            },
1243        )
1244    }
1245
1246    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1247    /// be updated with the latest value of the optional inside the tick.
1248    ///
1249    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1250    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1251    /// optional's [`Tick`] context.
1252    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1253        let out_location = Atomic {
1254            tick: self.location.clone(),
1255        };
1256
1257        Optional::new(
1258            out_location.clone(),
1259            HydroNode::YieldConcat {
1260                inner: Box::new(self.ir_node.into_inner()),
1261                metadata: out_location
1262                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1263            },
1264        )
1265    }
1266
1267    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1268    /// always has the state of `self` at tick `T - 1`.
1269    ///
1270    /// At tick `0`, the output optional is null, since there is no previous tick.
1271    ///
1272    /// This operator enables stateful iterative processing with ticks, by sending data from one
1273    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1274    ///
1275    /// # Example
1276    /// ```rust
1277    /// # #[cfg(feature = "deploy")] {
1278    /// # use hydro_lang::prelude::*;
1279    /// # use futures::StreamExt;
1280    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1281    /// let tick = process.tick();
1282    /// // ticks are lazy by default, forces the second tick to run
1283    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1284    ///
1285    /// let batch_first_tick = process
1286    ///   .source_iter(q!(vec![1, 2]))
1287    ///   .batch(&tick, nondet!(/** test */));
1288    /// let batch_second_tick = process
1289    ///   .source_iter(q!(vec![3, 4]))
1290    ///   .batch(&tick, nondet!(/** test */))
1291    ///   .defer_tick(); // appears on the second tick
1292    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1293    ///   .reduce(q!(|state, v| *state += v));
1294    ///
1295    /// current_tick_sum.clone().into_singleton().zip(
1296    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1297    /// ).all_ticks()
1298    /// # }, |mut stream| async move {
1299    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1300    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1301    /// #     assert_eq!(stream.next().await.unwrap(), w);
1302    /// # }
1303    /// # }));
1304    /// # }
1305    /// ```
1306    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1307        Optional::new(
1308            self.location.clone(),
1309            HydroNode::DeferTick {
1310                input: Box::new(self.ir_node.into_inner()),
1311                metadata: self.location.new_node_metadata(Self::collection_kind()),
1312            },
1313        )
1314    }
1315}
1316
1317#[cfg(test)]
1318mod tests {
1319    #[cfg(feature = "deploy")]
1320    use futures::StreamExt;
1321    #[cfg(feature = "deploy")]
1322    use hydro_deploy::Deployment;
1323    #[cfg(any(feature = "deploy", feature = "sim"))]
1324    use stageleft::q;
1325
1326    #[cfg(feature = "deploy")]
1327    use super::Optional;
1328    #[cfg(any(feature = "deploy", feature = "sim"))]
1329    use crate::compile::builder::FlowBuilder;
1330    #[cfg(any(feature = "deploy", feature = "sim"))]
1331    use crate::location::Location;
1332    #[cfg(feature = "deploy")]
1333    use crate::nondet::nondet;
1334
1335    #[cfg(feature = "deploy")]
1336    #[tokio::test]
1337    async fn optional_or_cardinality() {
1338        let mut deployment = Deployment::new();
1339
1340        let mut flow = FlowBuilder::new();
1341        let node = flow.process::<()>();
1342        let external = flow.external::<()>();
1343
1344        let node_tick = node.tick();
1345        let tick_singleton = node_tick.singleton(q!(123));
1346        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1347        let counts = tick_optional_inhabited
1348            .clone()
1349            .or(tick_optional_inhabited)
1350            .into_stream()
1351            .count()
1352            .all_ticks()
1353            .send_bincode_external(&external);
1354
1355        let nodes = flow
1356            .with_process(&node, deployment.Localhost())
1357            .with_external(&external, deployment.Localhost())
1358            .deploy(&mut deployment);
1359
1360        deployment.deploy().await.unwrap();
1361
1362        let mut external_out = nodes.connect(counts).await;
1363
1364        deployment.start().await.unwrap();
1365
1366        assert_eq!(external_out.next().await.unwrap(), 1);
1367    }
1368
1369    #[cfg(feature = "deploy")]
1370    #[tokio::test]
1371    async fn into_singleton_top_level_none_cardinality() {
1372        let mut deployment = Deployment::new();
1373
1374        let mut flow = FlowBuilder::new();
1375        let node = flow.process::<()>();
1376        let external = flow.external::<()>();
1377
1378        let node_tick = node.tick();
1379        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1380        let into_singleton = top_level_none.into_singleton();
1381
1382        let tick_driver = node.spin();
1383
1384        let counts = into_singleton
1385            .snapshot(&node_tick, nondet!(/** test */))
1386            .into_stream()
1387            .count()
1388            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1389            .map(q!(|(c, _)| c))
1390            .all_ticks()
1391            .send_bincode_external(&external);
1392
1393        let nodes = flow
1394            .with_process(&node, deployment.Localhost())
1395            .with_external(&external, deployment.Localhost())
1396            .deploy(&mut deployment);
1397
1398        deployment.deploy().await.unwrap();
1399
1400        let mut external_out = nodes.connect(counts).await;
1401
1402        deployment.start().await.unwrap();
1403
1404        assert_eq!(external_out.next().await.unwrap(), 1);
1405        assert_eq!(external_out.next().await.unwrap(), 1);
1406        assert_eq!(external_out.next().await.unwrap(), 1);
1407    }
1408
1409    #[cfg(feature = "deploy")]
1410    #[tokio::test]
1411    async fn into_singleton_unbounded_top_level_none_cardinality() {
1412        let mut deployment = Deployment::new();
1413
1414        let mut flow = FlowBuilder::new();
1415        let node = flow.process::<()>();
1416        let external = flow.external::<()>();
1417
1418        let node_tick = node.tick();
1419        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1420        let into_singleton = top_level_none.into_singleton();
1421
1422        let tick_driver = node.spin();
1423
1424        let counts = into_singleton
1425            .snapshot(&node_tick, nondet!(/** test */))
1426            .into_stream()
1427            .count()
1428            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1429            .map(q!(|(c, _)| c))
1430            .all_ticks()
1431            .send_bincode_external(&external);
1432
1433        let nodes = flow
1434            .with_process(&node, deployment.Localhost())
1435            .with_external(&external, deployment.Localhost())
1436            .deploy(&mut deployment);
1437
1438        deployment.deploy().await.unwrap();
1439
1440        let mut external_out = nodes.connect(counts).await;
1441
1442        deployment.start().await.unwrap();
1443
1444        assert_eq!(external_out.next().await.unwrap(), 1);
1445        assert_eq!(external_out.next().await.unwrap(), 1);
1446        assert_eq!(external_out.next().await.unwrap(), 1);
1447    }
1448
1449    #[cfg(feature = "sim")]
1450    #[test]
1451    fn top_level_optional_some_into_stream_no_replay() {
1452        let mut flow = FlowBuilder::new();
1453        let node = flow.process::<()>();
1454
1455        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1456        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1457        let filtered_some = folded.filter(q!(|_| true));
1458
1459        let out_recv = filtered_some.into_stream().sim_output();
1460
1461        flow.sim().exhaustive(async || {
1462            out_recv.assert_yields_only([10]).await;
1463        });
1464    }
1465
1466    #[cfg(feature = "sim")]
1467    #[test]
1468    fn top_level_optional_none_into_stream_no_replay() {
1469        let mut flow = FlowBuilder::new();
1470        let node = flow.process::<()>();
1471
1472        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1473        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1474        let filtered_none = folded.filter(q!(|_| false));
1475
1476        let out_recv = filtered_none.into_stream().sim_output();
1477
1478        flow.sim().exhaustive(async || {
1479            out_recv.assert_yields_only([] as [i32; 0]).await;
1480        });
1481    }
1482}