// hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::stream::{
27    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
28};
29#[cfg(stageleft_runtime)]
30use crate::location::dynamic::{DynLocation, LocationId};
31use crate::location::tick::DeferTick;
32use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
33use crate::manual_expr::ManualExpr;
34use crate::nondet::{NonDet, nondet};
35use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
36
37pub mod networking;
38
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this collection is materialized.
    pub(crate) location: Loc,
    // The IR node backing this collection. Kept in a `RefCell` so that `clone`
    // can rewrite it in place into a shared `Tee` node.
    pub(crate) ir_node: RefCell<HydroNode>,

    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
70
71impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
72    for KeyedStream<K, V, L, Unbounded, O, R>
73where
74    L: Location<'a>,
75{
76    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
77        let new_meta = stream
78            .location
79            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
80
81        KeyedStream {
82            location: stream.location,
83            ir_node: RefCell::new(HydroNode::Cast {
84                inner: Box::new(stream.ir_node.into_inner()),
85                metadata: new_meta,
86            }),
87            _phantom: PhantomData,
88        }
89    }
90}
91
92impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
93    for KeyedStream<K, V, L, B, NoOrder, R>
94where
95    L: Location<'a>,
96{
97    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
98        stream.weaken_ordering()
99    }
100}
101
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Fully-qualified call; presumably resolves to an inherent `defer_tick`
        // defined elsewhere on `KeyedStream` (a plain `self.defer_tick()` would
        // recurse into this trait method) — TODO confirm against the inherent impl.
        KeyedStream::defer_tick(self)
    }
}
110
111impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
112    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
113where
114    L: Location<'a>,
115{
116    type Location = Tick<L>;
117
118    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
119        KeyedStream {
120            location: location.clone(),
121            ir_node: RefCell::new(HydroNode::CycleSource {
122                cycle_id,
123                metadata: location.new_node_metadata(
124                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
125                ),
126            }),
127            _phantom: PhantomData,
128        }
129    }
130}
131
132impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
133    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
134where
135    L: Location<'a>,
136{
137    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
138        assert_eq!(
139            Location::id(&self.location),
140            expected_location,
141            "locations do not match"
142        );
143
144        self.location
145            .flow_state()
146            .borrow_mut()
147            .push_root(HydroRoot::CycleSink {
148                cycle_id,
149                input: Box::new(self.ir_node.into_inner()),
150                op_metadata: HydroIrOpMetadata::new(),
151            });
152    }
153}
154
155impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
156    for KeyedStream<K, V, L, B, O, R>
157where
158    L: Location<'a> + NoTick,
159{
160    type Location = L;
161
162    fn create_source(cycle_id: CycleId, location: L) -> Self {
163        KeyedStream {
164            location: location.clone(),
165            ir_node: RefCell::new(HydroNode::CycleSource {
166                cycle_id,
167                metadata: location
168                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
169            }),
170            _phantom: PhantomData,
171        }
172    }
173}
174
175impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
176    for KeyedStream<K, V, L, B, O, R>
177where
178    L: Location<'a> + NoTick,
179{
180    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
181        assert_eq!(
182            Location::id(&self.location),
183            expected_location,
184            "locations do not match"
185        );
186        self.location
187            .flow_state()
188            .borrow_mut()
189            .push_root(HydroRoot::CycleSink {
190                cycle_id,
191                input: Box::new(self.ir_node.into_inner()),
192                op_metadata: HydroIrOpMetadata::new(),
193            });
194    }
195}
196
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // First clone of this node: rewrite the IR node in place into a `Tee`
        // wrapping the original node, so that `self` and all clones share the same
        // underlying node through the reference-counted `TeeNode`. `replace` swaps
        // in a placeholder to take ownership of the original node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self.ir_node` is now guaranteed to be a `Tee`; the clone gets its own
        // `Tee` node sharing the same inner `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
224
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// `Yield` and `Continue` keep the generator alive for future inputs, while
/// `Return` and `Break` terminate it.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
237
238impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
239    KeyedStream<K, V, L, B, O, R>
240{
    /// Internal constructor that wraps an already-built IR node at `location`.
    ///
    /// Debug-asserts that the node's recorded location and collection kind agree
    /// with the requested type parameters, catching IR construction mistakes early.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        KeyedStream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
251
252    /// Returns the [`CollectionKind`] corresponding to this type.
253    pub fn collection_kind() -> CollectionKind {
254        CollectionKind::KeyedStream {
255            bound: B::BOUND_KIND,
256            value_order: O::ORDERING_KIND,
257            value_retry: R::RETRIES_KIND,
258            key_type: stageleft::quote_type::<K>().into(),
259            value_type: stageleft::quote_type::<V>().into(),
260        }
261    }
262
    /// Returns the [`Location`] where this keyed stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
267
268    /// Explicitly "casts" the keyed stream to a type with a different ordering
269    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
270    /// by the type-system.
271    ///
272    /// # Non-Determinism
273    /// This function is used as an escape hatch, and any mistakes in the
274    /// provided ordering guarantee will propagate into the guarantees
275    /// for the rest of the program.
276    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
277        if O::ORDERING_KIND == O2::ORDERING_KIND {
278            KeyedStream::new(self.location, self.ir_node.into_inner())
279        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
280            // We can always weaken the ordering guarantee
281            KeyedStream::new(
282                self.location.clone(),
283                HydroNode::Cast {
284                    inner: Box::new(self.ir_node.into_inner()),
285                    metadata: self
286                        .location
287                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
288                },
289            )
290        } else {
291            KeyedStream::new(
292                self.location.clone(),
293                HydroNode::ObserveNonDet {
294                    inner: Box::new(self.ir_node.into_inner()),
295                    trusted: false,
296                    metadata: self
297                        .location
298                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
299                },
300            )
301        }
302    }
303
304    fn assume_ordering_trusted<O2: Ordering>(
305        self,
306        _nondet: NonDet,
307    ) -> KeyedStream<K, V, L, B, O2, R> {
308        if O::ORDERING_KIND == O2::ORDERING_KIND {
309            KeyedStream::new(self.location, self.ir_node.into_inner())
310        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
311            // We can always weaken the ordering guarantee
312            KeyedStream::new(
313                self.location.clone(),
314                HydroNode::Cast {
315                    inner: Box::new(self.ir_node.into_inner()),
316                    metadata: self
317                        .location
318                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
319                },
320            )
321        } else {
322            KeyedStream::new(
323                self.location.clone(),
324                HydroNode::ObserveNonDet {
325                    inner: Box::new(self.ir_node.into_inner()),
326                    trusted: true,
327                    metadata: self
328                        .location
329                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
330                },
331            )
332        }
333    }
334
335    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
336    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
337    /// which is always safe because that is the weakest possible guarantee.
338    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
339        self.weaken_ordering::<NoOrder>()
340    }
341
342    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
343    /// enforcing that `O2` is weaker than the input ordering guarantee.
344    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
345        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
346        self.assume_ordering::<O2>(nondet)
347    }
348
349    /// Explicitly "casts" the keyed stream to a type with a different retries
350    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
351    /// be proven by the type-system.
352    ///
353    /// # Non-Determinism
354    /// This function is used as an escape hatch, and any mistakes in the
355    /// provided retries guarantee will propagate into the guarantees
356    /// for the rest of the program.
357    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
358        if R::RETRIES_KIND == R2::RETRIES_KIND {
359            KeyedStream::new(self.location, self.ir_node.into_inner())
360        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
361            // We can always weaken the retries guarantee
362            KeyedStream::new(
363                self.location.clone(),
364                HydroNode::Cast {
365                    inner: Box::new(self.ir_node.into_inner()),
366                    metadata: self
367                        .location
368                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
369                },
370            )
371        } else {
372            KeyedStream::new(
373                self.location.clone(),
374                HydroNode::ObserveNonDet {
375                    inner: Box::new(self.ir_node.into_inner()),
376                    trusted: false,
377                    metadata: self
378                        .location
379                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
380                },
381            )
382        }
383    }
384
385    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
386    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
387    /// which is always safe because that is the weakest possible guarantee.
388    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
389        self.weaken_retries::<AtLeastOnce>()
390    }
391
392    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
393    /// enforcing that `R2` is weaker than the input retries guarantee.
394    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
395        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
396        self.assume_retries::<R2>(nondet)
397    }
398
399    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
400    /// implies that `O == TotalOrder`.
401    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
402    where
403        O: IsOrdered,
404    {
405        self.assume_ordering(nondet!(/** no-op */))
406    }
407
408    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
409    /// implies that `R == ExactlyOnce`.
410    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
411    where
412        R: IsExactlyOnce,
413    {
414        self.assume_retries(nondet!(/** no-op */))
415    }
416
417    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
418    /// implies that `B == Bounded`.
419    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
420    where
421        B: IsBounded,
422    {
423        KeyedStream::new(self.location, self.ir_node.into_inner())
424    }
425
426    /// Flattens the keyed stream into an unordered stream of key-value pairs.
427    ///
428    /// # Example
429    /// ```rust
430    /// # #[cfg(feature = "deploy")] {
431    /// # use hydro_lang::prelude::*;
432    /// # use futures::StreamExt;
433    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434    /// process
435    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
436    ///     .into_keyed()
437    ///     .entries()
438    /// # }, |mut stream| async move {
439    /// // (1, 2), (1, 3), (2, 4) in any order
440    /// # let mut results = Vec::new();
441    /// # for _ in 0..3 {
442    /// #     results.push(stream.next().await.unwrap());
443    /// # }
444    /// # results.sort();
445    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
446    /// # }));
447    /// # }
448    /// ```
449    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
450        Stream::new(
451            self.location.clone(),
452            HydroNode::Cast {
453                inner: Box::new(self.ir_node.into_inner()),
454                metadata: self
455                    .location
456                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
457            },
458        )
459    }
460
461    /// Flattens the keyed stream into an unordered stream of only the values.
462    ///
463    /// # Example
464    /// ```rust
465    /// # #[cfg(feature = "deploy")] {
466    /// # use hydro_lang::prelude::*;
467    /// # use futures::StreamExt;
468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
469    /// process
470    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
471    ///     .into_keyed()
472    ///     .values()
473    /// # }, |mut stream| async move {
474    /// // 2, 3, 4 in any order
475    /// # let mut results = Vec::new();
476    /// # for _ in 0..3 {
477    /// #     results.push(stream.next().await.unwrap());
478    /// # }
479    /// # results.sort();
480    /// # assert_eq!(results, vec![2, 3, 4]);
481    /// # }));
482    /// # }
483    /// ```
484    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
485        self.entries().map(q!(|(_, v)| v))
486    }
487
488    /// Flattens the keyed stream into an unordered stream of just the keys.
489    ///
490    /// # Example
491    /// ```rust
492    /// # #[cfg(feature = "deploy")] {
493    /// # use hydro_lang::prelude::*;
494    /// # use futures::StreamExt;
495    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
496    /// # process
497    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
498    /// #     .into_keyed()
499    /// #     .keys()
500    /// # }, |mut stream| async move {
501    /// // 1, 2 in any order
502    /// # let mut results = Vec::new();
503    /// # for _ in 0..2 {
504    /// #     results.push(stream.next().await.unwrap());
505    /// # }
506    /// # results.sort();
507    /// # assert_eq!(results, vec![1, 2]);
508    /// # }));
509    /// # }
510    /// ```
511    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
512    where
513        K: Eq + Hash,
514    {
515        self.entries().map(q!(|(k, _)| k)).unique()
516    }
517
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map(q!(|v| v + 1))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [5] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Defer splicing the user closure so it can be embedded inside the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The generated closure applies `f` to the value only, threading the key
        // through unchanged so grouping is preserved.
        let map_f = q!({
            let orig = f;
            move |(k, v)| (k, orig(v))
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
569
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // Defer splicing the user closure so it can be embedded inside the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The key is cloned so the user closure can consume a `(K, V)` pair while
        // the original key is retained to keep the result under the same group.
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
628
    /// Prepends a new value to the key of each element in the stream, producing a new
    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
    /// occurs and the elements in each group preserve their original order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .prefix_key(q!(|&(k, _)| k % 2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { (1, 1): [2, 3], (0, 2): [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn prefix_key<K2, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<(K2, K), V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> K2 + 'a,
    {
        // Defer splicing the user closure so it can be embedded inside the quoted
        // wrapper below; `f` only borrows the pair, so no cloning is needed.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // The generated closure computes the new key prefix from a borrow of the
        // pair, then rebuilds the element as `((prefix, old_key), value)`.
        let map_f = q!({
            let orig = f;
            move |kv| {
                let out = orig(&kv);
                ((out, kv.0), kv.1)
            }
        })
        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
            },
        )
    }
684
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f`, preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter(q!(|&x| x > 2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Defer splicing the user closure so it can be embedded inside the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // The generated predicate only inspects the value half of the pair; the
        // key does not participate in the filtering decision.
        let filter_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
735
736    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
737    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
738    ///
739    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
740    /// not modify or take ownership of the values. If you need to modify the values while filtering
741    /// use [`KeyedStream::filter_map_with_key`] instead.
742    ///
743    /// # Example
744    /// ```rust
745    /// # #[cfg(feature = "deploy")] {
746    /// # use hydro_lang::prelude::*;
747    /// # use futures::StreamExt;
748    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
749    /// process
750    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
751    ///     .into_keyed()
752    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
753    /// #   .entries()
754    /// # }, |mut stream| async move {
755    /// // { 1: [3], 2: [4] }
756    /// # let mut results = Vec::new();
757    /// # for _ in 0..2 {
758    /// #     results.push(stream.next().await.unwrap());
759    /// # }
760    /// # results.sort();
761    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
762    /// # }));
763    /// # }
764    /// ```
765    pub fn filter_with_key<F>(
766        self,
767        f: impl IntoQuotedMut<'a, F, L> + Copy,
768    ) -> KeyedStream<K, V, L, B, O, R>
769    where
770        F: Fn(&(K, V)) -> bool + 'a,
771    {
772        let filter_f = f
773            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
774            .into();
775
776        KeyedStream::new(
777            self.location.clone(),
778            HydroNode::Filter {
779                f: filter_f,
780                input: Box::new(self.ir_node.into_inner()),
781                metadata: self.location.new_node_metadata(Self::collection_kind()),
782            },
783        )
784    }
785
    /// An operator that both filters and maps each value, with keys staying the same.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    ///     .into_keyed()
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Defer splicing the user closure so it can be embedded inside the quoted
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The generated closure applies `f` to the value and re-attaches the key
        // only when `f` yields `Some`, so dropped values disappear from the group.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).map(|o| (k, o))
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
838
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    ///     .into_keyed()
    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 2: [2] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..1 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the evaluation context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The user closure consumes the `(K, V)` pair by value, but the key must
        // also be re-attached to the output, so it is cloned before the call.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
895
    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
    ///   .into_keyed()
    ///   .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks().entries()
    /// # }, |mut stream| async move {
    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both operands must live at the same location to be combined locally.
        check_matching_location(&self.location, &other.location);

        // Build the cross product over flat `((K, V), O2)` entries, then re-nest
        // the key so the singleton value ends up paired with each group value.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
947
    /// For each value `v` in each group, transform `v` using `f` and then treat the
    /// result as an [`Iterator`] to produce values one by one within the same group.
    /// The implementation for [`Iterator`] for the output type `I` must produce items
    /// in a **deterministic** order.
    ///
    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    ///     .into_keyed()
    ///     .flat_map_ordered(q!(|x| x))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3, 4], 2: [5, 6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..5 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the evaluation context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // One input value can fan out into many outputs under the same key, so
        // the key is cloned for each produced item.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1006
    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `I` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ///     ]))
    ///     .into_keyed()
    ///     .flat_map_unordered(q!(|x| x))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the evaluation context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Same lowering as `flat_map_ordered` (clone the key per produced item);
        // the difference is only in the output's ordering guarantee (`NoOrder`).
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1063
1064    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1065    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1066    /// items in a **deterministic** order.
1067    ///
1068    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1069    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1070    ///
1071    /// # Example
1072    /// ```rust
1073    /// # #[cfg(feature = "deploy")] {
1074    /// # use hydro_lang::prelude::*;
1075    /// # use futures::StreamExt;
1076    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1077    /// process
1078    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1079    ///     .into_keyed()
1080    ///     .flatten_ordered()
1081    /// #   .entries()
1082    /// # }, |mut stream| async move {
1083    /// // { 1: [2, 3, 4], 2: [5, 6] }
1084    /// # let mut results = Vec::new();
1085    /// # for _ in 0..5 {
1086    /// #     results.push(stream.next().await.unwrap());
1087    /// # }
1088    /// # results.sort();
1089    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1090    /// # }));
1091    /// # }
1092    /// ```
1093    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1094    where
1095        V: IntoIterator<Item = U>,
1096        K: Clone,
1097    {
1098        self.flat_map_ordered(q!(|d| d))
1099    }
1100
1101    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1102    /// for the value type `V` to produce items in any order.
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # #[cfg(feature = "deploy")] {
1107    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1108    /// # use futures::StreamExt;
1109    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1110    /// process
1111    ///     .source_iter(q!(vec![
1112    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1113    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1114    ///     ]))
1115    ///     .into_keyed()
1116    ///     .flatten_unordered()
1117    /// #   .entries()
1118    /// # }, |mut stream| async move {
1119    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1120    /// # let mut results = Vec::new();
1121    /// # for _ in 0..4 {
1122    /// #     results.push(stream.next().await.unwrap());
1123    /// # }
1124    /// # results.sort();
1125    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1126    /// # }));
1127    /// # }
1128    /// ```
1129    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1130    where
1131        V: IntoIterator<Item = U>,
1132        K: Clone,
1133    {
1134        self.flat_map_unordered(q!(|d| d))
1135    }
1136
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each value. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .inspect(q!(|v| println!("{}", v)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
    where
        F: Fn(&V) + 'a,
    {
        // Defer splicing of the user closure until the evaluation context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Adapt the value-level closure to the underlying `(K, V)` representation
        // by borrowing only the value half of each pair.
        let inspect_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1183
1184    /// An operator which allows you to "inspect" each element of a stream without
1185    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1186    /// mainly useful for debugging, and should not be used to generate side-effects.
1187    ///
1188    /// # Example
1189    /// ```rust
1190    /// # #[cfg(feature = "deploy")] {
1191    /// # use hydro_lang::prelude::*;
1192    /// # use futures::StreamExt;
1193    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1194    /// process
1195    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1196    ///     .into_keyed()
1197    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1198    /// #   .entries()
1199    /// # }, |mut stream| async move {
1200    /// # let mut results = Vec::new();
1201    /// # for _ in 0..3 {
1202    /// #     results.push(stream.next().await.unwrap());
1203    /// # }
1204    /// # results.sort();
1205    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1206    /// # }));
1207    /// # }
1208    /// ```
1209    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1210    where
1211        F: Fn(&(K, V)) + 'a,
1212    {
1213        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1214
1215        KeyedStream::new(
1216            self.location.clone(),
1217            HydroNode::Inspect {
1218                f: inspect_f,
1219                input: Box::new(self.ir_node.into_inner()),
1220                metadata: self.location.new_node_metadata(Self::collection_kind()),
1221            },
1222        )
1223    }
1224
1225    /// An operator which allows you to "name" a `HydroNode`.
1226    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1227    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1228        {
1229            let mut node = self.ir_node.borrow_mut();
1230            let metadata = node.metadata_mut();
1231            metadata.tag = Some(name.to_owned());
1232        }
1233        self
1234    }
1235
    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
    ///
    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
    /// early by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, that key's group is terminated and no more elements are
    /// processed for it; other groups are unaffected.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
    ///     .into_keyed()
    ///     .scan(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: [1], 1: [3, 7] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Defer splicing of the user closure until the evaluation context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Implemented on top of `generator`: `Some(out)` yields the value and
        // continues the group, while `None` maps to `Generate::Break`, which
        // terminates only that key's group.
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1301
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Defer splicing of the user closures until the evaluation context is known.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Lowered to a single flat scan whose state maps each key to its
        // generator state. `None` in the map marks a group that has terminated
        // (via `Return` or `Break`) and whose further inputs are dropped.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // The scan emits `Option<Option<(K, U)>>`: the outer `Some` keeps the
        // flat scan running for all keys, while the inner `Option` says whether
        // this input produced an output element (flattened away below).
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // Group already terminated; ignore the value.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.into_inner()),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the `Option<(K, U)>` emissions, dropping the `None`s that
        // represent "no output for this input".
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location, flatten_node)
    }
1423
    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in-order across the values in each group. But the aggregation function returns a boolean,
    /// which when true indicates that the aggregated result is complete and can be released to
    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .fold_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 9 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Defer splicing of the user closures until the evaluation context is known.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // The generator state is `Option<A>` so the finished accumulator can be
        // moved out (`take`) when the user closure signals completion.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // After `Generate::Return`, `generator` marks the group as
                    // terminated and never invokes this closure for it again,
                    // so the state is always `Some` here.
                    unreachable!()
                }
            }),
        );

        // Re-tag the generator's output as a `KeyedSingleton` with the
        // appropriate boundedness; this is a metadata-only cast.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
                    ),
            },
        )
    }
1500
1501    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1502    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1503    /// otherwise the first element would be non-deterministic.
1504    ///
1505    /// # Example
1506    /// ```rust
1507    /// # #[cfg(feature = "deploy")] {
1508    /// # use hydro_lang::prelude::*;
1509    /// # use futures::StreamExt;
1510    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1511    /// process
1512    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1513    ///     .into_keyed()
1514    ///     .first()
1515    /// #   .entries()
1516    /// # }, |mut stream| async move {
1517    /// // Output: { 0: 2, 1: 3 }
1518    /// # let mut results = Vec::new();
1519    /// # for _ in 0..2 {
1520    /// #     results.push(stream.next().await.unwrap());
1521    /// # }
1522    /// # results.sort();
1523    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1524    /// # }));
1525    /// # }
1526    /// ```
1527    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1528    where
1529        O: IsOrdered,
1530        R: IsExactlyOnce,
1531        K: Clone + Eq + Hash,
1532    {
1533        self.fold_early_stop(
1534            q!(|| None),
1535            q!(|acc, v| {
1536                *acc = Some(v);
1537                true
1538            }),
1539        )
1540        .map(q!(|v| v.unwrap()))
1541    }
1542
1543    /// Assigns a zero-based index to each value within each key group, emitting
1544    /// `(K, (index, V))` tuples with per-key sequential indices.
1545    ///
1546    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1547    /// This is a streaming operator that processes elements as they arrive.
1548    ///
1549    /// # Example
1550    /// ```rust
1551    /// # #[cfg(feature = "deploy")] {
1552    /// # use hydro_lang::prelude::*;
1553    /// # use futures::StreamExt;
1554    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1555    /// process
1556    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1557    ///     .into_keyed()
1558    ///     .enumerate()
1559    /// # .entries()
1560    /// # }, |mut stream| async move {
1561    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1562    /// # let mut results = Vec::new();
1563    /// # for _ in 0..3 {
1564    /// #     results.push(stream.next().await.unwrap());
1565    /// # }
1566    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1567    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1568    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1569    /// # assert_eq!(key2, vec![(0, 20)]);
1570    /// # }));
1571    /// # }
1572    /// ```
1573    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1574    where
1575        O: IsOrdered,
1576        R: IsExactlyOnce,
1577        K: Eq + Hash + Clone,
1578    {
1579        self.scan(
1580            q!(|| 0),
1581            q!(|acc, next| {
1582                let curr = *acc;
1583                *acc += 1;
1584                Some((curr, next))
1585            }),
1586        )
1587    }
1588
1589    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1590    ///
1591    /// # Example
1592    /// ```rust
1593    /// # #[cfg(feature = "deploy")] {
1594    /// # use hydro_lang::prelude::*;
1595    /// # use futures::StreamExt;
1596    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1597    /// let tick = process.tick();
1598    /// let numbers = process
1599    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1600    ///     .into_keyed();
1601    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1602    /// batch
1603    ///     .value_counts()
1604    ///     .entries()
1605    ///     .all_ticks()
1606    /// # }, |mut stream| async move {
1607    /// // (1, 3), (2, 2)
1608    /// # let mut results = Vec::new();
1609    /// # for _ in 0..2 {
1610    /// #     results.push(stream.next().await.unwrap());
1611    /// # }
1612    /// # results.sort();
1613    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1614    /// # }));
1615    /// # }
1616    /// ```
1617    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1618    where
1619        R: IsExactlyOnce,
1620        K: Eq + Hash,
1621    {
1622        self.make_exactly_once()
1623            .assume_ordering_trusted(
1624                nondet!(/** ordering within each group affects neither result nor intermediates */),
1625            )
1626            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1627    }
1628
1629    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1630    /// group via the `comb` closure.
1631    ///
1632    /// Depending on the input stream guarantees, the closure may need to be commutative
1633    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1634    ///
1635    /// If the input and output value types are the same and do not require initialization then use
1636    /// [`KeyedStream::reduce`].
1637    ///
1638    /// # Example
1639    /// ```rust
1640    /// # #[cfg(feature = "deploy")] {
1641    /// # use hydro_lang::prelude::*;
1642    /// # use futures::StreamExt;
1643    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1644    /// let tick = process.tick();
1645    /// let numbers = process
1646    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1647    ///     .into_keyed();
1648    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1649    /// batch
1650    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1651    ///     .entries()
1652    ///     .all_ticks()
1653    /// # }, |mut stream| async move {
1654    /// // (1, false), (2, true)
1655    /// # let mut results = Vec::new();
1656    /// # for _ in 0..2 {
1657    /// #     results.push(stream.next().await.unwrap());
1658    /// # }
1659    /// # results.sort();
1660    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1661    /// # }));
1662    /// # }
1663    /// ```
1664    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1665        self,
1666        init: impl IntoQuotedMut<'a, I, L>,
1667        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1668    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1669    where
1670        K: Eq + Hash,
1671        C: ValidCommutativityFor<O>,
1672        Idemp: ValidIdempotenceFor<R>,
1673    {
1674        let init = init.splice_fn0_ctx(&self.location).into();
1675        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1676        proof.register_proof(&comb);
1677
1678        let ordered = self
1679            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1680            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1681
1682        KeyedSingleton::new(
1683            ordered.location.clone(),
1684            HydroNode::FoldKeyed {
1685                init,
1686                acc: comb.into(),
1687                input: Box::new(ordered.ir_node.into_inner()),
1688                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1689                    K,
1690                    A,
1691                    L,
1692                    B::WhenValueUnbounded,
1693                >::collection_kind()),
1694            },
1695        )
1696    }
1697
1698    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1699    /// group via the `comb` closure.
1700    ///
1701    /// Depending on the input stream guarantees, the closure may need to be commutative
1702    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1703    ///
1704    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1705    ///
1706    /// # Example
1707    /// ```rust
1708    /// # #[cfg(feature = "deploy")] {
1709    /// # use hydro_lang::prelude::*;
1710    /// # use futures::StreamExt;
1711    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1712    /// let tick = process.tick();
1713    /// let numbers = process
1714    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1715    ///     .into_keyed();
1716    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1717    /// batch
1718    ///     .reduce(q!(|acc, x| *acc |= x))
1719    ///     .entries()
1720    ///     .all_ticks()
1721    /// # }, |mut stream| async move {
1722    /// // (1, false), (2, true)
1723    /// # let mut results = Vec::new();
1724    /// # for _ in 0..2 {
1725    /// #     results.push(stream.next().await.unwrap());
1726    /// # }
1727    /// # results.sort();
1728    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1729    /// # }));
1730    /// # }
1731    /// ```
1732    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1733        self,
1734        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1735    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1736    where
1737        K: Eq + Hash,
1738        C: ValidCommutativityFor<O>,
1739        Idemp: ValidIdempotenceFor<R>,
1740    {
1741        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1742        proof.register_proof(&f);
1743
1744        let ordered = self
1745            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1746            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1747
1748        KeyedSingleton::new(
1749            ordered.location.clone(),
1750            HydroNode::ReduceKeyed {
1751                f: f.into(),
1752                input: Box::new(ordered.ir_node.into_inner()),
1753                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1754                    K,
1755                    V,
1756                    L,
1757                    B::WhenValueUnbounded,
1758                >::collection_kind()),
1759            },
1760        )
1761    }
1762
1763    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1764    /// are automatically deleted.
1765    ///
1766    /// Depending on the input stream guarantees, the closure may need to be commutative
1767    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1768    ///
1769    /// # Example
1770    /// ```rust
1771    /// # #[cfg(feature = "deploy")] {
1772    /// # use hydro_lang::prelude::*;
1773    /// # use futures::StreamExt;
1774    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1775    /// let tick = process.tick();
1776    /// let watermark = tick.singleton(q!(2));
1777    /// let numbers = process
1778    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1779    ///     .into_keyed();
1780    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1781    /// batch
1782    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1783    ///     .entries()
1784    ///     .all_ticks()
1785    /// # }, |mut stream| async move {
1786    /// // (2, true)
1787    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1788    /// # }));
1789    /// # }
1790    /// ```
1791    pub fn reduce_watermark<O2, F, C, Idemp>(
1792        self,
1793        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1794        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1795    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1796    where
1797        K: Eq + Hash,
1798        O2: Clone,
1799        F: Fn(&mut V, V) + 'a,
1800        C: ValidCommutativityFor<O>,
1801        Idemp: ValidIdempotenceFor<R>,
1802    {
1803        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1804        check_matching_location(&self.location.root(), other.location.outer());
1805        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1806        proof.register_proof(&f);
1807
1808        let ordered = self
1809            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1810            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1811
1812        KeyedSingleton::new(
1813            ordered.location.clone(),
1814            HydroNode::ReduceKeyedWatermark {
1815                f: f.into(),
1816                input: Box::new(ordered.ir_node.into_inner()),
1817                watermark: Box::new(other.ir_node.into_inner()),
1818                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1819                    K,
1820                    V,
1821                    L,
1822                    B::WhenValueUnbounded,
1823                >::collection_kind()),
1824            },
1825        )
1826    }
1827
1828    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1829    /// whose keys are not in the bounded stream.
1830    ///
1831    /// # Example
1832    /// ```rust
1833    /// # #[cfg(feature = "deploy")] {
1834    /// # use hydro_lang::prelude::*;
1835    /// # use futures::StreamExt;
1836    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1837    /// let tick = process.tick();
1838    /// let keyed_stream = process
1839    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1840    ///     .batch(&tick, nondet!(/** test */))
1841    ///     .into_keyed();
1842    /// let keys_to_remove = process
1843    ///     .source_iter(q!(vec![1, 2]))
1844    ///     .batch(&tick, nondet!(/** test */));
1845    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1846    /// #   .entries()
1847    /// # }, |mut stream| async move {
1848    /// // { 3: ['c'], 4: ['d'] }
1849    /// # let mut results = Vec::new();
1850    /// # for _ in 0..2 {
1851    /// #     results.push(stream.next().await.unwrap());
1852    /// # }
1853    /// # results.sort();
1854    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1855    /// # }));
1856    /// # }
1857    /// ```
1858    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1859        self,
1860        other: Stream<K, L, Bounded, O2, R2>,
1861    ) -> Self
1862    where
1863        K: Eq + Hash,
1864    {
1865        check_matching_location(&self.location, &other.location);
1866
1867        KeyedStream::new(
1868            self.location.clone(),
1869            HydroNode::AntiJoin {
1870                pos: Box::new(self.ir_node.into_inner()),
1871                neg: Box::new(other.ir_node.into_inner()),
1872                metadata: self.location.new_node_metadata(Self::collection_kind()),
1873            },
1874        )
1875    }
1876
1877    /// Emit a keyed stream containing keys shared between two keyed streams,
1878    /// where each value in the output keyed stream is a tuple of
1879    /// (self's value, other's value).
1880    /// If there are multiple values for the same key, this performs a cross product
1881    /// for each matching key.
1882    ///
1883    /// # Example
1884    /// ```rust
1885    /// # #[cfg(feature = "deploy")] {
1886    /// # use hydro_lang::prelude::*;
1887    /// # use futures::StreamExt;
1888    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1889    /// let tick = process.tick();
1890    /// let keyed_data = process
1891    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1892    ///     .into_keyed()
1893    ///     .batch(&tick, nondet!(/** test */));
1894    /// let other_data = process
1895    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1896    ///     .into_keyed()
1897    ///     .batch(&tick, nondet!(/** test */));
1898    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1899    /// # }, |mut stream| async move {
1900    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1901    /// # let mut results = vec![];
1902    /// # for _ in 0..4 {
1903    /// #     results.push(stream.next().await.unwrap());
1904    /// # }
1905    /// # results.sort();
1906    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1907    /// # }));
1908    /// # }
1909    /// ```
1910    pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1911        self,
1912        other: KeyedStream<K, V2, L, B, O2, R2>,
1913    ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1914    where
1915        K: Eq + Hash,
1916        R: MinRetries<R2>,
1917    {
1918        self.entries().join(other.entries()).into_keyed()
1919    }
1920
1921    /// Deduplicates values within each key group, emitting each unique value per key
1922    /// exactly once.
1923    ///
1924    /// # Example
1925    /// ```rust
1926    /// # #[cfg(feature = "deploy")] {
1927    /// # use hydro_lang::prelude::*;
1928    /// # use futures::StreamExt;
1929    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1930    /// process
1931    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
1932    ///     .into_keyed()
1933    ///     .unique()
1934    /// # .entries()
1935    /// # }, |mut stream| async move {
1936    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
1937    /// # let mut results = Vec::new();
1938    /// # for _ in 0..4 {
1939    /// #     results.push(stream.next().await.unwrap());
1940    /// # }
1941    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1942    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1943    /// # key1.sort();
1944    /// # key2.sort();
1945    /// # assert_eq!(key1, vec![10, 20]);
1946    /// # assert_eq!(key2, vec![20, 30]);
1947    /// # }));
1948    /// # }
1949    /// ```
1950    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
1951    where
1952        K: Eq + Hash + Clone,
1953        V: Eq + Hash + Clone,
1954    {
1955        self.entries().unique().into_keyed()
1956    }
1957
1958    /// Sorts the values within each key group in ascending order.
1959    ///
1960    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
1961    /// each group. This operator will block until all elements in the input stream
1962    /// are available, so it requires the input stream to be [`Bounded`].
1963    ///
1964    /// # Example
1965    /// ```rust
1966    /// # #[cfg(feature = "deploy")] {
1967    /// # use hydro_lang::prelude::*;
1968    /// # use futures::StreamExt;
1969    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1970    /// let tick = process.tick();
1971    /// let numbers = process
1972    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
1973    ///     .into_keyed();
1974    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1975    /// batch.sort().all_ticks()
1976    /// # .entries()
1977    /// # }, |mut stream| async move {
1978    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
1979    /// # let mut results = Vec::new();
1980    /// # for _ in 0..4 {
1981    /// #     results.push(stream.next().await.unwrap());
1982    /// # }
1983    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1984    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1985    /// # assert_eq!(key1_vals, vec![1, 3]);
1986    /// # assert_eq!(key2_vals, vec![1, 2]);
1987    /// # }));
1988    /// # }
1989    /// ```
1990    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
1991    where
1992        B: IsBounded,
1993        K: Ord,
1994        V: Ord,
1995    {
1996        self.entries().sort().into_keyed()
1997    }
1998
1999    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2000    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2001    /// is only present in one of the inputs, its values are passed through as-is). The output has
2002    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2003    ///
2004    /// Currently, both input streams must be [`Bounded`]. This operator will block
2005    /// on the first stream until all its elements are available. In a future version,
2006    /// we will relax the requirement on the `other` stream.
2007    ///
2008    /// # Example
2009    /// ```rust
2010    /// # #[cfg(feature = "deploy")] {
2011    /// # use hydro_lang::prelude::*;
2012    /// # use futures::StreamExt;
2013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2014    /// let tick = process.tick();
2015    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2016    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2017    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2018    /// # .entries()
2019    /// # }, |mut stream| async move {
2020    /// // { 0: [2, 1], 1: [4, 3] }
2021    /// # let mut results = Vec::new();
2022    /// # for _ in 0..4 {
2023    /// #     results.push(stream.next().await.unwrap());
2024    /// # }
2025    /// # results.sort();
2026    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2027    /// # }));
2028    /// # }
2029    /// ```
2030    pub fn chain<O2: Ordering, R2: Retries>(
2031        self,
2032        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2033    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2034    where
2035        B: IsBounded,
2036        O: MinOrder<O2>,
2037        R: MinRetries<R2>,
2038    {
2039        let this = self.make_bounded();
2040        check_matching_location(&this.location, &other.location);
2041
2042        KeyedStream::new(
2043            this.location.clone(),
2044            HydroNode::Chain {
2045                first: Box::new(this.ir_node.into_inner()),
2046                second: Box::new(other.ir_node.into_inner()),
2047                metadata: this.location.new_node_metadata(KeyedStream::<
2048                    K,
2049                    V,
2050                    L,
2051                    Bounded,
2052                    <O as MinOrder<O2>>::Min,
2053                    <R as MinRetries<R2>>::Min,
2054                >::collection_kind()),
2055            },
2056        )
2057    }
2058
2059    /// Emit a keyed stream containing keys shared between the keyed stream and the
2060    /// keyed singleton, where each value in the output keyed stream is a tuple of
2061    /// (the keyed stream's value, the keyed singleton's value).
2062    ///
2063    /// # Example
2064    /// ```rust
2065    /// # #[cfg(feature = "deploy")] {
2066    /// # use hydro_lang::prelude::*;
2067    /// # use futures::StreamExt;
2068    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2069    /// let tick = process.tick();
2070    /// let keyed_data = process
2071    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2072    ///     .into_keyed()
2073    ///     .batch(&tick, nondet!(/** test */));
2074    /// let singleton_data = process
2075    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2076    ///     .into_keyed()
2077    ///     .batch(&tick, nondet!(/** test */))
2078    ///     .first();
2079    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2080    /// # }, |mut stream| async move {
2081    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2082    /// # let mut results = vec![];
2083    /// # for _ in 0..3 {
2084    /// #     results.push(stream.next().await.unwrap());
2085    /// # }
2086    /// # results.sort();
2087    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2088    /// # }));
2089    /// # }
2090    /// ```
2091    pub fn join_keyed_singleton<V2: Clone>(
2092        self,
2093        keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2094    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2095    where
2096        B: IsBounded,
2097        K: Eq + Hash,
2098    {
2099        keyed_singleton
2100            .join_keyed_stream(self.make_bounded())
2101            .map(q!(|(v2, v)| (v, v2)))
2102    }
2103
2104    /// Gets the values associated with a specific key from the keyed stream.
2105    /// Returns an empty stream if the key is `None` or there are no associated values.
2106    ///
2107    /// # Example
2108    /// ```rust
2109    /// # #[cfg(feature = "deploy")] {
2110    /// # use hydro_lang::prelude::*;
2111    /// # use futures::StreamExt;
2112    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2113    /// let tick = process.tick();
2114    /// let keyed_data = process
2115    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2116    ///     .into_keyed()
2117    ///     .batch(&tick, nondet!(/** test */));
2118    /// let key = tick.singleton(q!(1));
2119    /// keyed_data.get(key).all_ticks()
2120    /// # }, |mut stream| async move {
2121    /// // 10, 11 in any order
2122    /// # let mut results = vec![];
2123    /// # for _ in 0..2 {
2124    /// #     results.push(stream.next().await.unwrap());
2125    /// # }
2126    /// # results.sort();
2127    /// # assert_eq!(results, vec![10, 11]);
2128    /// # }));
2129    /// # }
2130    /// ```
2131    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, Bounded, NoOrder, R>
2132    where
2133        B: IsBounded,
2134        K: Eq + Hash,
2135    {
2136        self.make_bounded()
2137            .entries()
2138            .join(key.into().into_stream().map(q!(|k| (k, ()))))
2139            .map(q!(|(_, (v, _))| v))
2140    }
2141
2142    /// For each value in `self`, find the matching key in `lookup`.
2143    /// The output is a keyed stream with the key from `self`, and a value
2144    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2145    /// If the key is not present in `lookup`, the option will be [`None`].
2146    ///
2147    /// # Example
2148    /// ```rust
2149    /// # #[cfg(feature = "deploy")] {
2150    /// # use hydro_lang::prelude::*;
2151    /// # use futures::StreamExt;
2152    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2153    /// # let tick = process.tick();
2154    /// let requests = // { 1: [10, 11], 2: 20 }
2155    /// # process
2156    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2157    /// #     .into_keyed()
2158    /// #     .batch(&tick, nondet!(/** test */));
2159    /// let other_data = // { 10: 100, 11: 110 }
2160    /// # process
2161    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2162    /// #     .into_keyed()
2163    /// #     .batch(&tick, nondet!(/** test */))
2164    /// #     .first();
2165    /// requests.lookup_keyed_singleton(other_data)
2166    /// # .entries().all_ticks()
2167    /// # }, |mut stream| async move {
2168    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2169    /// # let mut results = vec![];
2170    /// # for _ in 0..3 {
2171    /// #     results.push(stream.next().await.unwrap());
2172    /// # }
2173    /// # results.sort();
2174    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2175    /// # }));
2176    /// # }
2177    /// ```
2178    pub fn lookup_keyed_singleton<V2>(
2179        self,
2180        lookup: KeyedSingleton<V, V2, L, Bounded>,
2181    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2182    where
2183        B: IsBounded,
2184        K: Eq + Hash + Clone,
2185        V: Eq + Hash + Clone,
2186        V2: Clone,
2187    {
2188        self.lookup_keyed_stream(
2189            lookup
2190                .into_keyed_stream()
2191                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2192        )
2193    }
2194
2195    /// For each value in `self`, find the matching key in `lookup`.
2196    /// The output is a keyed stream with the key from `self`, and a value
2197    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2198    /// If the key is not present in `lookup`, the option will be [`None`].
2199    ///
2200    /// # Example
2201    /// ```rust
2202    /// # #[cfg(feature = "deploy")] {
2203    /// # use hydro_lang::prelude::*;
2204    /// # use futures::StreamExt;
2205    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2206    /// # let tick = process.tick();
2207    /// let requests = // { 1: [10, 11], 2: 20 }
2208    /// # process
2209    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2210    /// #     .into_keyed()
2211    /// #     .batch(&tick, nondet!(/** test */));
2212    /// let other_data = // { 10: [100, 101], 11: 110 }
2213    /// # process
2214    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2215    /// #     .into_keyed()
2216    /// #     .batch(&tick, nondet!(/** test */));
2217    /// requests.lookup_keyed_stream(other_data)
2218    /// # .entries().all_ticks()
2219    /// # }, |mut stream| async move {
2220    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2221    /// # let mut results = vec![];
2222    /// # for _ in 0..4 {
2223    /// #     results.push(stream.next().await.unwrap());
2224    /// # }
2225    /// # results.sort();
2226    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2227    /// # }));
2228    /// # }
2229    /// ```
2230    #[expect(clippy::type_complexity, reason = "retries propagation")]
2231    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2232        self,
2233        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2234    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2235    where
2236        B: IsBounded,
2237        K: Eq + Hash + Clone,
2238        V: Eq + Hash + Clone,
2239        V2: Clone,
2240        R: MinRetries<R2>,
2241    {
2242        let inverted = self
2243            .make_bounded()
2244            .entries()
2245            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2246            .into_keyed();
2247        let found = inverted
2248            .clone()
2249            .join_keyed_stream(lookup.clone())
2250            .entries()
2251            .map(q!(|(lookup_value, (key, value))| (
2252                key,
2253                (lookup_value, Some(value))
2254            )))
2255            .into_keyed();
2256        let not_found = inverted
2257            .filter_key_not_in(lookup.keys())
2258            .entries()
2259            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2260            .into_keyed();
2261
2262        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2263    }
2264
2265    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2266    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2267    ///
2268    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2269    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2270    /// argument that declares where the stream will be atomically processed. Batching a stream into
2271    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2272    /// [`Tick`] will introduce asynchrony.
2273    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2274        let out_location = Atomic { tick: tick.clone() };
2275        KeyedStream::new(
2276            out_location.clone(),
2277            HydroNode::BeginAtomic {
2278                inner: Box::new(self.ir_node.into_inner()),
2279                metadata: out_location
2280                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2281            },
2282        )
2283    }
2284
2285    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2286    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2287    /// the order of the input.
2288    ///
2289    /// # Non-Determinism
2290    /// The batch boundaries are non-deterministic and may change across executions.
2291    pub fn batch(
2292        self,
2293        tick: &Tick<L>,
2294        nondet: NonDet,
2295    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2296        let _ = nondet;
2297        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2298        KeyedStream::new(
2299            tick.clone(),
2300            HydroNode::Batch {
2301                inner: Box::new(self.ir_node.into_inner()),
2302                metadata: tick.new_node_metadata(
2303                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2304                ),
2305            },
2306        )
2307    }
2308}
2309
2310impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2311    KeyedStream<(K1, K2), V, L, B, O, R>
2312{
2313    /// Produces a new keyed stream by dropping the first element of the compound key.
2314    ///
2315    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2316    /// of the values under the new keys. The values across groups with the same new key
2317    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2318    ///
2319    /// # Example
2320    /// ```rust
2321    /// # #[cfg(feature = "deploy")] {
2322    /// # use hydro_lang::prelude::*;
2323    /// # use futures::StreamExt;
2324    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2325    /// process
2326    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2327    ///     .into_keyed()
2328    ///     .drop_key_prefix()
2329    /// #   .entries()
2330    /// # }, |mut stream| async move {
2331    /// // { 10: [2, 3], 20: [4] }
2332    /// # let mut results = Vec::new();
2333    /// # for _ in 0..3 {
2334    /// #     results.push(stream.next().await.unwrap());
2335    /// # }
2336    /// # results.sort();
2337    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2338    /// # }));
2339    /// # }
2340    /// ```
2341    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2342        self.entries()
2343            .map(q!(|((_k1, k2), v)| (k2, v)))
2344            .into_keyed()
2345    }
2346}
2347
2348impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2349    KeyedStream<K, V, L, Unbounded, O, R>
2350{
2351    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2352    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2353    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2354    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2355    ///
2356    /// Currently, both input streams must be [`Unbounded`].
2357    ///
2358    /// # Example
2359    /// ```rust
2360    /// # #[cfg(feature = "deploy")] {
2361    /// # use hydro_lang::prelude::*;
2362    /// # use futures::StreamExt;
2363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2364    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2365    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2366    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2367    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2368    /// numbers1.interleave(numbers2)
2369    /// #   .entries()
2370    /// # }, |mut stream| async move {
2371    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2372    /// # let mut results = Vec::new();
2373    /// # for _ in 0..4 {
2374    /// #     results.push(stream.next().await.unwrap());
2375    /// # }
2376    /// # results.sort();
2377    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2378    /// # }));
2379    /// # }
2380    /// ```
2381    pub fn interleave<O2: Ordering, R2: Retries>(
2382        self,
2383        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2384    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2385    where
2386        R: MinRetries<R2>,
2387    {
2388        KeyedStream::new(
2389            self.location.clone(),
2390            HydroNode::Chain {
2391                first: Box::new(self.ir_node.into_inner()),
2392                second: Box::new(other.ir_node.into_inner()),
2393                metadata: self.location.new_node_metadata(KeyedStream::<
2394                    K,
2395                    V,
2396                    L,
2397                    Unbounded,
2398                    NoOrder,
2399                    <R as MinRetries<R2>>::Min,
2400                >::collection_kind()),
2401            },
2402        )
2403    }
2404}
2405
2406impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2407where
2408    L: Location<'a> + NoTick,
2409{
2410    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2411    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2412    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2413    /// used to create the atomic section.
2414    ///
2415    /// # Non-Determinism
2416    /// The batch boundaries are non-deterministic and may change across executions.
2417    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2418        let _ = nondet;
2419        KeyedStream::new(
2420            self.location.clone().tick,
2421            HydroNode::Batch {
2422                inner: Box::new(self.ir_node.into_inner()),
2423                metadata: self.location.tick.new_node_metadata(KeyedStream::<
2424                    K,
2425                    V,
2426                    Tick<L>,
2427                    Bounded,
2428                    O,
2429                    R,
2430                >::collection_kind(
2431                )),
2432            },
2433        )
2434    }
2435
2436    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2437    /// See [`KeyedStream::atomic`] for more details.
2438    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2439        KeyedStream::new(
2440            self.location.tick.l.clone(),
2441            HydroNode::EndAtomic {
2442                inner: Box::new(self.ir_node.into_inner()),
2443                metadata: self
2444                    .location
2445                    .tick
2446                    .l
2447                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2448            },
2449        )
2450    }
2451}
2452
2453impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2454where
2455    L: Location<'a>,
2456{
2457    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2458    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2459    /// each key.
2460    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2461        KeyedStream::new(
2462            self.location.outer().clone(),
2463            HydroNode::YieldConcat {
2464                inner: Box::new(self.ir_node.into_inner()),
2465                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2466                    K,
2467                    V,
2468                    L,
2469                    Unbounded,
2470                    O,
2471                    R,
2472                >::collection_kind(
2473                )),
2474            },
2475        )
2476    }
2477
2478    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2479    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2480    /// each key.
2481    ///
2482    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2483    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2484    /// stream's [`Tick`] context.
2485    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2486        let out_location = Atomic {
2487            tick: self.location.clone(),
2488        };
2489
2490        KeyedStream::new(
2491            out_location.clone(),
2492            HydroNode::YieldConcat {
2493                inner: Box::new(self.ir_node.into_inner()),
2494                metadata: out_location.new_node_metadata(KeyedStream::<
2495                    K,
2496                    V,
2497                    Atomic<L>,
2498                    Unbounded,
2499                    O,
2500                    R,
2501                >::collection_kind()),
2502            },
2503        )
2504    }
2505
2506    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2507    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2508    ///
2509    /// This API is particularly useful for stateful computation on batches of data, such as
2510    /// maintaining an accumulated state that is up to date with the current batch.
2511    ///
2512    /// # Example
2513    /// ```rust
2514    /// # #[cfg(feature = "deploy")] {
2515    /// # use hydro_lang::prelude::*;
2516    /// # use futures::StreamExt;
2517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2518    /// let tick = process.tick();
2519    /// # // ticks are lazy by default, forces the second tick to run
2520    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2521    /// # let batch_first_tick = process
2522    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2523    /// #   .into_keyed()
2524    /// #   .batch(&tick, nondet!(/** test */));
2525    /// # let batch_second_tick = process
2526    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2527    /// #   .into_keyed()
2528    /// #   .batch(&tick, nondet!(/** test */))
2529    /// #   .defer_tick(); // appears on the second tick
2530    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2531    ///
2532    /// input.batch(&tick, nondet!(/** test */))
2533    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2534    ///         *sum += new;
2535    ///     }))).entries().all_ticks()
2536    /// # }, |mut stream| async move {
2537    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2538    /// # let mut results = Vec::new();
2539    /// # for _ in 0..4 {
2540    /// #     results.push(stream.next().await.unwrap());
2541    /// # }
2542    /// # results.sort();
2543    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2544    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2545    /// # results.clear();
2546    /// # for _ in 0..4 {
2547    /// #     results.push(stream.next().await.unwrap());
2548    /// # }
2549    /// # results.sort();
2550    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2551    /// # }));
2552    /// # }
2553    /// ```
2554    pub fn across_ticks<Out: BatchAtomic>(
2555        self,
2556        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2557    ) -> Out::Batched {
2558        thunk(self.all_ticks_atomic()).batched_atomic()
2559    }
2560
2561    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2562    /// tick `T` always has the entries of `self` at tick `T - 1`.
2563    ///
2564    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2565    ///
2566    /// This operator enables stateful iterative processing with ticks, by sending data from one
2567    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2568    ///
2569    /// # Example
2570    /// ```rust
2571    /// # #[cfg(feature = "deploy")] {
2572    /// # use hydro_lang::prelude::*;
2573    /// # use futures::StreamExt;
2574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2575    /// let tick = process.tick();
2576    /// # // ticks are lazy by default, forces the second tick to run
2577    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2578    /// # let batch_first_tick = process
2579    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2580    /// #   .batch(&tick, nondet!(/** test */))
2581    /// #   .into_keyed();
2582    /// # let batch_second_tick = process
2583    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2584    /// #   .batch(&tick, nondet!(/** test */))
2585    /// #   .defer_tick()
2586    /// #   .into_keyed(); // appears on the second tick
2587    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2588    /// # batch_first_tick.chain(batch_second_tick);
2589    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2590    ///     changes_across_ticks // from the current tick
2591    /// )
2592    /// # .entries().all_ticks()
2593    /// # }, |mut stream| async move {
2594    /// // First tick: { 1: [2, 3] }
2595    /// # let mut results = Vec::new();
2596    /// # for _ in 0..2 {
2597    /// #     results.push(stream.next().await.unwrap());
2598    /// # }
2599    /// # results.sort();
2600    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2601    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2602    /// # results.clear();
2603    /// # for _ in 0..4 {
2604    /// #     results.push(stream.next().await.unwrap());
2605    /// # }
2606    /// # results.sort();
2607    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2608    /// // Third tick: { 1: [4], 2: [5] }
2609    /// # results.clear();
2610    /// # for _ in 0..2 {
2611    /// #     results.push(stream.next().await.unwrap());
2612    /// # }
2613    /// # results.sort();
2614    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2615    /// # }));
2616    /// # }
2617    /// ```
2618    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2619        KeyedStream::new(
2620            self.location.clone(),
2621            HydroNode::DeferTick {
2622                input: Box::new(self.ir_node.into_inner()),
2623                metadata: self.location.new_node_metadata(KeyedStream::<
2624                    K,
2625                    V,
2626                    Tick<L>,
2627                    Bounded,
2628                    O,
2629                    R,
2630                >::collection_kind()),
2631            },
2632        )
2633    }
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638    #[cfg(feature = "deploy")]
2639    use futures::{SinkExt, StreamExt};
2640    #[cfg(feature = "deploy")]
2641    use hydro_deploy::Deployment;
2642    #[cfg(any(feature = "deploy", feature = "sim"))]
2643    use stageleft::q;
2644
2645    #[cfg(any(feature = "deploy", feature = "sim"))]
2646    use crate::compile::builder::FlowBuilder;
2647    #[cfg(feature = "deploy")]
2648    use crate::live_collections::stream::ExactlyOnce;
2649    #[cfg(feature = "sim")]
2650    use crate::live_collections::stream::{NoOrder, TotalOrder};
2651    #[cfg(any(feature = "deploy", feature = "sim"))]
2652    use crate::location::Location;
2653    #[cfg(any(feature = "deploy", feature = "sim"))]
2654    use crate::nondet::nondet;
2655    #[cfg(feature = "deploy")]
2656    use crate::properties::manual_proof;
2657
    // Deploys a single process that sums values per key with `reduce_watermark` and
    // snapshots the state each tick. With a watermark of 2, the only entry observed
    // externally is key 2 with 102 + 102 = 204 — presumably keys below the watermark
    // are filtered from the snapshot; confirm against `reduce_watermark`'s contract.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        // Key 2 accumulated both of its values: 102 + 102.
        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2702
    // Same aggregation as `reduce_watermark_filter`, but the input comes from
    // `source_iter` (a bounded source), so the result is read via `entries()`
    // directly without snapshotting inside a tick.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        // Key 2 accumulated both of its values: 102 + 102.
        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2740
    // Exercises `reduce_watermark` with a watermark that advances every tick (via a
    // tick cycle starting at 2 and incremented each iteration). An externally-triggered
    // input (3, 103) is injected only after the trigger message arrives, and must still
    // be observed after earlier state has advanced past the original watermark —
    // NOTE(review): the test name suggests this checks that garbage collection of
    // below-watermark state does not lose later keys; confirm against the operator docs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Watermark starts at 2 and increases by 1 every tick.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // (3, 103) is released only once the external trigger has been received.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    // Interleaving makes arrival order nondeterministic, so the
                    // aggregation must be declared commutative.
                    commutative = manual_proof!(/** integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        // Before the trigger: key 2's accumulated sum is visible.
        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        // After the trigger: the late (3, 103) entry is still produced.
        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
2811
    // Demonstrates that batch boundaries are nondeterministic: folding each batch
    // into a Vec cannot always produce { 1: [1, 2] } because the exhaustive simulator
    // explores batchings that split key 1's values across ticks. The assertion is
    // expected to fail for some interleaving, hence `#[should_panic]`.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, vec![1, 2])])
                .await;
        });
    }
2835
    // Checks that batching preserves the per-group order of a totally-ordered input:
    // regardless of how the simulator splits the three elements into ticks, key 1's
    // values are always seen as 1 then 2, so `fold_early_stop` always ends at 2.
    // The expected instance count (8) enumerates only the batch-boundary choices,
    // not per-group orderings.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }
2870
    // Checks that when the input is weakened to `NoOrder`, the simulator additionally
    // explores per-group orderings at batch boundaries: the instance count rises from
    // 8 (ordered case above) to 13 because (1, 1)/(1, 2) can swap whenever they land
    // in different ticks.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
2901
    // Shows that `assume_ordering` on an unordered input is unsound under simulation:
    // the simulator finds an interleaving where `first()` does not yield (1, 1)/(2, 1),
    // so the assertion fails and the test must panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }
2927
    // Pins the number of distinct schedules the simulator explores when order is
    // observed through `assume_ordering` on a batched keyed stream. No behavioral
    // assertion on the output — only the instance count (104), which guards against
    // regressions in the exploration strategy.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
2951
    // Exercises `assume_ordering` at the top level (outside any tick): each key folds
    // its first two values and every explored schedule must still produce exactly one
    // entry per key. The instance count (24) pins the number of orderings explored.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .into_keyed()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();
            // Each key accumulates its values; we get one entry per key
            assert_eq!(out.len(), 2);
        });

        assert_eq!(instance_count, 24)
    }
2988
    // Verifies that values produced by a feedback cycle (via `forward_ref`) can be
    // interleaved into the assumed ordering: sending (1, 0) cycles back (1, 1)
    // (0 + 1 is odd), and at least one explored schedule must observe key 1 as
    // exactly [0, 1]. Also pins the instance count (6).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feed back v + 1 whenever it is odd.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }
3048
    // Verifies per-entry release granularity for assumed ordering when a cycle
    // crosses keys: observing (1, 10) feeds (2, 100) back into key 2, and some
    // explored schedule must release that cycled-back 100 before key 2's already
    // pending inputs. Also pins the instance count (60).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }
3122
    // Same feedback-cycle scenario as `sim_top_level_assume_ordering_cycle_back`,
    // but the cycled-back values pass through a tick (`batch` + `all_ticks`) before
    // re-entering the interleave. The [0, 1] ordering for key 1 must still be
    // observable; the instance count grows to 58 because batch boundaries add
    // scheduling choices.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 58);
    }
3181}