hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::stream::{
27 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
28};
29#[cfg(stageleft_runtime)]
30use crate::location::dynamic::{DynLocation, LocationId};
31use crate::location::tick::DeferTick;
32use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
33use crate::manual_expr::ManualExpr;
34use crate::nondet::{NonDet, nondet};
35use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
36
37pub mod networking;
38
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this keyed stream is materialized.
    pub(crate) location: Loc,
    /// The dataflow IR node backing this collection. Wrapped in a `RefCell`
    /// so that `clone` can rewrite it in place into a shared `Tee` node.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries the type parameters that have no runtime representation.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
70
71impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
72 for KeyedStream<K, V, L, Unbounded, O, R>
73where
74 L: Location<'a>,
75{
76 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
77 let new_meta = stream
78 .location
79 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
80
81 KeyedStream {
82 location: stream.location,
83 ir_node: RefCell::new(HydroNode::Cast {
84 inner: Box::new(stream.ir_node.into_inner()),
85 metadata: new_meta,
86 }),
87 _phantom: PhantomData,
88 }
89 }
90}
91
92impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
93 for KeyedStream<K, V, L, B, NoOrder, R>
94where
95 L: Location<'a>,
96{
97 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
98 stream.weaken_ordering()
99 }
100}
101
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Delegates to the inherent [`KeyedStream::defer_tick`] (defined elsewhere
    /// in this module); UFCS is used to make the inherent-method target explicit.
    fn defer_tick(self) -> Self {
        KeyedStream::defer_tick(self)
    }
}
110
111impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
112 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
113where
114 L: Location<'a>,
115{
116 type Location = Tick<L>;
117
118 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
119 KeyedStream {
120 location: location.clone(),
121 ir_node: RefCell::new(HydroNode::CycleSource {
122 cycle_id,
123 metadata: location.new_node_metadata(
124 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
125 ),
126 }),
127 _phantom: PhantomData,
128 }
129 }
130}
131
132impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
133 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
134where
135 L: Location<'a>,
136{
137 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
138 assert_eq!(
139 Location::id(&self.location),
140 expected_location,
141 "locations do not match"
142 );
143
144 self.location
145 .flow_state()
146 .borrow_mut()
147 .push_root(HydroRoot::CycleSink {
148 cycle_id,
149 input: Box::new(self.ir_node.into_inner()),
150 op_metadata: HydroIrOpMetadata::new(),
151 });
152 }
153}
154
155impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
156 for KeyedStream<K, V, L, B, O, R>
157where
158 L: Location<'a> + NoTick,
159{
160 type Location = L;
161
162 fn create_source(cycle_id: CycleId, location: L) -> Self {
163 KeyedStream {
164 location: location.clone(),
165 ir_node: RefCell::new(HydroNode::CycleSource {
166 cycle_id,
167 metadata: location
168 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
169 }),
170 _phantom: PhantomData,
171 }
172 }
173}
174
175impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
176 for KeyedStream<K, V, L, B, O, R>
177where
178 L: Location<'a> + NoTick,
179{
180 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
181 assert_eq!(
182 Location::id(&self.location),
183 expected_location,
184 "locations do not match"
185 );
186 self.location
187 .flow_state()
188 .borrow_mut()
189 .push_root(HydroRoot::CycleSink {
190 cycle_id,
191 input: Box::new(self.ir_node.into_inner()),
192 op_metadata: HydroIrOpMetadata::new(),
193 });
194 }
195}
196
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // On the first clone, rewrite this node in place into a `Tee` so that
        // the original and all clones share the same underlying IR subtree
        // instead of duplicating it. `replace` temporarily parks a
        // `Placeholder` while the original node is moved into the `Tee`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `ir_node` is now guaranteed to be a `Tee`; hand the clone its own
        // `Tee` wrapper sharing the same inner `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above just ensured the node is a `Tee`.
            unreachable!()
        }
    }
}
224
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant independently decides two things: whether an element is emitted
/// now, and whether future inputs keep being processed.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
237
238impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
239 KeyedStream<K, V, L, B, O, R>
240{
    /// Wraps a raw [`HydroNode`] as a [`KeyedStream`] materialized at `location`.
    ///
    /// Debug-asserts that the node's metadata agrees with both the location and
    /// the [`CollectionKind`] implied by this type's parameters, catching IR
    /// construction mistakes early in debug builds.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        KeyedStream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
251
252 /// Returns the [`CollectionKind`] corresponding to this type.
253 pub fn collection_kind() -> CollectionKind {
254 CollectionKind::KeyedStream {
255 bound: B::BOUND_KIND,
256 value_order: O::ORDERING_KIND,
257 value_retry: R::RETRIES_KIND,
258 key_type: stageleft::quote_type::<K>().into(),
259 value_type: stageleft::quote_type::<V>().into(),
260 }
261 }
262
263 /// Returns the [`Location`] where this keyed stream is being materialized.
264 pub fn location(&self) -> &L {
265 &self.location
266 }
267
268 /// Explicitly "casts" the keyed stream to a type with a different ordering
269 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
270 /// by the type-system.
271 ///
272 /// # Non-Determinism
273 /// This function is used as an escape hatch, and any mistakes in the
274 /// provided ordering guarantee will propagate into the guarantees
275 /// for the rest of the program.
276 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
277 if O::ORDERING_KIND == O2::ORDERING_KIND {
278 KeyedStream::new(self.location, self.ir_node.into_inner())
279 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
280 // We can always weaken the ordering guarantee
281 KeyedStream::new(
282 self.location.clone(),
283 HydroNode::Cast {
284 inner: Box::new(self.ir_node.into_inner()),
285 metadata: self
286 .location
287 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
288 },
289 )
290 } else {
291 KeyedStream::new(
292 self.location.clone(),
293 HydroNode::ObserveNonDet {
294 inner: Box::new(self.ir_node.into_inner()),
295 trusted: false,
296 metadata: self
297 .location
298 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
299 },
300 )
301 }
302 }
303
304 fn assume_ordering_trusted<O2: Ordering>(
305 self,
306 _nondet: NonDet,
307 ) -> KeyedStream<K, V, L, B, O2, R> {
308 if O::ORDERING_KIND == O2::ORDERING_KIND {
309 KeyedStream::new(self.location, self.ir_node.into_inner())
310 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
311 // We can always weaken the ordering guarantee
312 KeyedStream::new(
313 self.location.clone(),
314 HydroNode::Cast {
315 inner: Box::new(self.ir_node.into_inner()),
316 metadata: self
317 .location
318 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
319 },
320 )
321 } else {
322 KeyedStream::new(
323 self.location.clone(),
324 HydroNode::ObserveNonDet {
325 inner: Box::new(self.ir_node.into_inner()),
326 trusted: true,
327 metadata: self
328 .location
329 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
330 },
331 )
332 }
333 }
334
335 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
336 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
337 /// which is always safe because that is the weakest possible guarantee.
338 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
339 self.weaken_ordering::<NoOrder>()
340 }
341
342 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
343 /// enforcing that `O2` is weaker than the input ordering guarantee.
344 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
345 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
346 self.assume_ordering::<O2>(nondet)
347 }
348
349 /// Explicitly "casts" the keyed stream to a type with a different retries
350 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
351 /// be proven by the type-system.
352 ///
353 /// # Non-Determinism
354 /// This function is used as an escape hatch, and any mistakes in the
355 /// provided retries guarantee will propagate into the guarantees
356 /// for the rest of the program.
357 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
358 if R::RETRIES_KIND == R2::RETRIES_KIND {
359 KeyedStream::new(self.location, self.ir_node.into_inner())
360 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
361 // We can always weaken the retries guarantee
362 KeyedStream::new(
363 self.location.clone(),
364 HydroNode::Cast {
365 inner: Box::new(self.ir_node.into_inner()),
366 metadata: self
367 .location
368 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
369 },
370 )
371 } else {
372 KeyedStream::new(
373 self.location.clone(),
374 HydroNode::ObserveNonDet {
375 inner: Box::new(self.ir_node.into_inner()),
376 trusted: false,
377 metadata: self
378 .location
379 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
380 },
381 )
382 }
383 }
384
385 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
386 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
387 /// which is always safe because that is the weakest possible guarantee.
388 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
389 self.weaken_retries::<AtLeastOnce>()
390 }
391
392 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
393 /// enforcing that `R2` is weaker than the input retries guarantee.
394 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
395 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
396 self.assume_retries::<R2>(nondet)
397 }
398
399 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
400 /// implies that `O == TotalOrder`.
401 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
402 where
403 O: IsOrdered,
404 {
405 self.assume_ordering(nondet!(/** no-op */))
406 }
407
408 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
409 /// implies that `R == ExactlyOnce`.
410 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
411 where
412 R: IsExactlyOnce,
413 {
414 self.assume_retries(nondet!(/** no-op */))
415 }
416
417 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
418 /// implies that `B == Bounded`.
419 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
420 where
421 B: IsBounded,
422 {
423 KeyedStream::new(self.location, self.ir_node.into_inner())
424 }
425
426 /// Flattens the keyed stream into an unordered stream of key-value pairs.
427 ///
428 /// # Example
429 /// ```rust
430 /// # #[cfg(feature = "deploy")] {
431 /// # use hydro_lang::prelude::*;
432 /// # use futures::StreamExt;
433 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434 /// process
435 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
436 /// .into_keyed()
437 /// .entries()
438 /// # }, |mut stream| async move {
439 /// // (1, 2), (1, 3), (2, 4) in any order
440 /// # let mut results = Vec::new();
441 /// # for _ in 0..3 {
442 /// # results.push(stream.next().await.unwrap());
443 /// # }
444 /// # results.sort();
445 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
446 /// # }));
447 /// # }
448 /// ```
449 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
450 Stream::new(
451 self.location.clone(),
452 HydroNode::Cast {
453 inner: Box::new(self.ir_node.into_inner()),
454 metadata: self
455 .location
456 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
457 },
458 )
459 }
460
461 /// Flattens the keyed stream into an unordered stream of only the values.
462 ///
463 /// # Example
464 /// ```rust
465 /// # #[cfg(feature = "deploy")] {
466 /// # use hydro_lang::prelude::*;
467 /// # use futures::StreamExt;
468 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
469 /// process
470 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
471 /// .into_keyed()
472 /// .values()
473 /// # }, |mut stream| async move {
474 /// // 2, 3, 4 in any order
475 /// # let mut results = Vec::new();
476 /// # for _ in 0..3 {
477 /// # results.push(stream.next().await.unwrap());
478 /// # }
479 /// # results.sort();
480 /// # assert_eq!(results, vec![2, 3, 4]);
481 /// # }));
482 /// # }
483 /// ```
484 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
485 self.entries().map(q!(|(_, v)| v))
486 }
487
488 /// Flattens the keyed stream into an unordered stream of just the keys.
489 ///
490 /// # Example
491 /// ```rust
492 /// # #[cfg(feature = "deploy")] {
493 /// # use hydro_lang::prelude::*;
494 /// # use futures::StreamExt;
495 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
496 /// # process
497 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
498 /// # .into_keyed()
499 /// # .keys()
500 /// # }, |mut stream| async move {
501 /// // 1, 2 in any order
502 /// # let mut results = Vec::new();
503 /// # for _ in 0..2 {
504 /// # results.push(stream.next().await.unwrap());
505 /// # }
506 /// # results.sort();
507 /// # assert_eq!(results, vec![1, 2]);
508 /// # }));
509 /// # }
510 /// ```
511 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
512 where
513 K: Eq + Hash,
514 {
515 self.entries().map(q!(|(k, _)| k)).unique()
516 }
517
518 /// Transforms each value by invoking `f` on each element, with keys staying the same
519 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
520 ///
521 /// If you do not want to modify the stream and instead only want to view
522 /// each item use [`KeyedStream::inspect`] instead.
523 ///
524 /// # Example
525 /// ```rust
526 /// # #[cfg(feature = "deploy")] {
527 /// # use hydro_lang::prelude::*;
528 /// # use futures::StreamExt;
529 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
530 /// process
531 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
532 /// .into_keyed()
533 /// .map(q!(|v| v + 1))
534 /// # .entries()
535 /// # }, |mut stream| async move {
536 /// // { 1: [3, 4], 2: [5] }
537 /// # let mut results = Vec::new();
538 /// # for _ in 0..3 {
539 /// # results.push(stream.next().await.unwrap());
540 /// # }
541 /// # results.sort();
542 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
543 /// # }));
544 /// # }
545 /// ```
546 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
547 where
548 F: Fn(V) -> U + 'a,
549 {
550 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
551 let map_f = q!({
552 let orig = f;
553 move |(k, v)| (k, orig(v))
554 })
555 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
556 .into();
557
558 KeyedStream::new(
559 self.location.clone(),
560 HydroNode::Map {
561 f: map_f,
562 input: Box::new(self.ir_node.into_inner()),
563 metadata: self
564 .location
565 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
566 },
567 )
568 }
569
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // The key is cloned so it can be both passed to `f` and retained as the
        // output pair's key (hence the `K: Clone` bound).
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
628
629 /// Prepends a new value to the key of each element in the stream, producing a new
630 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
631 /// occurs and the elements in each group preserve their original order.
632 ///
633 /// # Example
634 /// ```rust
635 /// # #[cfg(feature = "deploy")] {
636 /// # use hydro_lang::prelude::*;
637 /// # use futures::StreamExt;
638 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
639 /// process
640 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
641 /// .into_keyed()
642 /// .prefix_key(q!(|&(k, _)| k % 2))
643 /// # .entries()
644 /// # }, |mut stream| async move {
645 /// // { (1, 1): [2, 3], (0, 2): [4] }
646 /// # let mut results = Vec::new();
647 /// # for _ in 0..3 {
648 /// # results.push(stream.next().await.unwrap());
649 /// # }
650 /// # results.sort();
651 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
652 /// # }));
653 /// # }
654 /// ```
655 pub fn prefix_key<K2, F>(
656 self,
657 f: impl IntoQuotedMut<'a, F, L> + Copy,
658 ) -> KeyedStream<(K2, K), V, L, B, O, R>
659 where
660 F: Fn(&(K, V)) -> K2 + 'a,
661 {
662 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
663 let map_f = q!({
664 let orig = f;
665 move |kv| {
666 let out = orig(&kv);
667 ((out, kv.0), kv.1)
668 }
669 })
670 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
671 .into();
672
673 KeyedStream::new(
674 self.location.clone(),
675 HydroNode::Map {
676 f: map_f,
677 input: Box::new(self.ir_node.into_inner()),
678 metadata: self
679 .location
680 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
681 },
682 )
683 }
684
685 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
686 /// `f`, preserving the order of the elements within the group.
687 ///
688 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
689 /// not modify or take ownership of the values. If you need to modify the values while filtering
690 /// use [`KeyedStream::filter_map`] instead.
691 ///
692 /// # Example
693 /// ```rust
694 /// # #[cfg(feature = "deploy")] {
695 /// # use hydro_lang::prelude::*;
696 /// # use futures::StreamExt;
697 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
698 /// process
699 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
700 /// .into_keyed()
701 /// .filter(q!(|&x| x > 2))
702 /// # .entries()
703 /// # }, |mut stream| async move {
704 /// // { 1: [3], 2: [4] }
705 /// # let mut results = Vec::new();
706 /// # for _ in 0..2 {
707 /// # results.push(stream.next().await.unwrap());
708 /// # }
709 /// # results.sort();
710 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
711 /// # }));
712 /// # }
713 /// ```
714 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
715 where
716 F: Fn(&V) -> bool + 'a,
717 {
718 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
719 let filter_f = q!({
720 let orig = f;
721 move |t: &(_, _)| orig(&t.1)
722 })
723 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
724 .into();
725
726 KeyedStream::new(
727 self.location.clone(),
728 HydroNode::Filter {
729 f: filter_f,
730 input: Box::new(self.ir_node.into_inner()),
731 metadata: self.location.new_node_metadata(Self::collection_kind()),
732 },
733 )
734 }
735
736 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
737 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
738 ///
739 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
740 /// not modify or take ownership of the values. If you need to modify the values while filtering
741 /// use [`KeyedStream::filter_map_with_key`] instead.
742 ///
743 /// # Example
744 /// ```rust
745 /// # #[cfg(feature = "deploy")] {
746 /// # use hydro_lang::prelude::*;
747 /// # use futures::StreamExt;
748 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
749 /// process
750 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
751 /// .into_keyed()
752 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
753 /// # .entries()
754 /// # }, |mut stream| async move {
755 /// // { 1: [3], 2: [4] }
756 /// # let mut results = Vec::new();
757 /// # for _ in 0..2 {
758 /// # results.push(stream.next().await.unwrap());
759 /// # }
760 /// # results.sort();
761 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
762 /// # }));
763 /// # }
764 /// ```
765 pub fn filter_with_key<F>(
766 self,
767 f: impl IntoQuotedMut<'a, F, L> + Copy,
768 ) -> KeyedStream<K, V, L, B, O, R>
769 where
770 F: Fn(&(K, V)) -> bool + 'a,
771 {
772 let filter_f = f
773 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
774 .into();
775
776 KeyedStream::new(
777 self.location.clone(),
778 HydroNode::Filter {
779 f: filter_f,
780 input: Box::new(self.ir_node.into_inner()),
781 metadata: self.location.new_node_metadata(Self::collection_kind()),
782 },
783 )
784 }
785
786 /// An operator that both filters and maps each value, with keys staying the same.
787 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
788 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
789 ///
790 /// # Example
791 /// ```rust
792 /// # #[cfg(feature = "deploy")] {
793 /// # use hydro_lang::prelude::*;
794 /// # use futures::StreamExt;
795 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
796 /// process
797 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
798 /// .into_keyed()
799 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
800 /// # .entries()
801 /// # }, |mut stream| async move {
802 /// // { 1: [2], 2: [4] }
803 /// # let mut results = Vec::new();
804 /// # for _ in 0..2 {
805 /// # results.push(stream.next().await.unwrap());
806 /// # }
807 /// # results.sort();
808 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
809 /// # }));
810 /// # }
811 /// ```
812 pub fn filter_map<U, F>(
813 self,
814 f: impl IntoQuotedMut<'a, F, L> + Copy,
815 ) -> KeyedStream<K, U, L, B, O, R>
816 where
817 F: Fn(V) -> Option<U> + 'a,
818 {
819 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
820 let filter_map_f = q!({
821 let orig = f;
822 move |(k, v)| orig(v).map(|o| (k, o))
823 })
824 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
825 .into();
826
827 KeyedStream::new(
828 self.location.clone(),
829 HydroNode::FilterMap {
830 f: filter_map_f,
831 input: Box::new(self.ir_node.into_inner()),
832 metadata: self
833 .location
834 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
835 },
836 )
837 }
838
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    ///     .into_keyed()
    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 2: [2] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..1 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // The key is cloned so it can be passed to `f` and still be kept for
        // the output pair when `f` returns `Some` (hence the `K: Clone` bound).
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
895
    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
    ///     .into_keyed()
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks().entries()
    /// # }, |mut stream| async move {
    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both operands must be materialized at the same location.
        check_matching_location(&self.location, &other.location);

        // The IR-level cross product yields ((K, V), O2) tuples; re-associate
        // into (K, (V, O2)) and restore the keyed view.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
947
    /// For each value `v` in each group, transform `v` using `f` and then treat the
    /// result as an [`Iterator`] to produce values one by one within the same group.
    /// The implementation for [`Iterator`] for the output type `I` must produce items
    /// in a **deterministic** order.
    ///
    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
    ///     .into_keyed()
    ///     .flat_map_ordered(q!(|x| x))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3, 4], 2: [5, 6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..5 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Capture `f` so it can be spliced into the generated closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The underlying representation is a flat `(K, V)` stream: apply `f` to the
        // value, then re-attach a clone of the key to every yielded item so all
        // outputs remain in the original group.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1006
    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `I` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ///     ]))
    ///     .into_keyed()
    ///     .flat_map_unordered(q!(|x| x))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Same lowering as `flat_map_ordered`; only the output's ordering guarantee
        // differs (the return type is marked `NoOrder`).
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Re-attach a clone of the key to each yielded item to keep group membership.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1063
1064 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1065 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1066 /// items in a **deterministic** order.
1067 ///
1068 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1069 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1070 ///
1071 /// # Example
1072 /// ```rust
1073 /// # #[cfg(feature = "deploy")] {
1074 /// # use hydro_lang::prelude::*;
1075 /// # use futures::StreamExt;
1076 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1077 /// process
1078 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1079 /// .into_keyed()
1080 /// .flatten_ordered()
1081 /// # .entries()
1082 /// # }, |mut stream| async move {
1083 /// // { 1: [2, 3, 4], 2: [5, 6] }
1084 /// # let mut results = Vec::new();
1085 /// # for _ in 0..5 {
1086 /// # results.push(stream.next().await.unwrap());
1087 /// # }
1088 /// # results.sort();
1089 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1090 /// # }));
1091 /// # }
1092 /// ```
1093 pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1094 where
1095 V: IntoIterator<Item = U>,
1096 K: Clone,
1097 {
1098 self.flat_map_ordered(q!(|d| d))
1099 }
1100
1101 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1102 /// for the value type `V` to produce items in any order.
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # #[cfg(feature = "deploy")] {
1107 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1108 /// # use futures::StreamExt;
1109 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1110 /// process
1111 /// .source_iter(q!(vec![
1112 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1113 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1114 /// ]))
1115 /// .into_keyed()
1116 /// .flatten_unordered()
1117 /// # .entries()
1118 /// # }, |mut stream| async move {
1119 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1120 /// # let mut results = Vec::new();
1121 /// # for _ in 0..4 {
1122 /// # results.push(stream.next().await.unwrap());
1123 /// # }
1124 /// # results.sort();
1125 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1126 /// # }));
1127 /// # }
1128 /// ```
1129 pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1130 where
1131 V: IntoIterator<Item = U>,
1132 K: Clone,
1133 {
1134 self.flat_map_unordered(q!(|d| d))
1135 }
1136
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each value. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .inspect(q!(|v| println!("{}", v)))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
    where
        F: Fn(&V) + 'a,
    {
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // The underlying representation is `(K, V)` pairs; adapt the user closure so
        // it only observes a reference to the value component (`t.1`).
        let inspect_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1183
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
    /// # .entries()
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&(K, V)) + 'a,
    {
        // Unlike `inspect`, the user closure already takes the full `(K, V)` pair, so
        // it can be spliced directly with no adapter closure.
        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1224
1225 /// An operator which allows you to "name" a `HydroNode`.
1226 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1227 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1228 {
1229 let mut node = self.ir_node.borrow_mut();
1230 let metadata = node.metadata_mut();
1231 metadata.tag = Some(name.to_owned());
1232 }
1233 self
1234 }
1235
    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
    ///
    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
    /// early by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, the stream is terminated and no more elements are processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
    ///     .into_keyed()
    ///     .scan(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
    ///         }),
    ///     )
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: [1], 1: [3, 7] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // `scan` is a special case of `generator`: `Some(out)` becomes a yield, and
        // `None` terminates the group (`Generate::Break`).
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1301
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // The generator is lowered to a single `Scan` over the flat `(K, V)` stream.
        // Per-key state lives in a `HashMap<K, Option<A>>`, where `None` marks a
        // group that has terminated (via `Return` or `Break`) and must ignore
        // further inputs.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // The scan step emits `Option<(K, U)>`: `Some(Some(..))` carries an output
        // for the key, `Some(None)` means "no output for this input" (the outer
        // `Some` keeps the scan itself running).
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // Group already terminated: drop the input without emitting.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.into_inner()),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the `Option<(K, U)>` stream with an identity `FlatMap`, discarding
        // the `None` placeholders so only real outputs remain.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location, flatten_node)
    }
1423
    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in-order across the values in each group. But the aggregation function returns a boolean,
    /// which when true indicates that the aggregated result is complete and can be released to
    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .fold_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 9 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Lower to `generator` with per-key state `Option<A>`: continue accumulating
        // until `f` reports completion, then release the accumulator via `Return`
        // (which also terminates the group so later inputs are ignored).
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // `generator` stops invoking us once the group has terminated,
                    // so the state can never be observed as `None` here.
                    unreachable!()
                }
            }),
        );

        // Cast the generator's output to the keyed-singleton collection kind with
        // the appropriate boundedness (each key produces at most one value).
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
                    ),
            },
        )
    }
1500
1501 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1502 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1503 /// otherwise the first element would be non-deterministic.
1504 ///
1505 /// # Example
1506 /// ```rust
1507 /// # #[cfg(feature = "deploy")] {
1508 /// # use hydro_lang::prelude::*;
1509 /// # use futures::StreamExt;
1510 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1511 /// process
1512 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1513 /// .into_keyed()
1514 /// .first()
1515 /// # .entries()
1516 /// # }, |mut stream| async move {
1517 /// // Output: { 0: 2, 1: 3 }
1518 /// # let mut results = Vec::new();
1519 /// # for _ in 0..2 {
1520 /// # results.push(stream.next().await.unwrap());
1521 /// # }
1522 /// # results.sort();
1523 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1524 /// # }));
1525 /// # }
1526 /// ```
1527 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1528 where
1529 O: IsOrdered,
1530 R: IsExactlyOnce,
1531 K: Clone + Eq + Hash,
1532 {
1533 self.fold_early_stop(
1534 q!(|| None),
1535 q!(|acc, v| {
1536 *acc = Some(v);
1537 true
1538 }),
1539 )
1540 .map(q!(|v| v.unwrap()))
1541 }
1542
1543 /// Assigns a zero-based index to each value within each key group, emitting
1544 /// `(K, (index, V))` tuples with per-key sequential indices.
1545 ///
1546 /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1547 /// This is a streaming operator that processes elements as they arrive.
1548 ///
1549 /// # Example
1550 /// ```rust
1551 /// # #[cfg(feature = "deploy")] {
1552 /// # use hydro_lang::prelude::*;
1553 /// # use futures::StreamExt;
1554 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1555 /// process
1556 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1557 /// .into_keyed()
1558 /// .enumerate()
1559 /// # .entries()
1560 /// # }, |mut stream| async move {
1561 /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1562 /// # let mut results = Vec::new();
1563 /// # for _ in 0..3 {
1564 /// # results.push(stream.next().await.unwrap());
1565 /// # }
1566 /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1567 /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1568 /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1569 /// # assert_eq!(key2, vec![(0, 20)]);
1570 /// # }));
1571 /// # }
1572 /// ```
1573 pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1574 where
1575 O: IsOrdered,
1576 R: IsExactlyOnce,
1577 K: Eq + Hash + Clone,
1578 {
1579 self.scan(
1580 q!(|| 0),
1581 q!(|acc, next| {
1582 let curr = *acc;
1583 *acc += 1;
1584 Some((curr, next))
1585 }),
1586 )
1587 }
1588
1589 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1590 ///
1591 /// # Example
1592 /// ```rust
1593 /// # #[cfg(feature = "deploy")] {
1594 /// # use hydro_lang::prelude::*;
1595 /// # use futures::StreamExt;
1596 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1597 /// let tick = process.tick();
1598 /// let numbers = process
1599 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1600 /// .into_keyed();
1601 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1602 /// batch
1603 /// .value_counts()
1604 /// .entries()
1605 /// .all_ticks()
1606 /// # }, |mut stream| async move {
1607 /// // (1, 3), (2, 2)
1608 /// # let mut results = Vec::new();
1609 /// # for _ in 0..2 {
1610 /// # results.push(stream.next().await.unwrap());
1611 /// # }
1612 /// # results.sort();
1613 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1614 /// # }));
1615 /// # }
1616 /// ```
1617 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1618 where
1619 R: IsExactlyOnce,
1620 K: Eq + Hash,
1621 {
1622 self.make_exactly_once()
1623 .assume_ordering_trusted(
1624 nondet!(/** ordering within each group affects neither result nor intermediates */),
1625 )
1626 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1627 }
1628
    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If the input and output value types are the same and do not require initialization then use
    /// [`KeyedStream::reduce`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Splice the combinator along with its algebraic-property proof, which is
        // registered so downstream stages can rely on it.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `ValidCommutativityFor`/`ValidIdempotenceFor` bounds statically ensure
        // these assumptions are justified by the combinator's declared algebra.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    A,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1697
    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
    /// group via the `comb` closure.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce(q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, false), (2, true)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true)]);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the combinator along with its algebraic-property proof and register
        // it for downstream use.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Statically justified by the `ValidCommutativityFor`/`ValidIdempotenceFor`
        // bounds on the combinator's declared algebra.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyed {
                f: f.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    V,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1762
    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
    /// are automatically deleted.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let watermark = tick.singleton(q!(2));
    /// let numbers = process
    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (2, true)
    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
    /// # }));
    /// # }
    /// ```
    pub fn reduce_watermark<O2, F, C, Idemp>(
        self,
        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        O2: Clone,
        F: Fn(&mut V, V) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
        // The watermark lives in a tick at this stream's root location; compare root
        // locations (rather than the locations directly) to validate co-location.
        check_matching_location(&self.location.root(), other.location.outer());
        // Splice the combinator along with its algebraic-property proof and register it.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // Statically justified by the `ValidCommutativityFor`/`ValidIdempotenceFor`
        // bounds on the combinator's declared algebra.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f: f.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                watermark: Box::new(other.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    V,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1827
1828 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1829 /// whose keys are not in the bounded stream.
1830 ///
1831 /// # Example
1832 /// ```rust
1833 /// # #[cfg(feature = "deploy")] {
1834 /// # use hydro_lang::prelude::*;
1835 /// # use futures::StreamExt;
1836 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1837 /// let tick = process.tick();
1838 /// let keyed_stream = process
1839 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1840 /// .batch(&tick, nondet!(/** test */))
1841 /// .into_keyed();
1842 /// let keys_to_remove = process
1843 /// .source_iter(q!(vec![1, 2]))
1844 /// .batch(&tick, nondet!(/** test */));
1845 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1846 /// # .entries()
1847 /// # }, |mut stream| async move {
1848 /// // { 3: ['c'], 4: ['d'] }
1849 /// # let mut results = Vec::new();
1850 /// # for _ in 0..2 {
1851 /// # results.push(stream.next().await.unwrap());
1852 /// # }
1853 /// # results.sort();
1854 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1855 /// # }));
1856 /// # }
1857 /// ```
1858 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1859 self,
1860 other: Stream<K, L, Bounded, O2, R2>,
1861 ) -> Self
1862 where
1863 K: Eq + Hash,
1864 {
1865 check_matching_location(&self.location, &other.location);
1866
1867 KeyedStream::new(
1868 self.location.clone(),
1869 HydroNode::AntiJoin {
1870 pos: Box::new(self.ir_node.into_inner()),
1871 neg: Box::new(other.ir_node.into_inner()),
1872 metadata: self.location.new_node_metadata(Self::collection_kind()),
1873 },
1874 )
1875 }
1876
1877 /// Emit a keyed stream containing keys shared between two keyed streams,
1878 /// where each value in the output keyed stream is a tuple of
1879 /// (self's value, other's value).
1880 /// If there are multiple values for the same key, this performs a cross product
1881 /// for each matching key.
1882 ///
1883 /// # Example
1884 /// ```rust
1885 /// # #[cfg(feature = "deploy")] {
1886 /// # use hydro_lang::prelude::*;
1887 /// # use futures::StreamExt;
1888 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1889 /// let tick = process.tick();
1890 /// let keyed_data = process
1891 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1892 /// .into_keyed()
1893 /// .batch(&tick, nondet!(/** test */));
1894 /// let other_data = process
1895 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1896 /// .into_keyed()
1897 /// .batch(&tick, nondet!(/** test */));
1898 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1899 /// # }, |mut stream| async move {
1900 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1901 /// # let mut results = vec![];
1902 /// # for _ in 0..4 {
1903 /// # results.push(stream.next().await.unwrap());
1904 /// # }
1905 /// # results.sort();
1906 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1907 /// # }));
1908 /// # }
1909 /// ```
1910 pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1911 self,
1912 other: KeyedStream<K, V2, L, B, O2, R2>,
1913 ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1914 where
1915 K: Eq + Hash,
1916 R: MinRetries<R2>,
1917 {
1918 self.entries().join(other.entries()).into_keyed()
1919 }
1920
1921 /// Deduplicates values within each key group, emitting each unique value per key
1922 /// exactly once.
1923 ///
1924 /// # Example
1925 /// ```rust
1926 /// # #[cfg(feature = "deploy")] {
1927 /// # use hydro_lang::prelude::*;
1928 /// # use futures::StreamExt;
1929 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1930 /// process
1931 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
1932 /// .into_keyed()
1933 /// .unique()
1934 /// # .entries()
1935 /// # }, |mut stream| async move {
1936 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
1937 /// # let mut results = Vec::new();
1938 /// # for _ in 0..4 {
1939 /// # results.push(stream.next().await.unwrap());
1940 /// # }
1941 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1942 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1943 /// # key1.sort();
1944 /// # key2.sort();
1945 /// # assert_eq!(key1, vec![10, 20]);
1946 /// # assert_eq!(key2, vec![20, 30]);
1947 /// # }));
1948 /// # }
1949 /// ```
1950 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
1951 where
1952 K: Eq + Hash + Clone,
1953 V: Eq + Hash + Clone,
1954 {
1955 self.entries().unique().into_keyed()
1956 }
1957
1958 /// Sorts the values within each key group in ascending order.
1959 ///
1960 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
1961 /// each group. This operator will block until all elements in the input stream
1962 /// are available, so it requires the input stream to be [`Bounded`].
1963 ///
1964 /// # Example
1965 /// ```rust
1966 /// # #[cfg(feature = "deploy")] {
1967 /// # use hydro_lang::prelude::*;
1968 /// # use futures::StreamExt;
1969 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1970 /// let tick = process.tick();
1971 /// let numbers = process
1972 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
1973 /// .into_keyed();
1974 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1975 /// batch.sort().all_ticks()
1976 /// # .entries()
1977 /// # }, |mut stream| async move {
1978 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
1979 /// # let mut results = Vec::new();
1980 /// # for _ in 0..4 {
1981 /// # results.push(stream.next().await.unwrap());
1982 /// # }
1983 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1984 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1985 /// # assert_eq!(key1_vals, vec![1, 3]);
1986 /// # assert_eq!(key2_vals, vec![1, 2]);
1987 /// # }));
1988 /// # }
1989 /// ```
1990 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
1991 where
1992 B: IsBounded,
1993 K: Ord,
1994 V: Ord,
1995 {
1996 self.entries().sort().into_keyed()
1997 }
1998
1999 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2000 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2001 /// is only present in one of the inputs, its values are passed through as-is). The output has
2002 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2003 ///
2004 /// Currently, both input streams must be [`Bounded`]. This operator will block
2005 /// on the first stream until all its elements are available. In a future version,
2006 /// we will relax the requirement on the `other` stream.
2007 ///
2008 /// # Example
2009 /// ```rust
2010 /// # #[cfg(feature = "deploy")] {
2011 /// # use hydro_lang::prelude::*;
2012 /// # use futures::StreamExt;
2013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2014 /// let tick = process.tick();
2015 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2016 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2017 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2018 /// # .entries()
2019 /// # }, |mut stream| async move {
2020 /// // { 0: [2, 1], 1: [4, 3] }
2021 /// # let mut results = Vec::new();
2022 /// # for _ in 0..4 {
2023 /// # results.push(stream.next().await.unwrap());
2024 /// # }
2025 /// # results.sort();
2026 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2027 /// # }));
2028 /// # }
2029 /// ```
2030 pub fn chain<O2: Ordering, R2: Retries>(
2031 self,
2032 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2033 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2034 where
2035 B: IsBounded,
2036 O: MinOrder<O2>,
2037 R: MinRetries<R2>,
2038 {
2039 let this = self.make_bounded();
2040 check_matching_location(&this.location, &other.location);
2041
2042 KeyedStream::new(
2043 this.location.clone(),
2044 HydroNode::Chain {
2045 first: Box::new(this.ir_node.into_inner()),
2046 second: Box::new(other.ir_node.into_inner()),
2047 metadata: this.location.new_node_metadata(KeyedStream::<
2048 K,
2049 V,
2050 L,
2051 Bounded,
2052 <O as MinOrder<O2>>::Min,
2053 <R as MinRetries<R2>>::Min,
2054 >::collection_kind()),
2055 },
2056 )
2057 }
2058
2059 /// Emit a keyed stream containing keys shared between the keyed stream and the
2060 /// keyed singleton, where each value in the output keyed stream is a tuple of
2061 /// (the keyed stream's value, the keyed singleton's value).
2062 ///
2063 /// # Example
2064 /// ```rust
2065 /// # #[cfg(feature = "deploy")] {
2066 /// # use hydro_lang::prelude::*;
2067 /// # use futures::StreamExt;
2068 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2069 /// let tick = process.tick();
2070 /// let keyed_data = process
2071 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2072 /// .into_keyed()
2073 /// .batch(&tick, nondet!(/** test */));
2074 /// let singleton_data = process
2075 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2076 /// .into_keyed()
2077 /// .batch(&tick, nondet!(/** test */))
2078 /// .first();
2079 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2080 /// # }, |mut stream| async move {
2081 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2082 /// # let mut results = vec![];
2083 /// # for _ in 0..3 {
2084 /// # results.push(stream.next().await.unwrap());
2085 /// # }
2086 /// # results.sort();
2087 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2088 /// # }));
2089 /// # }
2090 /// ```
2091 pub fn join_keyed_singleton<V2: Clone>(
2092 self,
2093 keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
2094 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
2095 where
2096 B: IsBounded,
2097 K: Eq + Hash,
2098 {
2099 keyed_singleton
2100 .join_keyed_stream(self.make_bounded())
2101 .map(q!(|(v2, v)| (v, v2)))
2102 }
2103
2104 /// Gets the values associated with a specific key from the keyed stream.
2105 /// Returns an empty stream if the key is `None` or there are no associated values.
2106 ///
2107 /// # Example
2108 /// ```rust
2109 /// # #[cfg(feature = "deploy")] {
2110 /// # use hydro_lang::prelude::*;
2111 /// # use futures::StreamExt;
2112 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2113 /// let tick = process.tick();
2114 /// let keyed_data = process
2115 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2116 /// .into_keyed()
2117 /// .batch(&tick, nondet!(/** test */));
2118 /// let key = tick.singleton(q!(1));
2119 /// keyed_data.get(key).all_ticks()
2120 /// # }, |mut stream| async move {
2121 /// // 10, 11 in any order
2122 /// # let mut results = vec![];
2123 /// # for _ in 0..2 {
2124 /// # results.push(stream.next().await.unwrap());
2125 /// # }
2126 /// # results.sort();
2127 /// # assert_eq!(results, vec![10, 11]);
2128 /// # }));
2129 /// # }
2130 /// ```
2131 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, Bounded, NoOrder, R>
2132 where
2133 B: IsBounded,
2134 K: Eq + Hash,
2135 {
2136 self.make_bounded()
2137 .entries()
2138 .join(key.into().into_stream().map(q!(|k| (k, ()))))
2139 .map(q!(|(_, (v, _))| v))
2140 }
2141
2142 /// For each value in `self`, find the matching key in `lookup`.
2143 /// The output is a keyed stream with the key from `self`, and a value
2144 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2145 /// If the key is not present in `lookup`, the option will be [`None`].
2146 ///
2147 /// # Example
2148 /// ```rust
2149 /// # #[cfg(feature = "deploy")] {
2150 /// # use hydro_lang::prelude::*;
2151 /// # use futures::StreamExt;
2152 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2153 /// # let tick = process.tick();
2154 /// let requests = // { 1: [10, 11], 2: 20 }
2155 /// # process
2156 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2157 /// # .into_keyed()
2158 /// # .batch(&tick, nondet!(/** test */));
2159 /// let other_data = // { 10: 100, 11: 110 }
2160 /// # process
2161 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2162 /// # .into_keyed()
2163 /// # .batch(&tick, nondet!(/** test */))
2164 /// # .first();
2165 /// requests.lookup_keyed_singleton(other_data)
2166 /// # .entries().all_ticks()
2167 /// # }, |mut stream| async move {
2168 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2169 /// # let mut results = vec![];
2170 /// # for _ in 0..3 {
2171 /// # results.push(stream.next().await.unwrap());
2172 /// # }
2173 /// # results.sort();
2174 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2175 /// # }));
2176 /// # }
2177 /// ```
2178 pub fn lookup_keyed_singleton<V2>(
2179 self,
2180 lookup: KeyedSingleton<V, V2, L, Bounded>,
2181 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2182 where
2183 B: IsBounded,
2184 K: Eq + Hash + Clone,
2185 V: Eq + Hash + Clone,
2186 V2: Clone,
2187 {
2188 self.lookup_keyed_stream(
2189 lookup
2190 .into_keyed_stream()
2191 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2192 )
2193 }
2194
    /// For each value in `self`, find the matching key in `lookup`.
    /// The output is a keyed stream with the key from `self`, and a value
    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
    /// If the key is not present in `lookup`, the option will be [`None`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let requests = // { 1: [10, 11], 2: 20 }
    /// # process
    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// let other_data = // { 10: [100, 101], 11: 110 }
    /// # process
    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// requests.lookup_keyed_stream(other_data)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
    /// # let mut results = vec![];
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
    /// # }));
    /// # }
    /// ```
    #[expect(clippy::type_complexity, reason = "retries propagation")]
    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
        self,
        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        K: Eq + Hash + Clone,
        V: Eq + Hash + Clone,
        V2: Clone,
        R: MinRetries<R2>,
    {
        // Re-key `self` by its value, so the value can be matched against `lookup`'s keys.
        let inverted = self
            .make_bounded()
            .entries()
            .map(q!(|(key, lookup_value)| (lookup_value, key)))
            .into_keyed();
        // Entries whose value has a matching key in `lookup`: join, then restore the
        // original key with a `Some(...)` payload.
        let found = inverted
            .clone()
            .join_keyed_stream(lookup.clone())
            .entries()
            .map(q!(|(lookup_value, (key, value))| (
                key,
                (lookup_value, Some(value))
            )))
            .into_keyed();
        // Entries with no matching key in `lookup`: anti-join against the lookup keys,
        // then restore the original key with a `None` payload.
        let not_found = inverted
            .filter_key_not_in(lookup.keys())
            .entries()
            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
            .into_keyed();

        // Both branches are bounded; chain concatenates them per key, weakening the
        // retry guarantee of the not-found branch to the common minimum.
        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
    }
2264
2265 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2266 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2267 ///
2268 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2269 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2270 /// argument that declares where the stream will be atomically processed. Batching a stream into
2271 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2272 /// [`Tick`] will introduce asynchrony.
2273 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2274 let out_location = Atomic { tick: tick.clone() };
2275 KeyedStream::new(
2276 out_location.clone(),
2277 HydroNode::BeginAtomic {
2278 inner: Box::new(self.ir_node.into_inner()),
2279 metadata: out_location
2280 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2281 },
2282 )
2283 }
2284
2285 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2286 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2287 /// the order of the input.
2288 ///
2289 /// # Non-Determinism
2290 /// The batch boundaries are non-deterministic and may change across executions.
2291 pub fn batch(
2292 self,
2293 tick: &Tick<L>,
2294 nondet: NonDet,
2295 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2296 let _ = nondet;
2297 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2298 KeyedStream::new(
2299 tick.clone(),
2300 HydroNode::Batch {
2301 inner: Box::new(self.ir_node.into_inner()),
2302 metadata: tick.new_node_metadata(
2303 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2304 ),
2305 },
2306 )
2307 }
2308}
2309
2310impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2311 KeyedStream<(K1, K2), V, L, B, O, R>
2312{
2313 /// Produces a new keyed stream by dropping the first element of the compound key.
2314 ///
2315 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2316 /// of the values under the new keys. The values across groups with the same new key
2317 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2318 ///
2319 /// # Example
2320 /// ```rust
2321 /// # #[cfg(feature = "deploy")] {
2322 /// # use hydro_lang::prelude::*;
2323 /// # use futures::StreamExt;
2324 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2325 /// process
2326 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2327 /// .into_keyed()
2328 /// .drop_key_prefix()
2329 /// # .entries()
2330 /// # }, |mut stream| async move {
2331 /// // { 10: [2, 3], 20: [4] }
2332 /// # let mut results = Vec::new();
2333 /// # for _ in 0..3 {
2334 /// # results.push(stream.next().await.unwrap());
2335 /// # }
2336 /// # results.sort();
2337 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2338 /// # }));
2339 /// # }
2340 /// ```
2341 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2342 self.entries()
2343 .map(q!(|((_k1, k2), v)| (k2, v)))
2344 .into_keyed()
2345 }
2346}
2347
2348impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2349 KeyedStream<K, V, L, Unbounded, O, R>
2350{
2351 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2352 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2353 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2354 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2355 ///
2356 /// Currently, both input streams must be [`Unbounded`].
2357 ///
2358 /// # Example
2359 /// ```rust
2360 /// # #[cfg(feature = "deploy")] {
2361 /// # use hydro_lang::prelude::*;
2362 /// # use futures::StreamExt;
2363 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2364 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2365 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2366 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2367 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2368 /// numbers1.interleave(numbers2)
2369 /// # .entries()
2370 /// # }, |mut stream| async move {
2371 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2372 /// # let mut results = Vec::new();
2373 /// # for _ in 0..4 {
2374 /// # results.push(stream.next().await.unwrap());
2375 /// # }
2376 /// # results.sort();
2377 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2378 /// # }));
2379 /// # }
2380 /// ```
2381 pub fn interleave<O2: Ordering, R2: Retries>(
2382 self,
2383 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2384 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2385 where
2386 R: MinRetries<R2>,
2387 {
2388 KeyedStream::new(
2389 self.location.clone(),
2390 HydroNode::Chain {
2391 first: Box::new(self.ir_node.into_inner()),
2392 second: Box::new(other.ir_node.into_inner()),
2393 metadata: self.location.new_node_metadata(KeyedStream::<
2394 K,
2395 V,
2396 L,
2397 Unbounded,
2398 NoOrder,
2399 <R as MinRetries<R2>>::Min,
2400 >::collection_kind()),
2401 },
2402 )
2403 }
2404}
2405
2406impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2407where
2408 L: Location<'a> + NoTick,
2409{
2410 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2411 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2412 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2413 /// used to create the atomic section.
2414 ///
2415 /// # Non-Determinism
2416 /// The batch boundaries are non-deterministic and may change across executions.
2417 pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2418 let _ = nondet;
2419 KeyedStream::new(
2420 self.location.clone().tick,
2421 HydroNode::Batch {
2422 inner: Box::new(self.ir_node.into_inner()),
2423 metadata: self.location.tick.new_node_metadata(KeyedStream::<
2424 K,
2425 V,
2426 Tick<L>,
2427 Bounded,
2428 O,
2429 R,
2430 >::collection_kind(
2431 )),
2432 },
2433 )
2434 }
2435
2436 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2437 /// See [`KeyedStream::atomic`] for more details.
2438 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2439 KeyedStream::new(
2440 self.location.tick.l.clone(),
2441 HydroNode::EndAtomic {
2442 inner: Box::new(self.ir_node.into_inner()),
2443 metadata: self
2444 .location
2445 .tick
2446 .l
2447 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2448 },
2449 )
2450 }
2451}
2452
2453impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2454where
2455 L: Location<'a>,
2456{
2457 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2458 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2459 /// each key.
2460 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2461 KeyedStream::new(
2462 self.location.outer().clone(),
2463 HydroNode::YieldConcat {
2464 inner: Box::new(self.ir_node.into_inner()),
2465 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2466 K,
2467 V,
2468 L,
2469 Unbounded,
2470 O,
2471 R,
2472 >::collection_kind(
2473 )),
2474 },
2475 )
2476 }
2477
2478 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2479 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2480 /// each key.
2481 ///
2482 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2483 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2484 /// stream's [`Tick`] context.
2485 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2486 let out_location = Atomic {
2487 tick: self.location.clone(),
2488 };
2489
2490 KeyedStream::new(
2491 out_location.clone(),
2492 HydroNode::YieldConcat {
2493 inner: Box::new(self.ir_node.into_inner()),
2494 metadata: out_location.new_node_metadata(KeyedStream::<
2495 K,
2496 V,
2497 Atomic<L>,
2498 Unbounded,
2499 O,
2500 R,
2501 >::collection_kind()),
2502 },
2503 )
2504 }
2505
2506 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2507 /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2508 ///
2509 /// This API is particularly useful for stateful computation on batches of data, such as
2510 /// maintaining an accumulated state that is up to date with the current batch.
2511 ///
2512 /// # Example
2513 /// ```rust
2514 /// # #[cfg(feature = "deploy")] {
2515 /// # use hydro_lang::prelude::*;
2516 /// # use futures::StreamExt;
2517 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2518 /// let tick = process.tick();
2519 /// # // ticks are lazy by default, forces the second tick to run
2520 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2521 /// # let batch_first_tick = process
2522 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2523 /// # .into_keyed()
2524 /// # .batch(&tick, nondet!(/** test */));
2525 /// # let batch_second_tick = process
2526 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2527 /// # .into_keyed()
2528 /// # .batch(&tick, nondet!(/** test */))
2529 /// # .defer_tick(); // appears on the second tick
2530 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2531 ///
2532 /// input.batch(&tick, nondet!(/** test */))
2533 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2534 /// *sum += new;
2535 /// }))).entries().all_ticks()
2536 /// # }, |mut stream| async move {
2537 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2538 /// # let mut results = Vec::new();
2539 /// # for _ in 0..4 {
2540 /// # results.push(stream.next().await.unwrap());
2541 /// # }
2542 /// # results.sort();
2543 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2544 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2545 /// # results.clear();
2546 /// # for _ in 0..4 {
2547 /// # results.push(stream.next().await.unwrap());
2548 /// # }
2549 /// # results.sort();
2550 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2551 /// # }));
2552 /// # }
2553 /// ```
2554 pub fn across_ticks<Out: BatchAtomic>(
2555 self,
2556 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2557 ) -> Out::Batched {
2558 thunk(self.all_ticks_atomic()).batched_atomic()
2559 }
2560
2561 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2562 /// tick `T` always has the entries of `self` at tick `T - 1`.
2563 ///
2564 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2565 ///
2566 /// This operator enables stateful iterative processing with ticks, by sending data from one
2567 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2568 ///
2569 /// # Example
2570 /// ```rust
2571 /// # #[cfg(feature = "deploy")] {
2572 /// # use hydro_lang::prelude::*;
2573 /// # use futures::StreamExt;
2574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2575 /// let tick = process.tick();
2576 /// # // ticks are lazy by default, forces the second tick to run
2577 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2578 /// # let batch_first_tick = process
2579 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2580 /// # .batch(&tick, nondet!(/** test */))
2581 /// # .into_keyed();
2582 /// # let batch_second_tick = process
2583 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2584 /// # .batch(&tick, nondet!(/** test */))
2585 /// # .defer_tick()
2586 /// # .into_keyed(); // appears on the second tick
2587 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2588 /// # batch_first_tick.chain(batch_second_tick);
2589 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2590 /// changes_across_ticks // from the current tick
2591 /// )
2592 /// # .entries().all_ticks()
2593 /// # }, |mut stream| async move {
2594 /// // First tick: { 1: [2, 3] }
2595 /// # let mut results = Vec::new();
2596 /// # for _ in 0..2 {
2597 /// # results.push(stream.next().await.unwrap());
2598 /// # }
2599 /// # results.sort();
2600 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2601 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2602 /// # results.clear();
2603 /// # for _ in 0..4 {
2604 /// # results.push(stream.next().await.unwrap());
2605 /// # }
2606 /// # results.sort();
2607 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2608 /// // Third tick: { 1: [4], 2: [5] }
2609 /// # results.clear();
2610 /// # for _ in 0..2 {
2611 /// # results.push(stream.next().await.unwrap());
2612 /// # }
2613 /// # results.sort();
2614 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2615 /// # }));
2616 /// # }
2617 /// ```
2618 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2619 KeyedStream::new(
2620 self.location.clone(),
2621 HydroNode::DeferTick {
2622 input: Box::new(self.ir_node.into_inner()),
2623 metadata: self.location.new_node_metadata(KeyedStream::<
2624 K,
2625 V,
2626 Tick<L>,
2627 Bounded,
2628 O,
2629 R,
2630 >::collection_kind()),
2631 },
2632 )
2633 }
2634}
2635
2636#[cfg(test)]
2637mod tests {
2638 #[cfg(feature = "deploy")]
2639 use futures::{SinkExt, StreamExt};
2640 #[cfg(feature = "deploy")]
2641 use hydro_deploy::Deployment;
2642 #[cfg(any(feature = "deploy", feature = "sim"))]
2643 use stageleft::q;
2644
2645 #[cfg(any(feature = "deploy", feature = "sim"))]
2646 use crate::compile::builder::FlowBuilder;
2647 #[cfg(feature = "deploy")]
2648 use crate::live_collections::stream::ExactlyOnce;
2649 #[cfg(feature = "sim")]
2650 use crate::live_collections::stream::{NoOrder, TotalOrder};
2651 #[cfg(any(feature = "deploy", feature = "sim"))]
2652 use crate::location::Location;
2653 #[cfg(any(feature = "deploy", feature = "sim"))]
2654 use crate::nondet::nondet;
2655 #[cfg(feature = "deploy")]
2656 use crate::properties::manual_proof;
2657
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        // End-to-end deploy test: aggregates an async keyed input with
        // `reduce_watermark` under a constant watermark of 2, and expects the first
        // snapshot entry streamed out to be key 2 with 102 + 102 = 204.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // Watermark pinned at 2 for the lifetime of the test.
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2702
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        // Same scenario as `reduce_watermark_filter`, but with a bounded
        // `source_iter` input and no snapshot: the aggregated entries are sent
        // out directly, and the first expected output is (2, 204).
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // Watermark pinned at 2 for the lifetime of the test.
        let watermark = node_tick.singleton(q!(2));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }
2740
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        // Exercises `reduce_watermark` with a watermark that advances every tick
        // (via a cycle starting at 2 and incrementing), plus a late input (3, 103)
        // that is only released when the external trigger fires. Expects (2, 204)
        // first, then (3, 103) after the trigger.
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        // External channel used to release the late input mid-run.
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Watermark cycle: starts at 2, and each tick feeds back `v + 1`.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // (3, 103) is held back until the external trigger delivers a message.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    // interleave produces NoOrder, so the aggregation must be
                    // declared commutative.
                    commutative = manual_proof!(/** integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }
2811
2812 #[cfg(feature = "sim")]
2813 #[test]
2814 #[should_panic]
2815 fn sim_batch_nondet_size() {
2816 let mut flow = FlowBuilder::new();
2817 let node = flow.process::<()>();
2818
2819 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2820
2821 let tick = node.tick();
2822 let out_recv = input
2823 .batch(&tick, nondet!(/** test */))
2824 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2825 .entries()
2826 .all_ticks()
2827 .sim_output();
2828
2829 flow.sim().exhaustive(async || {
2830 out_recv
2831 .assert_yields_only_unordered([(1, vec![1, 2])])
2832 .await;
2833 });
2834 }
2835
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        // Round-trip through a nondeterministic batch, then track the running
        // per-key max and stop once it reaches 2; the asserted result only
        // holds if batching preserves the order of values within each key.
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        // Exhaustive simulation explores every way of splitting the input
        // into batches; all schedules must produce the same per-key results.
        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }
2870
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        // Weakening the input to `NoOrder` lets the simulator also permute
        // values within a key's group, not just choose batch boundaries.
        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        // Every schedule yields the same multiset of entries; this test pins
        // the number of distinct schedules explored.
        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }
2901
2902 #[cfg(feature = "sim")]
2903 #[test]
2904 #[should_panic]
2905 fn sim_observe_order_batched() {
2906 let mut flow = FlowBuilder::new();
2907 let node = flow.process::<()>();
2908
2909 let (port, input) = node.sim_input::<_, NoOrder, _>();
2910
2911 let tick = node.tick();
2912 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2913 let out_recv = batch
2914 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2915 .all_ticks()
2916 .first()
2917 .entries()
2918 .sim_output();
2919
2920 flow.sim().exhaustive(async || {
2921 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2922 out_recv
2923 .assert_yields_only_unordered([(1, 1), (2, 1)])
2924 .await; // fails with assume_ordering
2925 });
2926 }
2927
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        // Batch first, then assume a total order on the batched stream; the
        // simulator has to enumerate batch splits as well as value orders.
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        // No assertion on the collected values; this test only pins the
        // number of distinct executions explored.
        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }
2951
2952 #[cfg(feature = "sim")]
2953 #[test]
2954 fn sim_top_level_assume_ordering() {
2955 use std::collections::HashMap;
2956
2957 let mut flow = FlowBuilder::new();
2958 let node = flow.process::<()>();
2959
2960 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2961
2962 let out_recv = input
2963 .into_keyed()
2964 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2965 .fold_early_stop(
2966 q!(|| Vec::new()),
2967 q!(|acc, v| {
2968 acc.push(v);
2969 acc.len() >= 2
2970 }),
2971 )
2972 .entries()
2973 .sim_output();
2974
2975 let instance_count = flow.sim().exhaustive(async || {
2976 in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
2977 let out: HashMap<_, _> = out_recv
2978 .collect_sorted::<Vec<_>>()
2979 .await
2980 .into_iter()
2981 .collect();
2982 // Each key accumulates its values; we get one entry per key
2983 assert_eq!(out.len(), 2);
2984 });
2985
2986 assert_eq!(instance_count, 24)
2987 }
2988
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // The ordered stream feeds back into its own input, so the simulator
        // must interleave cycled-back values with still-pending inputs.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // For each observed value v, cycle back v + 1 when that is odd
        // (0 cycles back 1; 1 + 1 = 2 is even, which ends the feedback).
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        // Each key keeps its first two observed values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }
3048
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .into_keyed(),
        );

        // Each key keeps its first two observed values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }
3122
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        // Same shape as `sim_top_level_assume_ordering_cycle_back`, except the
        // feedback path round-trips through a nondeterministic tick batch,
        // which adds extra scheduling choices to the exhaustive search
        // (58 instances here vs. 6 in the non-tick variant).
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // For each observed value v, batch into a tick, release on all_ticks,
        // and cycle back v + 1 when that is odd (0 cycles back 1).
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        // Each key keeps its first two observed values.
        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // We look for a schedule where the cycled-back 1 (from 0 + 1) is
            // observed by key 1 directly after the original 0.
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 58);
    }
3181}