hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::nondet::{NonDet, nondet};
31use crate::prelude::manual_proof;
32use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
33
34pub mod networking;
35
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode the two-element ordering lattice: every ordering
/// is its own minimum, combining with [`TotalOrder`] preserves the ordering, and
/// combining with [`NoOrder`] always collapses to [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
44
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
54
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
66
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
#[sealed::sealed]
// Blanket impl: `O` is weaker than `O2` exactly when the minimum of the two is `O` itself.
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
74
75/// Helper trait for determining the weakest of two orderings.
76#[sealed::sealed]
77pub trait MinOrder<Other: ?Sized> {
78 /// The weaker of the two orderings.
79 type Min: Ordering;
80}
81
82#[sealed::sealed]
83impl<O: Ordering> MinOrder<O> for TotalOrder {
84 type Min = O;
85}
86
87#[sealed::sealed]
88impl<O: Ordering> MinOrder<O> for NoOrder {
89 type Min = NoOrder;
90}
91
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds encode the two-element retry lattice: every guarantee
/// is its own minimum, combining with [`ExactlyOnce`] preserves the guarantee,
/// and combining with [`AtLeastOnce`] always collapses to [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
102
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
111
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited marker type; it exists only at the type level.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
120
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
#[sealed::sealed]
// Blanket impl: `R` is weaker than `R2` exactly when the minimum of the two is `R` itself.
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
128
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    // `ExactlyOnce` is the strongest guarantee, so the minimum is the other side.
    type Min = R;
}

#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    // `AtLeastOnce` is the weakest guarantee, so the minimum is always `AtLeastOnce`.
    type Min = AtLeastOnce;
}
145
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
// `do_not_recommend` hides this impl from compiler suggestions so that users
// see the `on_unimplemented` guidance above instead.
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
158
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
// `do_not_recommend` hides this impl from compiler suggestions so that users
// see the `on_unimplemented` guidance above instead.
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
171
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location where this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this stream; rewritten in place to a `Tee` on first clone.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Carries the type parameters that are not stored as runtime data.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
203
204impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
205 for Stream<T, L, Unbounded, O, R>
206where
207 L: Location<'a>,
208{
209 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
210 let new_meta = stream
211 .location
212 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
213
214 Stream {
215 location: stream.location,
216 ir_node: RefCell::new(HydroNode::Cast {
217 inner: Box::new(stream.ir_node.into_inner()),
218 metadata: new_meta,
219 }),
220 _phantom: PhantomData,
221 }
222 }
223}
224
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    /// Relaxes a [`TotalOrder`] stream to [`NoOrder`], since a totally-ordered
    /// stream trivially satisfies the weaker no-order guarantee.
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
234
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    /// Relaxes an [`ExactlyOnce`] stream to [`AtLeastOnce`], since exactly-once
    /// delivery trivially satisfies the weaker at-least-once guarantee.
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
244
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Delegates to [`Stream::defer_tick`].
    fn defer_tick(self) -> Self {
        // Fully-qualified path call, so this resolves to the inherent method
        // rather than recursing into this trait method.
        Stream::defer_tick(self)
    }
}
253
254impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
255 for Stream<T, Tick<L>, Bounded, O, R>
256where
257 L: Location<'a>,
258{
259 type Location = Tick<L>;
260
261 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262 Stream::new(
263 location.clone(),
264 HydroNode::CycleSource {
265 cycle_id,
266 metadata: location.new_node_metadata(Self::collection_kind()),
267 },
268 )
269 }
270}
271
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Creates the cycle source whose contents start as `initial` and on later
    /// ticks are whatever was fed back into the cycle on the previous tick.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Read the cycle's contents shifted forward by one tick: `DeferTick`
        // wraps the `CycleSource` so each tick observes the previous tick's writes.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // Gate `initial` so it is released only on the first tick, then append it
        // after the deferred cycle contents.
        from_previous_tick.chain(initial.filter_if_some(location.optional_first_tick(q!(()))))
    }
}
294
295impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
296 for Stream<T, Tick<L>, Bounded, O, R>
297where
298 L: Location<'a>,
299{
300 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
301 assert_eq!(
302 Location::id(&self.location),
303 expected_location,
304 "locations do not match"
305 );
306 self.location
307 .flow_state()
308 .borrow_mut()
309 .push_root(HydroRoot::CycleSink {
310 cycle_id,
311 input: Box::new(self.ir_node.into_inner()),
312 op_metadata: HydroIrOpMetadata::new(),
313 });
314 }
315}
316
317impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
318 for Stream<T, L, B, O, R>
319where
320 L: Location<'a> + NoTick,
321{
322 type Location = L;
323
324 fn create_source(cycle_id: CycleId, location: L) -> Self {
325 Stream::new(
326 location.clone(),
327 HydroNode::CycleSource {
328 cycle_id,
329 metadata: location.new_node_metadata(Self::collection_kind()),
330 },
331 )
332 }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
336 for Stream<T, L, B, O, R>
337where
338 L: Location<'a> + NoTick,
339{
340 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
341 assert_eq!(
342 Location::id(&self.location),
343 expected_location,
344 "locations do not match"
345 );
346 self.location
347 .flow_state()
348 .borrow_mut()
349 .push_root(HydroRoot::CycleSink {
350 cycle_id,
351 input: Box::new(self.ir_node.into_inner()),
352 op_metadata: HydroIrOpMetadata::new(),
353 });
354 }
355}
356
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // On the first clone, rewrite this stream's IR node into a `Tee` in place,
        // so the original and all clones share one underlying node via the `Rc`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Swap in a placeholder so the node can be moved out of the `RefCell`
            // without cloning it.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // At this point the node is guaranteed to be a `Tee`; hand out another
        // `Tee` sharing the same inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
386
387impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
388where
389 L: Location<'a>,
390{
    /// Wraps `ir_node` as a [`Stream`] materialized at `location`.
    ///
    /// In debug builds, asserts that the node's metadata agrees with both the
    /// given location and this stream type's [`CollectionKind`].
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
401
    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }

    /// Returns the [`CollectionKind`] describing this stream type: its
    /// boundedness, ordering, retry guarantee, and element type.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
415
416 /// Produces a stream based on invoking `f` on each element.
417 /// If you do not want to modify the stream and instead only want to view
418 /// each item use [`Stream::inspect`] instead.
419 ///
420 /// # Example
421 /// ```rust
422 /// # #[cfg(feature = "deploy")] {
423 /// # use hydro_lang::prelude::*;
424 /// # use futures::StreamExt;
425 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
426 /// let words = process.source_iter(q!(vec!["hello", "world"]));
427 /// words.map(q!(|x| x.to_uppercase()))
428 /// # }, |mut stream| async move {
429 /// # for w in vec!["HELLO", "WORLD"] {
430 /// # assert_eq!(stream.next().await.unwrap(), w);
431 /// # }
432 /// # }));
433 /// # }
434 /// ```
435 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
436 where
437 F: Fn(T) -> U + 'a,
438 {
439 let f = f.splice_fn1_ctx(&self.location).into();
440 Stream::new(
441 self.location.clone(),
442 HydroNode::Map {
443 f,
444 input: Box::new(self.ir_node.into_inner()),
445 metadata: self
446 .location
447 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
448 },
449 )
450 }
451
452 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
453 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
454 /// for the output type `U` must produce items in a **deterministic** order.
455 ///
456 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
457 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
458 ///
459 /// # Example
460 /// ```rust
461 /// # #[cfg(feature = "deploy")] {
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465 /// process
466 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
467 /// .flat_map_ordered(q!(|x| x))
468 /// # }, |mut stream| async move {
469 /// // 1, 2, 3, 4
470 /// # for w in (1..5) {
471 /// # assert_eq!(stream.next().await.unwrap(), w);
472 /// # }
473 /// # }));
474 /// # }
475 /// ```
476 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
477 where
478 I: IntoIterator<Item = U>,
479 F: Fn(T) -> I + 'a,
480 {
481 let f = f.splice_fn1_ctx(&self.location).into();
482 Stream::new(
483 self.location.clone(),
484 HydroNode::FlatMap {
485 f,
486 input: Box::new(self.ir_node.into_inner()),
487 metadata: self
488 .location
489 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
490 },
491 )
492 }
493
494 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
495 /// for the output type `U` to produce items in any order.
496 ///
497 /// # Example
498 /// ```rust
499 /// # #[cfg(feature = "deploy")] {
500 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
501 /// # use futures::StreamExt;
502 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
503 /// process
504 /// .source_iter(q!(vec![
505 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
506 /// std::collections::HashSet::from_iter(vec![3, 4]),
507 /// ]))
508 /// .flat_map_unordered(q!(|x| x))
509 /// # }, |mut stream| async move {
510 /// // 1, 2, 3, 4, but in no particular order
511 /// # let mut results = Vec::new();
512 /// # for w in (1..5) {
513 /// # results.push(stream.next().await.unwrap());
514 /// # }
515 /// # results.sort();
516 /// # assert_eq!(results, vec![1, 2, 3, 4]);
517 /// # }));
518 /// # }
519 /// ```
520 pub fn flat_map_unordered<U, I, F>(
521 self,
522 f: impl IntoQuotedMut<'a, F, L>,
523 ) -> Stream<U, L, B, NoOrder, R>
524 where
525 I: IntoIterator<Item = U>,
526 F: Fn(T) -> I + 'a,
527 {
528 let f = f.splice_fn1_ctx(&self.location).into();
529 Stream::new(
530 self.location.clone(),
531 HydroNode::FlatMap {
532 f,
533 input: Box::new(self.ir_node.into_inner()),
534 metadata: self
535 .location
536 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
537 },
538 )
539 }
540
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is consumed as its own iterator.
        self.flat_map_ordered(q!(|d| d))
    }
569
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map: each element is consumed as its own iterator.
        self.flat_map_unordered(q!(|d| d))
    }
602
603 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
604 /// `f`, preserving the order of the elements.
605 ///
606 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
607 /// not modify or take ownership of the values. If you need to modify the values while filtering
608 /// use [`Stream::filter_map`] instead.
609 ///
610 /// # Example
611 /// ```rust
612 /// # #[cfg(feature = "deploy")] {
613 /// # use hydro_lang::prelude::*;
614 /// # use futures::StreamExt;
615 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616 /// process
617 /// .source_iter(q!(vec![1, 2, 3, 4]))
618 /// .filter(q!(|&x| x > 2))
619 /// # }, |mut stream| async move {
620 /// // 3, 4
621 /// # for w in (3..5) {
622 /// # assert_eq!(stream.next().await.unwrap(), w);
623 /// # }
624 /// # }));
625 /// # }
626 /// ```
627 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
628 where
629 F: Fn(&T) -> bool + 'a,
630 {
631 let f = f.splice_fn1_borrow_ctx(&self.location).into();
632 Stream::new(
633 self.location.clone(),
634 HydroNode::Filter {
635 f,
636 input: Box::new(self.ir_node.into_inner()),
637 metadata: self.location.new_node_metadata(Self::collection_kind()),
638 },
639 )
640 }
641
642 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
643 ///
644 /// # Example
645 /// ```rust
646 /// # #[cfg(feature = "deploy")] {
647 /// # use hydro_lang::prelude::*;
648 /// # use futures::StreamExt;
649 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
650 /// process
651 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
652 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
653 /// # }, |mut stream| async move {
654 /// // 1, 2
655 /// # for w in (1..3) {
656 /// # assert_eq!(stream.next().await.unwrap(), w);
657 /// # }
658 /// # }));
659 /// # }
660 /// ```
661 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
662 where
663 F: Fn(T) -> Option<U> + 'a,
664 {
665 let f = f.splice_fn1_ctx(&self.location).into();
666 Stream::new(
667 self.location.clone(),
668 HydroNode::FilterMap {
669 f,
670 input: Box::new(self.ir_node.into_inner()),
671 metadata: self
672 .location
673 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
674 },
675 )
676 }
677
678 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
679 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
680 /// If `other` is an empty [`Optional`], no values will be produced.
681 ///
682 /// # Example
683 /// ```rust
684 /// # #[cfg(feature = "deploy")] {
685 /// # use hydro_lang::prelude::*;
686 /// # use futures::StreamExt;
687 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
688 /// let tick = process.tick();
689 /// let batch = process
690 /// .source_iter(q!(vec![1, 2, 3, 4]))
691 /// .batch(&tick, nondet!(/** test */));
692 /// let count = batch.clone().count(); // `count()` returns a singleton
693 /// batch.cross_singleton(count).all_ticks()
694 /// # }, |mut stream| async move {
695 /// // (1, 4), (2, 4), (3, 4), (4, 4)
696 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
697 /// # assert_eq!(stream.next().await.unwrap(), w);
698 /// # }
699 /// # }));
700 /// # }
701 /// ```
702 pub fn cross_singleton<O2>(
703 self,
704 other: impl Into<Optional<O2, L, Bounded>>,
705 ) -> Stream<(T, O2), L, B, O, R>
706 where
707 O2: Clone,
708 {
709 let other: Optional<O2, L, Bounded> = other.into();
710 check_matching_location(&self.location, &other.location);
711
712 Stream::new(
713 self.location.clone(),
714 HydroNode::CrossSingleton {
715 left: Box::new(self.ir_node.into_inner()),
716 right: Box::new(other.ir_node.into_inner()),
717 metadata: self
718 .location
719 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
720 },
721 )
722 }
723
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_some(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Cross with the unit-mapped signal: yields nothing when `signal` is empty,
        // otherwise tags each element with `()`, which is immediately dropped.
        self.cross_singleton(signal.map(q!(|_u| ())))
            .map(q!(|(d, _signal)| d))
    }
762
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_none(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Invert the signal: `into_singleton` yields `Some(())`/`None`, and the
        // filter keeps only the `None` case, producing a present-when-empty gate.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
805
806 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
807 /// tupled pairs in a non-deterministic order.
808 ///
809 /// # Example
810 /// ```rust
811 /// # #[cfg(feature = "deploy")] {
812 /// # use hydro_lang::prelude::*;
813 /// # use std::collections::HashSet;
814 /// # use futures::StreamExt;
815 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
816 /// let tick = process.tick();
817 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
818 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
819 /// stream1.cross_product(stream2)
820 /// # }, |mut stream| async move {
821 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
822 /// # stream.map(|i| assert!(expected.contains(&i)));
823 /// # }));
824 /// # }
825 /// ```
826 pub fn cross_product<T2, O2: Ordering>(
827 self,
828 other: Stream<T2, L, B, O2, R>,
829 ) -> Stream<(T, T2), L, B, NoOrder, R>
830 where
831 T: Clone,
832 T2: Clone,
833 {
834 check_matching_location(&self.location, &other.location);
835
836 Stream::new(
837 self.location.clone(),
838 HydroNode::CrossProduct {
839 left: Box::new(self.ir_node.into_inner()),
840 right: Box::new(other.ir_node.into_inner()),
841 metadata: self
842 .location
843 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
844 },
845 )
846 }
847
848 /// Takes one stream as input and filters out any duplicate occurrences. The output
849 /// contains all unique values from the input.
850 ///
851 /// # Example
852 /// ```rust
853 /// # #[cfg(feature = "deploy")] {
854 /// # use hydro_lang::prelude::*;
855 /// # use futures::StreamExt;
856 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
857 /// let tick = process.tick();
858 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
859 /// # }, |mut stream| async move {
860 /// # for w in vec![1, 2, 3, 4] {
861 /// # assert_eq!(stream.next().await.unwrap(), w);
862 /// # }
863 /// # }));
864 /// # }
865 /// ```
866 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
867 where
868 T: Eq + Hash,
869 {
870 Stream::new(
871 self.location.clone(),
872 HydroNode::Unique {
873 input: Box::new(self.ir_node.into_inner()),
874 metadata: self
875 .location
876 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
877 },
878 )
879 }
880
881 /// Outputs everything in this stream that is *not* contained in the `other` stream.
882 ///
883 /// The `other` stream must be [`Bounded`], since this function will wait until
884 /// all its elements are available before producing any output.
885 /// # Example
886 /// ```rust
887 /// # #[cfg(feature = "deploy")] {
888 /// # use hydro_lang::prelude::*;
889 /// # use futures::StreamExt;
890 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
891 /// let tick = process.tick();
892 /// let stream = process
893 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
894 /// .batch(&tick, nondet!(/** test */));
895 /// let batch = process
896 /// .source_iter(q!(vec![1, 2]))
897 /// .batch(&tick, nondet!(/** test */));
898 /// stream.filter_not_in(batch).all_ticks()
899 /// # }, |mut stream| async move {
900 /// # for w in vec![3, 4] {
901 /// # assert_eq!(stream.next().await.unwrap(), w);
902 /// # }
903 /// # }));
904 /// # }
905 /// ```
906 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
907 where
908 T: Eq + Hash,
909 B2: IsBounded,
910 {
911 check_matching_location(&self.location, &other.location);
912
913 Stream::new(
914 self.location.clone(),
915 HydroNode::Difference {
916 pos: Box::new(self.ir_node.into_inner()),
917 neg: Box::new(other.ir_node.into_inner()),
918 metadata: self
919 .location
920 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
921 },
922 )
923 }
924
925 /// An operator which allows you to "inspect" each element of a stream without
926 /// modifying it. The closure `f` is called on a reference to each item. This is
927 /// mainly useful for debugging, and should not be used to generate side-effects.
928 ///
929 /// # Example
930 /// ```rust
931 /// # #[cfg(feature = "deploy")] {
932 /// # use hydro_lang::prelude::*;
933 /// # use futures::StreamExt;
934 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
935 /// let nums = process.source_iter(q!(vec![1, 2]));
936 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
937 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
938 /// # }, |mut stream| async move {
939 /// # for w in vec![1, 2] {
940 /// # assert_eq!(stream.next().await.unwrap(), w);
941 /// # }
942 /// # }));
943 /// # }
944 /// ```
945 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
946 where
947 F: Fn(&T) + 'a,
948 {
949 let f = f.splice_fn1_borrow_ctx(&self.location).into();
950
951 Stream::new(
952 self.location.clone(),
953 HydroNode::Inspect {
954 f,
955 input: Box::new(self.ir_node.into_inner()),
956 metadata: self.location.new_node_metadata(Self::collection_kind()),
957 },
958 )
959 }
960
961 /// Executes the provided closure for every element in this stream.
962 ///
963 /// Because the closure may have side effects, the stream must have deterministic order
964 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
965 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
966 /// [`Stream::assume_retries`] with an explanation for why this is the case.
967 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
968 where
969 O: IsOrdered,
970 R: IsExactlyOnce,
971 {
972 let f = f.splice_fn1_ctx(&self.location).into();
973 self.location
974 .flow_state()
975 .borrow_mut()
976 .push_root(HydroRoot::ForEach {
977 input: Box::new(self.ir_node.into_inner()),
978 f,
979 op_metadata: HydroIrOpMetadata::new(),
980 });
981 }
982
983 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
984 /// TCP socket to some other server. You should _not_ use this API for interacting with
985 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
986 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
987 /// interaction with asynchronous sinks.
988 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
989 where
990 O: IsOrdered,
991 R: IsExactlyOnce,
992 S: 'a + futures::Sink<T> + Unpin,
993 {
994 self.location
995 .flow_state()
996 .borrow_mut()
997 .push_root(HydroRoot::DestSink {
998 sink: sink.splice_typed_ctx(&self.location).into(),
999 input: Box::new(self.ir_node.into_inner()),
1000 op_metadata: HydroIrOpMetadata::new(),
1001 });
1002 }
1003
1004 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1005 ///
1006 /// # Example
1007 /// ```rust
1008 /// # #[cfg(feature = "deploy")] {
1009 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1010 /// # use futures::StreamExt;
1011 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1014 /// numbers.enumerate()
1015 /// # }, |mut stream| async move {
1016 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1017 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1018 /// # assert_eq!(stream.next().await.unwrap(), w);
1019 /// # }
1020 /// # }));
1021 /// # }
1022 /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Enumerate {
                input: Box::new(self.ir_node.into_inner()),
                // `O: IsOrdered` implies `O == TotalOrder` and `R: IsExactlyOnce` implies
                // `R == ExactlyOnce`, so naming them concretely here matches the return type.
                metadata: self.location.new_node_metadata(Stream::<
                    (usize, T),
                    L,
                    B,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        )
    }
1042
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1044 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1045 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1046 ///
1047 /// Depending on the input stream guarantees, the closure may need to be commutative
1048 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1049 ///
1050 /// # Example
1051 /// ```rust
1052 /// # #[cfg(feature = "deploy")] {
1053 /// # use hydro_lang::prelude::*;
1054 /// # use futures::StreamExt;
1055 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1056 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1057 /// words
1058 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1059 /// .into_stream()
1060 /// # }, |mut stream| async move {
1061 /// // "HELLOWORLD"
1062 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1063 /// # }));
1064 /// # }
1065 /// ```
    pub fn fold<A, I, F, C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Splice the combinator together with its declared algebraic properties
        // (commutativity / idempotence) and record the proof obligations backing them.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `C` / `Idemp` bounds above already guarantee the combinator tolerates the
        // input's ordering and retry guarantees, so casting to `TotalOrder` /
        // `ExactlyOnce` here does not introduce observable non-determinism.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
        };

        Singleton::new(ordered_etc.location, core)
    }
1095
1096 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1097 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1098 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1099 /// reference, so that it can be modified in place.
1100 ///
1101 /// Depending on the input stream guarantees, the closure may need to be commutative
1102 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # #[cfg(feature = "deploy")] {
1107 /// # use hydro_lang::prelude::*;
1108 /// # use futures::StreamExt;
1109 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1110 /// let bools = process.source_iter(q!(vec![false, true, false]));
1111 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1112 /// # }, |mut stream| async move {
1113 /// // true
1114 /// # assert_eq!(stream.next().await.unwrap(), true);
1115 /// # }));
1116 /// # }
1117 /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the combinator together with its declared algebraic properties and
        // record the proof obligations backing them.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // The `C` / `Idemp` bounds guarantee the combinator tolerates the input's
        // ordering and retry guarantees, so this cast is not observable.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location, core)
    }
1143
1144 /// Computes the maximum element in the stream as an [`Optional`], which
1145 /// will be empty until the first element in the input arrives.
1146 ///
1147 /// # Example
1148 /// ```rust
1149 /// # #[cfg(feature = "deploy")] {
1150 /// # use hydro_lang::prelude::*;
1151 /// # use futures::StreamExt;
1152 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1153 /// let tick = process.tick();
1154 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1155 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1156 /// batch.max().all_ticks()
1157 /// # }, |mut stream| async move {
1158 /// // 4
1159 /// # assert_eq!(stream.next().await.unwrap(), 4);
1160 /// # }));
1161 /// # }
1162 /// ```
1163 pub fn max(self) -> Optional<T, L, B>
1164 where
1165 T: Ord,
1166 {
1167 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1168 .assume_ordering_trusted_bounded::<TotalOrder>(
1169 nondet!(/** max is commutative, but order affects intermediates */),
1170 )
1171 .reduce(q!(|curr, new| {
1172 if new > *curr {
1173 *curr = new;
1174 }
1175 }))
1176 }
1177
1178 /// Computes the minimum element in the stream as an [`Optional`], which
1179 /// will be empty until the first element in the input arrives.
1180 ///
1181 /// # Example
1182 /// ```rust
1183 /// # #[cfg(feature = "deploy")] {
1184 /// # use hydro_lang::prelude::*;
1185 /// # use futures::StreamExt;
1186 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1187 /// let tick = process.tick();
1188 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1189 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1190 /// batch.min().all_ticks()
1191 /// # }, |mut stream| async move {
1192 /// // 1
1193 /// # assert_eq!(stream.next().await.unwrap(), 1);
1194 /// # }));
1195 /// # }
1196 /// ```
1197 pub fn min(self) -> Optional<T, L, B>
1198 where
1199 T: Ord,
1200 {
1201 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1202 .assume_ordering_trusted_bounded::<TotalOrder>(
1203 nondet!(/** max is commutative, but order affects intermediates */),
1204 )
1205 .reduce(q!(|curr, new| {
1206 if new < *curr {
1207 *curr = new;
1208 }
1209 }))
1210 }
1211
1212 /// Computes the first element in the stream as an [`Optional`], which
1213 /// will be empty until the first element in the input arrives.
1214 ///
1215 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1216 /// re-ordering of elements may cause the first element to change.
1217 ///
1218 /// # Example
1219 /// ```rust
1220 /// # #[cfg(feature = "deploy")] {
1221 /// # use hydro_lang::prelude::*;
1222 /// # use futures::StreamExt;
1223 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224 /// let tick = process.tick();
1225 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1226 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1227 /// batch.first().all_ticks()
1228 /// # }, |mut stream| async move {
1229 /// // 1
1230 /// # assert_eq!(stream.next().await.unwrap(), 1);
1231 /// # }));
1232 /// # }
1233 /// ```
1234 pub fn first(self) -> Optional<T, L, B>
1235 where
1236 O: IsOrdered,
1237 {
1238 self.make_totally_ordered()
1239 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1240 .reduce(q!(|_, _| {}))
1241 }
1242
1243 /// Computes the last element in the stream as an [`Optional`], which
1244 /// will be empty until an element in the input arrives.
1245 ///
1246 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1247 /// re-ordering of elements may cause the last element to change.
1248 ///
1249 /// # Example
1250 /// ```rust
1251 /// # #[cfg(feature = "deploy")] {
1252 /// # use hydro_lang::prelude::*;
1253 /// # use futures::StreamExt;
1254 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1255 /// let tick = process.tick();
1256 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1257 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1258 /// batch.last().all_ticks()
1259 /// # }, |mut stream| async move {
1260 /// // 4
1261 /// # assert_eq!(stream.next().await.unwrap(), 4);
1262 /// # }));
1263 /// # }
1264 /// ```
1265 pub fn last(self) -> Optional<T, L, B>
1266 where
1267 O: IsOrdered,
1268 {
1269 self.make_totally_ordered()
1270 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1271 .reduce(q!(|curr, new| *curr = new))
1272 }
1273
1274 /// Collects all the elements of this stream into a single [`Vec`] element.
1275 ///
1276 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1277 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1278 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1279 /// the vector at an arbitrary point in time.
1280 ///
1281 /// # Example
1282 /// ```rust
1283 /// # #[cfg(feature = "deploy")] {
1284 /// # use hydro_lang::prelude::*;
1285 /// # use futures::StreamExt;
1286 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1287 /// let tick = process.tick();
1288 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1289 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1290 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1291 /// # }, |mut stream| async move {
1292 /// // [ vec![1, 2, 3, 4] ]
1293 /// # for w in vec![vec![1, 2, 3, 4]] {
1294 /// # assert_eq!(stream.next().await.unwrap(), w);
1295 /// # }
1296 /// # }));
1297 /// # }
1298 /// ```
1299 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1300 where
1301 O: IsOrdered,
1302 R: IsExactlyOnce,
1303 {
1304 self.make_totally_ordered().make_exactly_once().fold(
1305 q!(|| vec![]),
1306 q!(|acc, v| {
1307 acc.push(v);
1308 }),
1309 )
1310 }
1311
1312 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1313 /// and emitting each intermediate result.
1314 ///
1315 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1316 /// containing all intermediate accumulated values. The scan operation can also terminate early
1317 /// by returning `None`.
1318 ///
1319 /// The function takes a mutable reference to the accumulator and the current element, and returns
1320 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1321 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1322 ///
1323 /// # Examples
1324 ///
1325 /// Basic usage - running sum:
1326 /// ```rust
1327 /// # #[cfg(feature = "deploy")] {
1328 /// # use hydro_lang::prelude::*;
1329 /// # use futures::StreamExt;
1330 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1331 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1332 /// q!(|| 0),
1333 /// q!(|acc, x| {
1334 /// *acc += x;
1335 /// Some(*acc)
1336 /// }),
1337 /// )
1338 /// # }, |mut stream| async move {
1339 /// // Output: 1, 3, 6, 10
1340 /// # for w in vec![1, 3, 6, 10] {
1341 /// # assert_eq!(stream.next().await.unwrap(), w);
1342 /// # }
1343 /// # }));
1344 /// # }
1345 /// ```
1346 ///
1347 /// Early termination example:
1348 /// ```rust
1349 /// # #[cfg(feature = "deploy")] {
1350 /// # use hydro_lang::prelude::*;
1351 /// # use futures::StreamExt;
1352 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1353 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1354 /// q!(|| 1),
1355 /// q!(|state, x| {
1356 /// *state = *state * x;
1357 /// if *state > 6 {
1358 /// None // Terminate the stream
1359 /// } else {
1360 /// Some(-*state)
1361 /// }
1362 /// }),
1363 /// )
1364 /// # }, |mut stream| async move {
1365 /// // Output: -1, -2, -6
1366 /// # for w in vec![-1, -2, -6] {
1367 /// # assert_eq!(stream.next().await.unwrap(), w);
1368 /// # }
1369 /// # }));
1370 /// # }
1371 /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Option<U> + 'a,
    {
        // Splice the zero-argument initializer and the stateful step function
        // (`&mut` accumulator + element -> Option<output>) in this location's context.
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Scan {
                init,
                acc: f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1398
1399 /// Given a time interval, returns a stream corresponding to samples taken from the
1400 /// stream roughly at that interval. The output will have elements in the same order
1401 /// as the input, but with arbitrary elements skipped between samples. There is also
1402 /// no guarantee on the exact timing of the samples.
1403 ///
1404 /// # Non-Determinism
1405 /// The output stream is non-deterministic in which elements are sampled, since this
1406 /// is controlled by a clock.
    pub fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
    where
        L: NoTick + NoAtomic,
    {
        // Clock-driven trigger stream that fires roughly once per `interval`.
        let samples = self.location.source_interval(interval, nondet);

        let tick = self.location.tick();
        // Keep a tick's batch only when a timer event was observed in that same tick;
        // batches without a timer event are dropped, which produces the sampling.
        self.batch(&tick, nondet)
            .filter_if_some(samples.batch(&tick, nondet).first())
            .all_ticks()
            .weaken_retries()
    }
1423
1424 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1425 /// stream has not emitted a value since that duration.
1426 ///
1427 /// # Non-Determinism
1428 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1429 /// samples take place, timeouts may be non-deterministically generated or missed,
1430 /// and the notification of the timeout may be delayed as well. There is also no
1431 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1432 /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent element (`None` until one arrives).
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Sample the latest-arrival time each tick; report a timeout when no element
        // has ever arrived, or when the last arrival is older than `duration`.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1468
1469 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1470 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1471 ///
1472 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1473 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1474 /// argument that declares where the stream will be atomically processed. Batching a stream into
1475 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1476 /// [`Tick`] will introduce asynchrony.
1477 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1478 let out_location = Atomic { tick: tick.clone() };
1479 Stream::new(
1480 out_location.clone(),
1481 HydroNode::BeginAtomic {
1482 inner: Box::new(self.ir_node.into_inner()),
1483 metadata: out_location
1484 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1485 },
1486 )
1487 }
1488
1489 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1490 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1491 /// the order of the input. The output stream will execute in the [`Tick`] that was
1492 /// used to create the atomic section.
1493 ///
1494 /// # Non-Determinism
1495 /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
        // The tick must belong to the same location as this stream; panics otherwise.
        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
        Stream::new(
            tick.clone(),
            HydroNode::Batch {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: tick
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
1507
1508 /// An operator which allows you to "name" a `HydroNode`.
1509 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1510 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1511 {
1512 let mut node = self.ir_node.borrow_mut();
1513 let metadata = node.metadata_mut();
1514 metadata.tag = Some(name.to_owned());
1515 }
1516 self
1517 }
1518
1519 /// Explicitly "casts" the stream to a type with a different ordering
1520 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1521 /// by the type-system.
1522 ///
1523 /// # Non-Determinism
1524 /// This function is used as an escape hatch, and any mistakes in the
1525 /// provided ordering guarantee will propagate into the guarantees
1526 /// for the rest of the program.
    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
        if O::ORDERING_KIND == O2::ORDERING_KIND {
            // Same guarantee: no IR node needed, just re-wrap.
            Stream::new(self.location, self.ir_node.into_inner())
        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
            // We can always weaken the ordering guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        } else {
            // Strengthening: record in the IR that non-determinism is observed here;
            // `trusted: false` because the caller's justification is unverified.
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        }
    }
1554
1555 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1556 // intermediate states will not be revealed
1557 fn assume_ordering_trusted_bounded<O2: Ordering>(
1558 self,
1559 nondet: NonDet,
1560 ) -> Stream<T, L, B, O2, R> {
1561 if B::BOUNDED {
1562 self.assume_ordering_trusted(nondet)
1563 } else {
1564 self.assume_ordering(nondet)
1565 }
1566 }
1567
1568 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1569 // is not observable
    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
        self,
        _nondet: NonDet,
    ) -> Stream<T, L, B, O2, R> {
        if O::ORDERING_KIND == O2::ORDERING_KIND {
            // Same guarantee: no IR node needed, just re-wrap.
            Stream::new(self.location, self.ir_node.into_inner())
        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
            // We can always weaken the ordering guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        } else {
            // `trusted: true`: callers are vetted internal APIs whose non-determinism
            // has been checked to not be observable.
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: true,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
                },
            )
        }
    }
1600
1601 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1602 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1603 /// which is always safe because that is the weakest possible guarantee.
1604 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1605 self.weaken_ordering::<NoOrder>()
1606 }
1607
1608 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1609 /// enforcing that `O2` is weaker than the input ordering guarantee.
1610 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1611 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1612 self.assume_ordering::<O2>(nondet)
1613 }
1614
1615 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1616 /// implies that `O == TotalOrder`.
    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
    where
        O: IsOrdered,
    {
        // `O: IsOrdered` means `O` is already `TotalOrder`, so this cast cannot
        // introduce any non-determinism.
        self.assume_ordering(nondet!(/** no-op */))
    }
1623
1624 /// Explicitly "casts" the stream to a type with a different retries
1625 /// guarantee. Useful in unsafe code where the lack of retries cannot
1626 /// be proven by the type-system.
1627 ///
1628 /// # Non-Determinism
1629 /// This function is used as an escape hatch, and any mistakes in the
1630 /// provided retries guarantee will propagate into the guarantees
1631 /// for the rest of the program.
    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            // Same guarantee: no IR node needed, just re-wrap.
            Stream::new(self.location, self.ir_node.into_inner())
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            // Strengthening: record in the IR that non-determinism is observed here;
            // `trusted: false` because the caller's justification is unverified.
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: false,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }
1659
1660 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1661 // is not observable
    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
        if R::RETRIES_KIND == R2::RETRIES_KIND {
            // Same guarantee: no IR node needed, just re-wrap.
            Stream::new(self.location, self.ir_node.into_inner())
        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
            // We can always weaken the retries guarantee
            Stream::new(
                self.location.clone(),
                HydroNode::Cast {
                    inner: Box::new(self.ir_node.into_inner()),
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        } else {
            // `trusted: true`: callers are vetted internal APIs whose non-determinism
            // has been checked to not be observable.
            Stream::new(
                self.location.clone(),
                HydroNode::ObserveNonDet {
                    inner: Box::new(self.ir_node.into_inner()),
                    trusted: true,
                    metadata: self
                        .location
                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
                },
            )
        }
    }
1689
1690 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1691 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1692 /// which is always safe because that is the weakest possible guarantee.
1693 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1694 self.weaken_retries::<AtLeastOnce>()
1695 }
1696
1697 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1698 /// enforcing that `R2` is weaker than the input retries guarantee.
1699 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1700 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1701 self.assume_retries::<R2>(nondet)
1702 }
1703
1704 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1705 /// implies that `R == ExactlyOnce`.
    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        R: IsExactlyOnce,
    {
        // `R: IsExactlyOnce` means `R` is already `ExactlyOnce`, so this cast cannot
        // introduce any non-determinism.
        self.assume_retries(nondet!(/** no-op */))
    }
1712
1713 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1714 /// implies that `B == Bounded`.
    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
    where
        B: IsBounded,
    {
        // `B: IsBounded` means `B` is already `Bounded`; re-wrap without any IR node.
        Stream::new(self.location, self.ir_node.into_inner())
    }
1721}
1722
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // A pure element-wise map preserves boundedness, ordering, and retries.
        self.map(q!(|d| d.clone()))
    }
}
1751
1752impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1753where
1754 L: Location<'a>,
1755{
1756 /// Computes the number of elements in the stream as a [`Singleton`].
1757 ///
1758 /// # Example
1759 /// ```rust
1760 /// # #[cfg(feature = "deploy")] {
1761 /// # use hydro_lang::prelude::*;
1762 /// # use futures::StreamExt;
1763 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1764 /// let tick = process.tick();
1765 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1766 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1767 /// batch.count().all_ticks()
1768 /// # }, |mut stream| async move {
1769 /// // 4
1770 /// # assert_eq!(stream.next().await.unwrap(), 4);
1771 /// # }));
1772 /// # }
1773 /// ```
1774 pub fn count(self) -> Singleton<usize, L, B> {
1775 self.assume_ordering_trusted::<TotalOrder>(nondet!(
1776 /// Order does not affect eventual count, and also does not affect intermediate states.
1777 ))
1778 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1779 }
1780}
1781
impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
    /// Produces a new stream that interleaves the elements of the two input streams.
    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
    ///
    /// Currently, both input streams must be [`Unbounded`]. When the streams are
    /// [`Bounded`], you can use [`Stream::chain`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
    /// #     process.source_iter(q!(vec![1, 2, 3, 4])).into();
    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn interleave<O2: Ordering, R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, O2, R2>,
    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // Interleaving lowers to the same `Chain` IR node as `merge_ordered`; the
        // `NoOrder` output type is what records that the interleaving across the two
        // inputs carries no ordering guarantee.
        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    Unbounded,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
}
1829
1830impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1831 /// Produces a new stream that combines the elements of the two input streams,
1832 /// preserving the relative order of elements within each input.
1833 ///
1834 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1835 /// [`Bounded`], you can use [`Stream::chain`] instead.
1836 ///
1837 /// # Non-Determinism
1838 /// The order in which elements *across* the two streams will be interleaved is
1839 /// non-deterministic, so the order of elements will vary across runs. If the output order
1840 /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1841 /// unordered stream.
1842 ///
1843 /// # Example
1844 /// ```rust
1845 /// # #[cfg(feature = "deploy")] {
1846 /// # use hydro_lang::prelude::*;
1847 /// # use futures::StreamExt;
1848 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1849 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1850 /// # process.source_iter(q!(vec![1, 3])).into();
1851 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1852 /// # }, |mut stream| async move {
1853 /// // 1, 3 and 2, 4 in some order, preserving the original local order
1854 /// # for w in vec![1, 3, 2, 4] {
1855 /// # assert_eq!(stream.next().await.unwrap(), w);
1856 /// # }
1857 /// # }));
1858 /// # }
1859 /// ```
1860 pub fn merge_ordered<R2: Retries>(
1861 self,
1862 other: Stream<T, L, Unbounded, TotalOrder, R2>,
1863 _nondet: NonDet,
1864 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1865 where
1866 R: MinRetries<R2>,
1867 {
1868 Stream::new(
1869 self.location.clone(),
1870 HydroNode::Chain {
1871 first: Box::new(self.ir_node.into_inner()),
1872 second: Box::new(other.ir_node.into_inner()),
1873 metadata: self.location.new_node_metadata(Stream::<
1874 T,
1875 L,
1876 Unbounded,
1877 TotalOrder,
1878 <R as MinRetries<R2>>::Min,
1879 >::collection_kind()),
1880 },
1881 )
1882 }
1883}
1884
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
    where
        B: IsBounded,
        T: Ord,
    {
        // `B: IsBounded` makes this a marker-level cast to `Bounded`, so the IR node
        // below can legally wait for the full input before emitting anything.
        let this = self.make_bounded();
        Stream::new(
            this.location.clone(),
            HydroNode::Sort {
                input: Box::new(this.ir_node.into_inner()),
                metadata: this
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
        self,
        other: Stream<T, L, B2, O2, R2>,
    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        // The type parameter `L` only fixes the location *kind*; this verifies the two
        // streams live at the same concrete location instance.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    B2,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
    /// because this is compiled into a nested loop.
    ///
    /// The output ordering is `<O2 as MinOrder<O>>::Min`, i.e. the weaker of the two input
    /// orderings; both inputs must be [`Bounded`] and share the retries guarantee `R`.
    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
        self,
        other: Stream<T2, L, Bounded, O2, R>,
    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
    where
        B: IsBounded,
        T: Clone,
        T2: Clone,
    {
        // Marker-level cast to `Bounded` (no runtime effect), required so the nested
        // loop can iterate the left side to completion.
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        Stream::new(
            this.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(this.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: this.location.new_node_metadata(Stream::<
                    (T, T2),
                    L,
                    Bounded,
                    <O2 as MinOrder<O>>::Min,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
    /// `self` used as the values for *each* key.
    ///
    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
    /// values. For example, it can be used to send the same set of elements to several cluster
    /// members, if the membership information is available as a [`KeyedSingleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let keyed_singleton = // { 1: (), 2: () }
    /// # process
    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .first();
    /// let stream = // [ "a", "b" ]
    /// # process
    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// stream.repeat_with_keys(keyed_singleton)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn repeat_with_keys<K, V2>(
        self,
        keys: KeyedSingleton<K, V2, L, Bounded>,
    ) -> KeyedStream<K, T, L, Bounded, O, R>
    where
        B: IsBounded,
        K: Clone,
        T: Clone,
    {
        // Pipeline: (keys x values) via a nested loop, then regroup by key.
        // Weakening retries on the key stream lets it share `R` with `self`, and the
        // trusted total-order assumption is safe because grouping discards key order.
        keys.keys()
            .weaken_retries()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** keyed stream does not depend on ordering of keys */),
            )
            .cross_product_nested_loop(self.make_bounded())
            .into_keyed()
    }
}
2071
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// The output is [`NoOrder`] because match emission order depends on arrival order
    /// across the two inputs.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        // Both inputs must live at the same concrete location instance.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(n.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        // Both inputs must live at the same concrete location instance.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.into_inner()),
                neg: Box::new(n.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2169
2170impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2171 Stream<(K, V), L, B, O, R>
2172{
2173 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2174 /// is used as the key and the second element is added to the entries associated with that key.
2175 ///
2176 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2177 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2178 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2179 /// total ordering _within_ each group but no ordering _across_ groups.
2180 ///
2181 /// # Example
2182 /// ```rust
2183 /// # #[cfg(feature = "deploy")] {
2184 /// # use hydro_lang::prelude::*;
2185 /// # use futures::StreamExt;
2186 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2187 /// process
2188 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2189 /// .into_keyed()
2190 /// # .entries()
2191 /// # }, |mut stream| async move {
2192 /// // { 1: [2, 3], 2: [4] }
2193 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2194 /// # assert_eq!(stream.next().await.unwrap(), w);
2195 /// # }
2196 /// # }));
2197 /// # }
2198 /// ```
2199 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2200 KeyedStream::new(
2201 self.location.clone(),
2202 HydroNode::Cast {
2203 inner: Box::new(self.ir_node.into_inner()),
2204 metadata: self
2205 .location
2206 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2207 },
2208 )
2209 }
2210}
2211
impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    ///
    /// The result is [`NoOrder`] / [`ExactlyOnce`] because deduplication is performed
    /// by a grouped aggregation whose key iteration order is unspecified.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.keys().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # assert_eq!(stream.next().await.unwrap(), 2);
    /// # }));
    /// # }
    /// ```
    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
        // Deduplicate by folding each key's values into `()`; the fold is trivially
        // commutative and idempotent because it never reads the values.
        self.into_keyed()
            .fold(
                q!(|| ()),
                q!(
                    |_, _| {},
                    commutative = manual_proof!(/** values are ignored */),
                    idempotent = manual_proof!(/** values are ignored */)
                ),
            )
            .keys()
    }
}
2248
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Returns a stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
        // Moves from the atomic context into its backing tick; each tick sees one
        // bounded batch, so ordering and retries guarantees carry over unchanged.
        Stream::new(
            self.location.clone().tick,
            HydroNode::Batch {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .tick
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }

    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
    /// See [`Stream::atomic`] for more details.
    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
        // `self.location.tick.l` is the underlying top-level location the atomic
        // context was created from.
        Stream::new(
            self.location.tick.l.clone(),
            HydroNode::EndAtomic {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .tick
                    .l
                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
            },
        )
    }
}
2288
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        // Completion order is arbitrary, so the output drops to `NoOrder` and
        // becomes `Unbounded` (results arrive asynchronously).
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        // Results are emitted in input order, so the input's ordering guarantee `O`
        // is preserved; output is still `Unbounded` since completion is asynchronous.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
2378
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        // `outer()` is the top-level location that owns this tick context.
        Stream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        // The atomic output context is bound to this same tick, keeping downstream
        // processing in lock-step with the tick's batches.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![1, 2, 3, 4]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![5, 6, 7]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift into the atomic (cross-tick) context, apply the user's transformation,
        // then re-batch the result back into per-tick granularity.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
2511
2512#[cfg(test)]
2513mod tests {
2514 #[cfg(feature = "deploy")]
2515 use futures::{SinkExt, StreamExt};
2516 #[cfg(feature = "deploy")]
2517 use hydro_deploy::Deployment;
2518 #[cfg(feature = "deploy")]
2519 use serde::{Deserialize, Serialize};
2520 #[cfg(any(feature = "deploy", feature = "sim"))]
2521 use stageleft::q;
2522
2523 #[cfg(any(feature = "deploy", feature = "sim"))]
2524 use crate::compile::builder::FlowBuilder;
2525 #[cfg(feature = "deploy")]
2526 use crate::live_collections::sliced::sliced;
2527 #[cfg(feature = "deploy")]
2528 use crate::live_collections::stream::ExactlyOnce;
2529 #[cfg(feature = "sim")]
2530 use crate::live_collections::stream::NoOrder;
2531 #[cfg(any(feature = "deploy", feature = "sim"))]
2532 use crate::live_collections::stream::TotalOrder;
2533 #[cfg(any(feature = "deploy", feature = "sim"))]
2534 use crate::location::Location;
2535 #[cfg(any(feature = "deploy", feature = "sim"))]
2536 use crate::nondet::nondet;
2537
2538 mod backtrace_chained_ops;
2539
2540 #[cfg(feature = "deploy")]
2541 struct P1 {}
2542 #[cfg(feature = "deploy")]
2543 struct P2 {}
2544
2545 #[cfg(feature = "deploy")]
2546 #[derive(Serialize, Deserialize, Debug)]
2547 struct SendOverNetwork {
2548 n: u32,
2549 }
2550
    // End-to-end deploy test: sends 0..10 from one process to another over TCP
    // (fail-stop, bincode-encoded) and asserts the external sink receives them in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2585
    // Verifies that `first()` on a flattened singleton yields exactly one element,
    // i.e. `count()` over the result observes cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
2619
    // Verifies that an unbounded `reduce` keeps its accumulator across inputs:
    // after sending 1 then 2, the sampled sum must progress 1 -> 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2653
    // Verifies `cross_singleton` against a top-level bounded fold: every external
    // input is paired with the completed sum (6) of the bounded source.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2691
    // Verifies that a top-level bounded `reduce` produces exactly one value:
    // counting its stream form inside a slice always yields cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2729
    // Same as `top_level_bounded_reduce_cardinality`, but with `into_singleton()`
    // applied to the reduce result; cardinality must still be exactly 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2767
2768 #[cfg(feature = "deploy")]
2769 #[tokio::test]
2770 async fn atomic_fold_replays_each_tick() {
2771 let mut deployment = Deployment::new();
2772
2773 let mut flow = FlowBuilder::new();
2774 let node = flow.process::<()>();
2775 let external = flow.external::<()>();
2776
2777 let (input_port, input) =
2778 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2779 let tick = node.tick();
2780
2781 let out = input
2782 .batch(&tick, nondet!(/** test */))
2783 .cross_singleton(
2784 node.source_iter(q!(vec![1, 2, 3]))
2785 .atomic(&tick)
2786 .fold(q!(|| 0), q!(|acc, v| *acc += v))
2787 .snapshot_atomic(nondet!(/** test */)),
2788 )
2789 .all_ticks()
2790 .send_bincode_external(&external);
2791
2792 let nodes = flow
2793 .with_process(&node, deployment.Localhost())
2794 .with_external(&external, deployment.Localhost())
2795 .deploy(&mut deployment);
2796
2797 deployment.deploy().await.unwrap();
2798
2799 let mut external_in = nodes.connect(input_port).await;
2800 let mut external_out = nodes.connect(out).await;
2801
2802 deployment.start().await.unwrap();
2803
2804 external_in.send(1).await.unwrap();
2805 assert_eq!(external_out.next().await.unwrap(), (1, 6));
2806
2807 external_in.send(2).await.unwrap();
2808 assert_eq!(external_out.next().await.unwrap(), (2, 6));
2809 }
2810
2811 #[cfg(feature = "deploy")]
2812 #[tokio::test]
2813 async fn unbounded_scan_remembers_state() {
2814 let mut deployment = Deployment::new();
2815
2816 let mut flow = FlowBuilder::new();
2817 let node = flow.process::<()>();
2818 let external = flow.external::<()>();
2819
2820 let (input_port, input) = node.source_external_bincode(&external);
2821 let out = input
2822 .scan(
2823 q!(|| 0),
2824 q!(|acc, v| {
2825 *acc += v;
2826 Some(*acc)
2827 }),
2828 )
2829 .send_bincode_external(&external);
2830
2831 let nodes = flow
2832 .with_process(&node, deployment.Localhost())
2833 .with_external(&external, deployment.Localhost())
2834 .deploy(&mut deployment);
2835
2836 deployment.deploy().await.unwrap();
2837
2838 let mut external_in = nodes.connect(input_port).await;
2839 let mut external_out = nodes.connect(out).await;
2840
2841 deployment.start().await.unwrap();
2842
2843 external_in.send(1).await.unwrap();
2844 assert_eq!(external_out.next().await.unwrap(), 1);
2845
2846 external_in.send(2).await.unwrap();
2847 assert_eq!(external_out.next().await.unwrap(), 3);
2848 }
2849
2850 #[cfg(feature = "deploy")]
2851 #[tokio::test]
2852 async fn unbounded_enumerate_remembers_state() {
2853 let mut deployment = Deployment::new();
2854
2855 let mut flow = FlowBuilder::new();
2856 let node = flow.process::<()>();
2857 let external = flow.external::<()>();
2858
2859 let (input_port, input) = node.source_external_bincode(&external);
2860 let out = input.enumerate().send_bincode_external(&external);
2861
2862 let nodes = flow
2863 .with_process(&node, deployment.Localhost())
2864 .with_external(&external, deployment.Localhost())
2865 .deploy(&mut deployment);
2866
2867 deployment.deploy().await.unwrap();
2868
2869 let mut external_in = nodes.connect(input_port).await;
2870 let mut external_out = nodes.connect(out).await;
2871
2872 deployment.start().await.unwrap();
2873
2874 external_in.send(1).await.unwrap();
2875 assert_eq!(external_out.next().await.unwrap(), (0, 1));
2876
2877 external_in.send(2).await.unwrap();
2878 assert_eq!(external_out.next().await.unwrap(), (1, 2));
2879 }
2880
2881 #[cfg(feature = "deploy")]
2882 #[tokio::test]
2883 async fn unbounded_unique_remembers_state() {
2884 let mut deployment = Deployment::new();
2885
2886 let mut flow = FlowBuilder::new();
2887 let node = flow.process::<()>();
2888 let external = flow.external::<()>();
2889
2890 let (input_port, input) =
2891 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2892 let out = input.unique().send_bincode_external(&external);
2893
2894 let nodes = flow
2895 .with_process(&node, deployment.Localhost())
2896 .with_external(&external, deployment.Localhost())
2897 .deploy(&mut deployment);
2898
2899 deployment.deploy().await.unwrap();
2900
2901 let mut external_in = nodes.connect(input_port).await;
2902 let mut external_out = nodes.connect(out).await;
2903
2904 deployment.start().await.unwrap();
2905
2906 external_in.send(1).await.unwrap();
2907 assert_eq!(external_out.next().await.unwrap(), 1);
2908
2909 external_in.send(2).await.unwrap();
2910 assert_eq!(external_out.next().await.unwrap(), 2);
2911
2912 external_in.send(1).await.unwrap();
2913 external_in.send(3).await.unwrap();
2914 assert_eq!(external_out.next().await.unwrap(), 3);
2915 }
2916
2917 #[cfg(feature = "sim")]
2918 #[test]
2919 #[should_panic]
2920 fn sim_batch_nondet_size() {
2921 let mut flow = FlowBuilder::new();
2922 let node = flow.process::<()>();
2923
2924 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2925
2926 let tick = node.tick();
2927 let out_recv = input
2928 .batch(&tick, nondet!(/** test */))
2929 .count()
2930 .all_ticks()
2931 .sim_output();
2932
2933 flow.sim().exhaustive(async || {
2934 in_send.send(());
2935 in_send.send(());
2936 in_send.send(());
2937
2938 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
2939 });
2940 }
2941
2942 #[cfg(feature = "sim")]
2943 #[test]
2944 fn sim_batch_preserves_order() {
2945 let mut flow = FlowBuilder::new();
2946 let node = flow.process::<()>();
2947
2948 let (in_send, input) = node.sim_input();
2949
2950 let tick = node.tick();
2951 let out_recv = input
2952 .batch(&tick, nondet!(/** test */))
2953 .all_ticks()
2954 .sim_output();
2955
2956 flow.sim().exhaustive(async || {
2957 in_send.send(1);
2958 in_send.send(2);
2959 in_send.send(3);
2960
2961 out_recv.assert_yields_only([1, 2, 3]).await;
2962 });
2963 }
2964
2965 #[cfg(feature = "sim")]
2966 #[test]
2967 #[should_panic]
2968 fn sim_batch_unordered_shuffles() {
2969 let mut flow = FlowBuilder::new();
2970 let node = flow.process::<()>();
2971
2972 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2973
2974 let tick = node.tick();
2975 let batch = input.batch(&tick, nondet!(/** test */));
2976 let out_recv = batch
2977 .clone()
2978 .min()
2979 .zip(batch.max())
2980 .all_ticks()
2981 .sim_output();
2982
2983 flow.sim().exhaustive(async || {
2984 in_send.send_many_unordered([1, 2, 3]);
2985
2986 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
2987 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
2988 }
2989 });
2990 }
2991
2992 #[cfg(feature = "sim")]
2993 #[test]
2994 fn sim_batch_unordered_shuffles_count() {
2995 let mut flow = FlowBuilder::new();
2996 let node = flow.process::<()>();
2997
2998 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2999
3000 let tick = node.tick();
3001 let batch = input.batch(&tick, nondet!(/** test */));
3002 let out_recv = batch.all_ticks().sim_output();
3003
3004 let instance_count = flow.sim().exhaustive(async || {
3005 in_send.send_many_unordered([1, 2, 3, 4]);
3006 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3007 });
3008
3009 assert_eq!(
3010 instance_count,
3011 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3012 )
3013 }
3014
3015 #[cfg(feature = "sim")]
3016 #[test]
3017 #[should_panic]
3018 fn sim_observe_order_batched() {
3019 let mut flow = FlowBuilder::new();
3020 let node = flow.process::<()>();
3021
3022 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3023
3024 let tick = node.tick();
3025 let batch = input.batch(&tick, nondet!(/** test */));
3026 let out_recv = batch
3027 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3028 .all_ticks()
3029 .sim_output();
3030
3031 flow.sim().exhaustive(async || {
3032 in_send.send_many_unordered([1, 2, 3, 4]);
3033 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3034 });
3035 }
3036
3037 #[cfg(feature = "sim")]
3038 #[test]
3039 fn sim_observe_order_batched_count() {
3040 let mut flow = FlowBuilder::new();
3041 let node = flow.process::<()>();
3042
3043 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3044
3045 let tick = node.tick();
3046 let batch = input.batch(&tick, nondet!(/** test */));
3047 let out_recv = batch
3048 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3049 .all_ticks()
3050 .sim_output();
3051
3052 let instance_count = flow.sim().exhaustive(async || {
3053 in_send.send_many_unordered([1, 2, 3, 4]);
3054 let _ = out_recv.collect::<Vec<_>>().await;
3055 });
3056
3057 assert_eq!(
3058 instance_count,
3059 192 // 4! * 2^{4 - 1}
3060 )
3061 }
3062
3063 #[cfg(feature = "sim")]
3064 #[test]
3065 fn sim_unordered_count_instance_count() {
3066 let mut flow = FlowBuilder::new();
3067 let node = flow.process::<()>();
3068
3069 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3070
3071 let tick = node.tick();
3072 let out_recv = input
3073 .count()
3074 .snapshot(&tick, nondet!(/** test */))
3075 .all_ticks()
3076 .sim_output();
3077
3078 let instance_count = flow.sim().exhaustive(async || {
3079 in_send.send_many_unordered([1, 2, 3, 4]);
3080 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3081 });
3082
3083 assert_eq!(
3084 instance_count,
3085 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3086 )
3087 }
3088
3089 #[cfg(feature = "sim")]
3090 #[test]
3091 fn sim_top_level_assume_ordering() {
3092 let mut flow = FlowBuilder::new();
3093 let node = flow.process::<()>();
3094
3095 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3096
3097 let out_recv = input
3098 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3099 .sim_output();
3100
3101 let instance_count = flow.sim().exhaustive(async || {
3102 in_send.send_many_unordered([1, 2, 3]);
3103 let mut out = out_recv.collect::<Vec<_>>().await;
3104 out.sort();
3105 assert_eq!(out, vec![1, 2, 3]);
3106 });
3107
3108 assert_eq!(instance_count, 6)
3109 }
3110
3111 #[cfg(feature = "sim")]
3112 #[test]
3113 fn sim_top_level_assume_ordering_cycle_back() {
3114 let mut flow = FlowBuilder::new();
3115 let node = flow.process::<()>();
3116
3117 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3118
3119 let (complete_cycle_back, cycle_back) =
3120 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3121 let ordered = input
3122 .interleave(cycle_back)
3123 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3124 complete_cycle_back.complete(
3125 ordered
3126 .clone()
3127 .map(q!(|v| v + 1))
3128 .filter(q!(|v| v % 2 == 1)),
3129 );
3130
3131 let out_recv = ordered.sim_output();
3132
3133 let mut saw = false;
3134 let instance_count = flow.sim().exhaustive(async || {
3135 in_send.send_many_unordered([0, 2]);
3136 let out = out_recv.collect::<Vec<_>>().await;
3137
3138 if out.starts_with(&[0, 1, 2]) {
3139 saw = true;
3140 }
3141 });
3142
3143 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3144 assert_eq!(instance_count, 6)
3145 }
3146
3147 #[cfg(feature = "sim")]
3148 #[test]
3149 fn sim_top_level_assume_ordering_cycle_back_tick() {
3150 let mut flow = FlowBuilder::new();
3151 let node = flow.process::<()>();
3152
3153 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3154
3155 let (complete_cycle_back, cycle_back) =
3156 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3157 let ordered = input
3158 .interleave(cycle_back)
3159 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3160 complete_cycle_back.complete(
3161 ordered
3162 .clone()
3163 .batch(&node.tick(), nondet!(/** test */))
3164 .all_ticks()
3165 .map(q!(|v| v + 1))
3166 .filter(q!(|v| v % 2 == 1)),
3167 );
3168
3169 let out_recv = ordered.sim_output();
3170
3171 let mut saw = false;
3172 let instance_count = flow.sim().exhaustive(async || {
3173 in_send.send_many_unordered([0, 2]);
3174 let out = out_recv.collect::<Vec<_>>().await;
3175
3176 if out.starts_with(&[0, 1, 2]) {
3177 saw = true;
3178 }
3179 });
3180
3181 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3182 assert_eq!(instance_count, 58)
3183 }
3184
3185 #[cfg(feature = "sim")]
3186 #[test]
3187 fn sim_top_level_assume_ordering_multiple() {
3188 let mut flow = FlowBuilder::new();
3189 let node = flow.process::<()>();
3190
3191 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3192 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3193
3194 let (complete_cycle_back, cycle_back) =
3195 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3196 let input1_ordered = input
3197 .clone()
3198 .interleave(cycle_back)
3199 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3200 let foo = input1_ordered
3201 .clone()
3202 .map(q!(|v| v + 3))
3203 .weaken_ordering::<NoOrder>()
3204 .interleave(input2)
3205 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3206
3207 complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3208
3209 let out_recv = input1_ordered.sim_output();
3210
3211 let mut saw = false;
3212 let instance_count = flow.sim().exhaustive(async || {
3213 in_send.send_many_unordered([0, 1]);
3214 let out = out_recv.collect::<Vec<_>>().await;
3215
3216 if out.starts_with(&[0, 3, 1]) {
3217 saw = true;
3218 }
3219 });
3220
3221 assert!(saw, "did not see an instance with 0, 3, 1 in order");
3222 assert_eq!(instance_count, 24)
3223 }
3224
3225 #[cfg(feature = "sim")]
3226 #[test]
3227 fn sim_atomic_assume_ordering_cycle_back() {
3228 let mut flow = FlowBuilder::new();
3229 let node = flow.process::<()>();
3230
3231 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3232
3233 let (complete_cycle_back, cycle_back) =
3234 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3235 let ordered = input
3236 .interleave(cycle_back)
3237 .atomic(&node.tick())
3238 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3239 .end_atomic();
3240 complete_cycle_back.complete(
3241 ordered
3242 .clone()
3243 .map(q!(|v| v + 1))
3244 .filter(q!(|v| v % 2 == 1)),
3245 );
3246
3247 let out_recv = ordered.sim_output();
3248
3249 let instance_count = flow.sim().exhaustive(async || {
3250 in_send.send_many_unordered([0, 2]);
3251 let out = out_recv.collect::<Vec<_>>().await;
3252 assert_eq!(out.len(), 4);
3253 });
3254
3255 assert_eq!(instance_count, 22)
3256 }
3257}