Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
/// Wrapper that displays only the tokens of a parsed expr.
///
/// `Debug` prints the raw token stream of the expression, while `Display`
/// prints a `q!(...)` form after attempting to collapse an expanded `q!`
/// macro back to its closure.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42    fn from(expr: syn::Expr) -> Self {
43        Self(Box::new(expr))
44    }
45}
46
47impl Deref for DebugExpr {
48    type Target = syn::Expr;
49
50    fn deref(&self) -> &Self::Target {
51        &self.0
52    }
53}
54
55impl ToTokens for DebugExpr {
56    fn to_tokens(&self, tokens: &mut TokenStream) {
57        self.0.to_tokens(tokens);
58    }
59}
60
61impl Debug for DebugExpr {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.0.to_token_stream())
64    }
65}
66
67impl Display for DebugExpr {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let original = self.0.as_ref().clone();
70        let simplified = simplify_q_macro(original);
71
72        // For now, just use quote formatting without trying to parse as a statement
73        // This avoids the syn::parse_quote! issues entirely
74        write!(f, "q!({})", quote::quote!(#simplified))
75    }
76}
77
78/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
79fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80    // Try to parse the token string as a syn::Expr
81    // Use a visitor to simplify q! macro expansions
82    let mut simplifier = QMacroSimplifier::new();
83    simplifier.visit_expr_mut(&mut expr);
84
85    // If we found and simplified a q! macro, return the simplified version
86    if let Some(simplified) = simplifier.simplified_result {
87        simplified
88    } else {
89        expr
90    }
91}
92
/// AST visitor that simplifies q! macro expansions
///
/// The visitor stops at the first matching call it encounters and stores the
/// extracted expression in `simplified_result`.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted from the first matching call, or `None` if no
    /// match has been found (yet).
    pub simplified_result: Option<syn::Expr>,
}
98
99impl QMacroSimplifier {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105impl VisitMut for QMacroSimplifier {
106    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107        // Check if we already found a result to avoid further processing
108        if self.simplified_result.is_some() {
109            return;
110        }
111
112        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113            // Look for calls to stageleft::runtime_support::fn*
114            && self.is_stageleft_runtime_support_call(&path_expr.path)
115            // Try to extract the closure from the arguments
116            && let Some(closure) = self.extract_closure_from_args(&call.args)
117        {
118            self.simplified_result = Some(closure);
119            return;
120        }
121
122        // Continue visiting child expressions using the default implementation
123        // Use the default visitor to avoid infinite recursion
124        syn::visit_mut::visit_expr_mut(self, expr);
125    }
126}
127
128impl QMacroSimplifier {
129    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130        // Check if this is a call to stageleft::runtime_support::fn*
131        if let Some(last_segment) = path.segments.last() {
132            let fn_name = last_segment.ident.to_string();
133            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
134            fn_name.contains("_type_hint")
135                && path.segments.len() > 2
136                && path.segments[0].ident == "stageleft"
137                && path.segments[1].ident == "runtime_support"
138        } else {
139            false
140        }
141    }
142
143    fn extract_closure_from_args(
144        &self,
145        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146    ) -> Option<syn::Expr> {
147        // Look through the arguments for a closure expression
148        for arg in args {
149            if let syn::Expr::Closure(_) = arg {
150                return Some(arg.clone());
151            }
152            // Also check for closures nested in other expressions (like blocks)
153            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154                return Some(closure_expr);
155            }
156        }
157        None
158    }
159
160    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161        let mut visitor = ClosureFinder {
162            found_closure: None,
163            prefer_inner_blocks: true,
164        };
165        visitor.visit_expr(expr);
166        visitor.found_closure
167    }
168}
169
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// The first expression recorded by the search; once set, visiting stops.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression whose direct statements include a
    /// nested block containing a closure yields that nested block (rather
    /// than the closure itself).
    prefer_inner_blocks: bool,
}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178        // If we already found a closure, don't continue searching
179        if self.found_closure.is_some() {
180            return;
181        }
182
183        match expr {
184            syn::Expr::Closure(_) => {
185                self.found_closure = Some(expr.clone());
186            }
187            syn::Expr::Block(block) if self.prefer_inner_blocks => {
188                // Special handling for blocks - look for inner blocks that contain closures
189                for stmt in &block.block.stmts {
190                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
191                        && let syn::Expr::Block(_) = stmt_expr
192                    {
193                        // Check if this nested block contains a closure
194                        let mut inner_visitor = ClosureFinder {
195                            found_closure: None,
196                            prefer_inner_blocks: false, // Avoid infinite recursion
197                        };
198                        inner_visitor.visit_expr(stmt_expr);
199                        if inner_visitor.found_closure.is_some() {
200                            // Found a closure in an inner block, return that block
201                            self.found_closure = Some(stmt_expr.clone());
202                            return;
203                        }
204                    }
205                }
206
207                // If no inner block with closure found, continue with normal visitation
208                visit::visit_expr(self, expr);
209
210                // If we found a closure, just return the closure itself, not the whole block
211                // unless we're in the special case where we want the containing block
212                if self.found_closure.is_some() {
213                    // The closure was found during visitation, no need to wrap in block
214                }
215            }
216            _ => {
217                // Use default visitor behavior for all other expressions
218                visit::visit_expr(self, expr);
219            }
220        }
221    }
222}
223
/// Debug displays the type's tokens.
///
/// Dereferences to the wrapped `syn::Type` and forwards `ToTokens` to it.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231    fn from(t: syn::Type) -> Self {
232        Self(Box::new(t))
233    }
234}
235
236impl Deref for DebugType {
237    type Target = syn::Type;
238
239    fn deref(&self) -> &Self::Target {
240        &self.0
241    }
242}
243
244impl ToTokens for DebugType {
245    fn to_tokens(&self, tokens: &mut TokenStream) {
246        self.0.to_tokens(tokens);
247    }
248}
249
250impl Debug for DebugType {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.to_token_stream())
253    }
254}
255
/// Instantiation state carried by IR nodes that need network set-up.
///
/// `compile_network` replaces [`Self::Building`] with [`Self::Finalized`] and
/// panics if it encounters a node that is already finalized.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; the payload is boxed to keep the enum small.
    Finalized(Box<DebugInstantiateFinalized>),
}
260
/// The result of instantiating a network-like node: the sink and source
/// expressions plus an optional one-shot connect callback.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// Callback to run once to connect the two sides.
    // NOTE(review): `Option` presumably lets the `FnOnce` be taken out when it
    // is invoked — confirm against the code that consumes it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275    fn from(f: DebugInstantiateFinalized) -> Self {
276        Self::Finalized(Box::new(f))
277    }
278}
279
280impl Debug for DebugInstantiate {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        write!(f, "<network instantiate>")
283    }
284}
285
286impl Hash for DebugInstantiate {
287    fn hash<H: Hasher>(&self, _state: &mut H) {
288        // Do nothing
289    }
290}
291
292impl Clone for DebugInstantiate {
293    fn clone(&self) -> Self {
294        match self {
295            DebugInstantiate::Building => DebugInstantiate::Building,
296            DebugInstantiate::Finalized(_) => {
297                panic!("DebugInstantiate::Finalized should not be cloned")
298            }
299        }
300    }
301}
302
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Not yet instantiated, i.e. the node has not been assigned either of
    /// the other two states.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
323
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// A source described by an expression.
    // NOTE(review): presumably the expression evaluates to an async stream — confirm.
    Stream(DebugExpr),
    /// A source fed by an external network connection (carries no payload here).
    ExternalNetwork(),
    /// A source described by an expression.
    // NOTE(review): presumably the expression evaluates to an iterator — confirm.
    Iter(DebugExpr),
    /// A payload-free source.
    // NOTE(review): exact semantics are not visible in this part of the file.
    Spin(),
    /// A cluster-membership source, together with its instantiation state.
    // NOTE(review): the `LocationId` presumably identifies the target cluster — confirm.
    ClusterMembers(LocationId, ClusterMembersState),
    /// A source identified by name.
    // NOTE(review): presumably an input supplied by an embedding program — confirm.
    Embedded(syn::Ident),
}
334
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR that binds `out_ident` to the batched form of `in_ident`.
    ///
    /// The `SecondaryMap` implementation in this file aliases `out_ident` to
    /// `in_ident`, inserting `persist::<'static>()` when the input is a bounded
    /// singleton, optional, or keyed singleton.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that binds `out_ident` (at `out_location`) to `in_ident`
    /// (at `in_location`) when a collection leaves a tick.
    ///
    /// The `SecondaryMap` implementation in this file is a plain alias.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR that binds `out_ident` to `in_ident` at the start of an
    /// atomic section.
    ///
    /// The `SecondaryMap` implementation in this file is a plain alias.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that binds `out_ident` to `in_ident` at the end of an
    /// atomic section.
    ///
    /// The `SecondaryMap` implementation in this file is a plain alias.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR for a point where non-determinism is observed, binding
    /// `out_ident` to `in_ident` at `location`.
    ///
    /// The `SecondaryMap` implementation in this file is a plain alias and
    /// ignores `trusted`, both collection kinds, and `op_meta`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network edge from `from` to `to`.
    ///
    /// In the `SecondaryMap` implementation in this file, the sender side is
    /// `input_ident -> [map(serialize)] -> dest_sink(sink)` and the receiver
    /// side is `out_ident = source_stream(source) [-> map(deserialize)]`;
    /// `tag_id` is used to build the `send…`/`recv…` operator tags.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source on location `on` that reads from `source_expr` and binds
    /// the (optionally deserialized) result to `out_ident`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sink on location `on` that writes the (optionally serialized)
    /// contents of `input_ident` into `sink_expr`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
428
/// Production code-gen: one `FlatGraphBuilder` per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (or default-creates) the builder stored under the key of the
    /// location's root. Panics with "location was removed" if the slot map
    /// reports the key as removed.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Aliases `out_ident` to `in_ident`; bounded singleton-like inputs are
    /// additionally routed through `persist::<'static>()` (and must be at the
    /// top level).
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let single_valued = matches!(
            in_kind,
            CollectionKind::Singleton { .. }
                | CollectionKind::Optional { .. }
                | CollectionKind::KeyedSingleton { .. }
        );
        let needs_persist = in_kind.is_bounded() && single_valued;

        let stmt = if needs_persist {
            assert!(in_location.is_top_level());
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            }
        } else {
            parse_quote! {
                #out_ident = #in_ident;
            }
        };
        builder.add_dfir(stmt, None, None);
    }

    /// Plain alias of `out_ident` to `in_ident` on the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `out_ident` to `in_ident` on the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `out_ident` to `in_ident` on the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `out_ident` to `in_ident` on `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Adds the sending pipeline to the `from` graph and the receiving
    /// pipeline to the `to` graph, inserting `map(...)` stages only when a
    /// (de)serializer is supplied.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        let send_stmt = match serialize {
            Some(serialize_pipeline) => parse_quote! {
                #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink);
            },
        };
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        sender_builder.add_dfir(send_stmt, None, Some(send_tag.as_str()));

        let receiver_builder = self.get_dfir_mut(to);
        let recv_stmt = match deserialize {
            Some(deserialize_pipeline) => parse_quote! {
                #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source);
            },
        };
        let recv_tag = format!("recv{}", tag_id);
        receiver_builder.add_dfir(recv_stmt, None, Some(recv_tag.as_str()));
    }

    /// Adds `out_ident = source_stream(source_expr)` to the `on` graph, with an
    /// optional `map(deserialize)` stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        let recv_stmt = match deserialize {
            Some(deserialize_pipeline) => parse_quote! {
                #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
            },
            None => parse_quote! {
                #out_ident = source_stream(#source_expr);
            },
        };
        let recv_tag = format!("recv{}", tag_id);
        receiver_builder.add_dfir(recv_stmt, None, Some(recv_tag.as_str()));
    }

    /// Adds `input_ident -> dest_sink(sink_expr)` to the `on` graph, with an
    /// optional `map(serialize)` stage in between.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        let send_stmt = match serialize {
            Some(serialize_fn) => parse_quote! {
                #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
            },
            None => parse_quote! {
                #input_ident -> dest_sink(#sink_expr);
            },
        };
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        sender_builder.add_dfir(send_stmt, None, Some(send_tag.as_str()));
    }
}
662
#[cfg(feature = "build")]
/// Either a DFIR builder or a pair of callbacks, one for roots (`L`) and one
/// for nodes (`N`).
// NOTE(review): presumably lets one traversal either emit DFIR or merely visit
// each root/node with a running counter — confirm against the traversal code.
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit code through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback and the node callback instead.
    Callback(L, N),
}
672
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Applies the expression `f` to `input`.
    // NOTE(review): presumably `f` is invoked once per element — confirm.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port on an external location. `instantiate_fn`
    /// starts as `Building` and is finalized by `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the sink described by the expression `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// An output named `ident`; `compile_network` registers it with the
    /// deployment and requires `input` to be a stream on a process or cluster.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
709
710impl HydroRoot {
711    #[cfg(feature = "build")]
712    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
713    pub fn compile_network<'a, D>(
714        &mut self,
715        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
716        seen_tees: &mut SeenTees,
717        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
718        processes: &SparseSecondaryMap<LocationKey, D::Process>,
719        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
720        externals: &SparseSecondaryMap<LocationKey, D::External>,
721        env: &mut D::InstantiateEnv,
722    ) where
723        D: Deploy<'a>,
724    {
725        let refcell_extra_stmts = RefCell::new(extra_stmts);
726        let refcell_env = RefCell::new(env);
727        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
728        self.transform_bottom_up(
729            &mut |l| {
730                if let HydroRoot::SendExternal {
731                    input,
732                    to_external_key,
733                    to_port_id,
734                    to_many,
735                    unpaired,
736                    instantiate_fn,
737                    ..
738                } = l
739                {
740                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
741                        DebugInstantiate::Building => {
742                            let to_node = externals
743                                .get(*to_external_key)
744                                .unwrap_or_else(|| {
745                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
746                                })
747                                .clone();
748
749                            match input.metadata().location_id.root() {
750                                &LocationId::Process(process_key) => {
751                                    if *to_many {
752                                        (
753                                            (
754                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
755                                                parse_quote!(DUMMY),
756                                            ),
757                                            Box::new(|| {}) as Box<dyn FnOnce()>,
758                                        )
759                                    } else {
760                                        let from_node = processes
761                                            .get(process_key)
762                                            .unwrap_or_else(|| {
763                                                panic!("A process used in the graph was not instantiated: {}", process_key)
764                                            })
765                                            .clone();
766
767                                        let sink_port = from_node.next_port();
768                                        let source_port = to_node.next_port();
769
770                                        if *unpaired {
771                                            use stageleft::quote_type;
772                                            use tokio_util::codec::LengthDelimitedCodec;
773
774                                            to_node.register(*to_port_id, source_port.clone());
775
776                                            let _ = D::e2o_source(
777                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
778                                                &to_node, &source_port,
779                                                &from_node, &sink_port,
780                                                &quote_type::<LengthDelimitedCodec>(),
781                                                format!("{}_{}", *to_external_key, *to_port_id)
782                                            );
783                                        }
784
785                                        (
786                                            (
787                                                D::o2e_sink(
788                                                    &from_node,
789                                                    &sink_port,
790                                                    &to_node,
791                                                    &source_port,
792                                                    format!("{}_{}", *to_external_key, *to_port_id)
793                                                ),
794                                                parse_quote!(DUMMY),
795                                            ),
796                                            if *unpaired {
797                                                D::e2o_connect(
798                                                    &to_node,
799                                                    &source_port,
800                                                    &from_node,
801                                                    &sink_port,
802                                                    *to_many,
803                                                    NetworkHint::Auto,
804                                                )
805                                            } else {
806                                                Box::new(|| {}) as Box<dyn FnOnce()>
807                                            },
808                                        )
809                                    }
810                                }
811                                LocationId::Cluster(_) => todo!(),
812                                _ => panic!()
813                            }
814                        },
815
816                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
817                    };
818
819                    *instantiate_fn = DebugInstantiateFinalized {
820                        sink: sink_expr,
821                        source: source_expr,
822                        connect_fn: Some(connect_fn),
823                    }
824                    .into();
825                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
826                    let element_type = match &input.metadata().collection_kind {
827                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
828                        _ => panic!("Embedded output must have Stream collection kind"),
829                    };
830                    let location_key = match input.metadata().location_id.root() {
831                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
832                        _ => panic!("Embedded output must be on a process or cluster"),
833                    };
834                    D::register_embedded_output(
835                        &mut refcell_env.borrow_mut(),
836                        location_key,
837                        ident,
838                        &element_type,
839                    );
840                }
841            },
842            &mut |n| {
843                if let HydroNode::Network {
844                    name,
845                    networking_info,
846                    input,
847                    instantiate_fn,
848                    metadata,
849                    ..
850                } = n
851                {
852                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
853                        DebugInstantiate::Building => instantiate_network::<D>(
854                            &mut refcell_env.borrow_mut(),
855                            input.metadata().location_id.root(),
856                            metadata.location_id.root(),
857                            processes,
858                            clusters,
859                            name.as_deref(),
860                            networking_info,
861                        ),
862
863                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
864                    };
865
866                    *instantiate_fn = DebugInstantiateFinalized {
867                        sink: sink_expr,
868                        source: source_expr,
869                        connect_fn: Some(connect_fn),
870                    }
871                    .into();
872                } else if let HydroNode::ExternalInput {
873                    from_external_key,
874                    from_port_id,
875                    from_many,
876                    codec_type,
877                    port_hint,
878                    instantiate_fn,
879                    metadata,
880                    ..
881                } = n
882                {
883                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
884                        DebugInstantiate::Building => {
885                            let from_node = externals
886                                .get(*from_external_key)
887                                .unwrap_or_else(|| {
888                                    panic!(
889                                        "A external used in the graph was not instantiated: {}",
890                                        from_external_key,
891                                    )
892                                })
893                                .clone();
894
895                            match metadata.location_id.root() {
896                                &LocationId::Process(process_key) => {
897                                    let to_node = processes
898                                        .get(process_key)
899                                        .unwrap_or_else(|| {
900                                            panic!("A process used in the graph was not instantiated: {}", process_key)
901                                        })
902                                        .clone();
903
904                                    let sink_port = from_node.next_port();
905                                    let source_port = to_node.next_port();
906
907                                    from_node.register(*from_port_id, sink_port.clone());
908
909                                    (
910                                        (
911                                            parse_quote!(DUMMY),
912                                            if *from_many {
913                                                D::e2o_many_source(
914                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
915                                                    &to_node, &source_port,
916                                                    codec_type.0.as_ref(),
917                                                    format!("{}_{}", *from_external_key, *from_port_id)
918                                                )
919                                            } else {
920                                                D::e2o_source(
921                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922                                                    &from_node, &sink_port,
923                                                    &to_node, &source_port,
924                                                    codec_type.0.as_ref(),
925                                                    format!("{}_{}", *from_external_key, *from_port_id)
926                                                )
927                                            },
928                                        ),
929                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
930                                    )
931                                }
932                                LocationId::Cluster(_) => todo!(),
933                                _ => panic!()
934                            }
935                        },
936
937                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
938                    };
939
940                    *instantiate_fn = DebugInstantiateFinalized {
941                        sink: sink_expr,
942                        source: source_expr,
943                        connect_fn: Some(connect_fn),
944                    }
945                    .into();
946                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
947                    let element_type = match &metadata.collection_kind {
948                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
949                        _ => panic!("Embedded source must have Stream collection kind"),
950                    };
951                    let location_key = match metadata.location_id.root() {
952                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
953                        _ => panic!("Embedded source must be on a process or cluster"),
954                    };
955                    D::register_embedded_input(
956                        &mut refcell_env.borrow_mut(),
957                        location_key,
958                        ident,
959                        &element_type,
960                    );
961                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
962                    match state {
963                        ClusterMembersState::Uninit => {
964                            let at_location = metadata.location_id.root().clone();
965                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
966                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
967                                // First occurrence: call cluster_membership_stream and mark as Stream.
968                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
969                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
970                                    &(),
971                                );
972                                *state = ClusterMembersState::Stream(expr.into());
973                            } else {
974                                // Already instantiated for this (at, target) pair: just tee.
975                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
976                            }
977                        }
978                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
979                            panic!("cluster members already finalized");
980                        }
981                    }
982                }
983            },
984            seen_tees,
985            false,
986        );
987    }
988
989    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
990        self.transform_bottom_up(
991            &mut |l| {
992                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
993                    match instantiate_fn {
994                        DebugInstantiate::Building => panic!("network not built"),
995
996                        DebugInstantiate::Finalized(finalized) => {
997                            (finalized.connect_fn.take().unwrap())();
998                        }
999                    }
1000                }
1001            },
1002            &mut |n| {
1003                if let HydroNode::Network { instantiate_fn, .. }
1004                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1005                {
1006                    match instantiate_fn {
1007                        DebugInstantiate::Building => panic!("network not built"),
1008
1009                        DebugInstantiate::Finalized(finalized) => {
1010                            (finalized.connect_fn.take().unwrap())();
1011                        }
1012                    }
1013                }
1014            },
1015            seen_tees,
1016            false,
1017        );
1018    }
1019
1020    pub fn transform_bottom_up(
1021        &mut self,
1022        transform_root: &mut impl FnMut(&mut HydroRoot),
1023        transform_node: &mut impl FnMut(&mut HydroNode),
1024        seen_tees: &mut SeenTees,
1025        check_well_formed: bool,
1026    ) {
1027        self.transform_children(
1028            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1029            seen_tees,
1030        );
1031
1032        transform_root(self);
1033    }
1034
1035    pub fn transform_children(
1036        &mut self,
1037        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1038        seen_tees: &mut SeenTees,
1039    ) {
1040        match self {
1041            HydroRoot::ForEach { input, .. }
1042            | HydroRoot::SendExternal { input, .. }
1043            | HydroRoot::DestSink { input, .. }
1044            | HydroRoot::CycleSink { input, .. }
1045            | HydroRoot::EmbeddedOutput { input, .. } => {
1046                transform(input, seen_tees);
1047            }
1048        }
1049    }
1050
1051    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1052        match self {
1053            HydroRoot::ForEach {
1054                f,
1055                input,
1056                op_metadata,
1057            } => HydroRoot::ForEach {
1058                f: f.clone(),
1059                input: Box::new(input.deep_clone(seen_tees)),
1060                op_metadata: op_metadata.clone(),
1061            },
1062            HydroRoot::SendExternal {
1063                to_external_key,
1064                to_port_id,
1065                to_many,
1066                unpaired,
1067                serialize_fn,
1068                instantiate_fn,
1069                input,
1070                op_metadata,
1071            } => HydroRoot::SendExternal {
1072                to_external_key: *to_external_key,
1073                to_port_id: *to_port_id,
1074                to_many: *to_many,
1075                unpaired: *unpaired,
1076                serialize_fn: serialize_fn.clone(),
1077                instantiate_fn: instantiate_fn.clone(),
1078                input: Box::new(input.deep_clone(seen_tees)),
1079                op_metadata: op_metadata.clone(),
1080            },
1081            HydroRoot::DestSink {
1082                sink,
1083                input,
1084                op_metadata,
1085            } => HydroRoot::DestSink {
1086                sink: sink.clone(),
1087                input: Box::new(input.deep_clone(seen_tees)),
1088                op_metadata: op_metadata.clone(),
1089            },
1090            HydroRoot::CycleSink {
1091                cycle_id,
1092                input,
1093                op_metadata,
1094            } => HydroRoot::CycleSink {
1095                cycle_id: *cycle_id,
1096                input: Box::new(input.deep_clone(seen_tees)),
1097                op_metadata: op_metadata.clone(),
1098            },
1099            HydroRoot::EmbeddedOutput {
1100                ident,
1101                input,
1102                op_metadata,
1103            } => HydroRoot::EmbeddedOutput {
1104                ident: ident.clone(),
1105                input: Box::new(input.deep_clone(seen_tees)),
1106                op_metadata: op_metadata.clone(),
1107            },
1108        }
1109    }
1110
1111    #[cfg(feature = "build")]
1112    pub fn emit(
1113        &mut self,
1114        graph_builders: &mut dyn DfirBuilder,
1115        seen_tees: &mut SeenTees,
1116        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1117        next_stmt_id: &mut usize,
1118    ) {
1119        self.emit_core(
1120            &mut BuildersOrCallback::<
1121                fn(&mut HydroRoot, &mut usize),
1122                fn(&mut HydroNode, &mut usize),
1123            >::Builders(graph_builders),
1124            seen_tees,
1125            built_tees,
1126            next_stmt_id,
1127        );
1128    }
1129
1130    #[cfg(feature = "build")]
1131    pub fn emit_core(
1132        &mut self,
1133        builders_or_callback: &mut BuildersOrCallback<
1134            impl FnMut(&mut HydroRoot, &mut usize),
1135            impl FnMut(&mut HydroNode, &mut usize),
1136        >,
1137        seen_tees: &mut SeenTees,
1138        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1139        next_stmt_id: &mut usize,
1140    ) {
1141        match self {
1142            HydroRoot::ForEach { f, input, .. } => {
1143                let input_ident =
1144                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1145
1146                match builders_or_callback {
1147                    BuildersOrCallback::Builders(graph_builders) => {
1148                        graph_builders
1149                            .get_dfir_mut(&input.metadata().location_id)
1150                            .add_dfir(
1151                                parse_quote! {
1152                                    #input_ident -> for_each(#f);
1153                                },
1154                                None,
1155                                Some(&next_stmt_id.to_string()),
1156                            );
1157                    }
1158                    BuildersOrCallback::Callback(leaf_callback, _) => {
1159                        leaf_callback(self, next_stmt_id);
1160                    }
1161                }
1162
1163                *next_stmt_id += 1;
1164            }
1165
1166            HydroRoot::SendExternal {
1167                serialize_fn,
1168                instantiate_fn,
1169                input,
1170                ..
1171            } => {
1172                let input_ident =
1173                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1174
1175                match builders_or_callback {
1176                    BuildersOrCallback::Builders(graph_builders) => {
1177                        let (sink_expr, _) = match instantiate_fn {
1178                            DebugInstantiate::Building => (
1179                                syn::parse_quote!(DUMMY_SINK),
1180                                syn::parse_quote!(DUMMY_SOURCE),
1181                            ),
1182
1183                            DebugInstantiate::Finalized(finalized) => {
1184                                (finalized.sink.clone(), finalized.source.clone())
1185                            }
1186                        };
1187
1188                        graph_builders.create_external_output(
1189                            &input.metadata().location_id,
1190                            sink_expr,
1191                            &input_ident,
1192                            serialize_fn.as_ref(),
1193                            *next_stmt_id,
1194                        );
1195                    }
1196                    BuildersOrCallback::Callback(leaf_callback, _) => {
1197                        leaf_callback(self, next_stmt_id);
1198                    }
1199                }
1200
1201                *next_stmt_id += 1;
1202            }
1203
1204            HydroRoot::DestSink { sink, input, .. } => {
1205                let input_ident =
1206                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1207
1208                match builders_or_callback {
1209                    BuildersOrCallback::Builders(graph_builders) => {
1210                        graph_builders
1211                            .get_dfir_mut(&input.metadata().location_id)
1212                            .add_dfir(
1213                                parse_quote! {
1214                                    #input_ident -> dest_sink(#sink);
1215                                },
1216                                None,
1217                                Some(&next_stmt_id.to_string()),
1218                            );
1219                    }
1220                    BuildersOrCallback::Callback(leaf_callback, _) => {
1221                        leaf_callback(self, next_stmt_id);
1222                    }
1223                }
1224
1225                *next_stmt_id += 1;
1226            }
1227
1228            HydroRoot::CycleSink {
1229                cycle_id, input, ..
1230            } => {
1231                let input_ident =
1232                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1233
1234                match builders_or_callback {
1235                    BuildersOrCallback::Builders(graph_builders) => {
1236                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1237                            CollectionKind::KeyedSingleton {
1238                                key_type,
1239                                value_type,
1240                                ..
1241                            }
1242                            | CollectionKind::KeyedStream {
1243                                key_type,
1244                                value_type,
1245                                ..
1246                            } => {
1247                                parse_quote!((#key_type, #value_type))
1248                            }
1249                            CollectionKind::Stream { element_type, .. }
1250                            | CollectionKind::Singleton { element_type, .. }
1251                            | CollectionKind::Optional { element_type, .. } => {
1252                                parse_quote!(#element_type)
1253                            }
1254                        };
1255
1256                        let cycle_id_ident = cycle_id.as_ident();
1257                        graph_builders
1258                            .get_dfir_mut(&input.metadata().location_id)
1259                            .add_dfir(
1260                                parse_quote! {
1261                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1262                                },
1263                                None,
1264                                None,
1265                            );
1266                    }
1267                    // No ID, no callback
1268                    BuildersOrCallback::Callback(_, _) => {}
1269                }
1270            }
1271
1272            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1273                let input_ident =
1274                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1275
1276                match builders_or_callback {
1277                    BuildersOrCallback::Builders(graph_builders) => {
1278                        graph_builders
1279                            .get_dfir_mut(&input.metadata().location_id)
1280                            .add_dfir(
1281                                parse_quote! {
1282                                    #input_ident -> for_each(&mut #ident);
1283                                },
1284                                None,
1285                                Some(&next_stmt_id.to_string()),
1286                            );
1287                    }
1288                    BuildersOrCallback::Callback(leaf_callback, _) => {
1289                        leaf_callback(self, next_stmt_id);
1290                    }
1291                }
1292
1293                *next_stmt_id += 1;
1294            }
1295        }
1296    }
1297
1298    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1299        match self {
1300            HydroRoot::ForEach { op_metadata, .. }
1301            | HydroRoot::SendExternal { op_metadata, .. }
1302            | HydroRoot::DestSink { op_metadata, .. }
1303            | HydroRoot::CycleSink { op_metadata, .. }
1304            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1305        }
1306    }
1307
1308    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1309        match self {
1310            HydroRoot::ForEach { op_metadata, .. }
1311            | HydroRoot::SendExternal { op_metadata, .. }
1312            | HydroRoot::DestSink { op_metadata, .. }
1313            | HydroRoot::CycleSink { op_metadata, .. }
1314            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1315        }
1316    }
1317
1318    pub fn input(&self) -> &HydroNode {
1319        match self {
1320            HydroRoot::ForEach { input, .. }
1321            | HydroRoot::SendExternal { input, .. }
1322            | HydroRoot::DestSink { input, .. }
1323            | HydroRoot::CycleSink { input, .. }
1324            | HydroRoot::EmbeddedOutput { input, .. } => input,
1325        }
1326    }
1327
1328    pub fn input_metadata(&self) -> &HydroIrMetadata {
1329        self.input().metadata()
1330    }
1331
1332    pub fn print_root(&self) -> String {
1333        match self {
1334            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1335            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1336            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1337            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1338            HydroRoot::EmbeddedOutput { ident, .. } => {
1339                format!("EmbeddedOutput({})", ident)
1340            }
1341        }
1342    }
1343
1344    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1345        match self {
1346            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1347                transform(f);
1348            }
1349            HydroRoot::SendExternal { .. }
1350            | HydroRoot::CycleSink { .. }
1351            | HydroRoot::EmbeddedOutput { .. } => {}
1352        }
1353    }
1354}
1355
/// Emits every root in `ir`, in order, into a fresh set of per-location graph builders and
/// returns those builders.
///
/// Tee bookkeeping and the statement-id counter (starting at 0) are shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1372
/// Walks every root in `ir` with `HydroRoot::emit_core` in callback mode: nothing is
/// emitted, and `transform_root` / `transform_node` are handed to the traversal together
/// with a statement-id counter that starts at 0 and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut visitor = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut visitor,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1392
1393pub fn transform_bottom_up(
1394    ir: &mut [HydroRoot],
1395    transform_root: &mut impl FnMut(&mut HydroRoot),
1396    transform_node: &mut impl FnMut(&mut HydroNode),
1397    check_well_formed: bool,
1398) {
1399    let mut seen_tees = HashMap::new();
1400    ir.iter_mut().for_each(|leaf| {
1401        leaf.transform_bottom_up(
1402            transform_root,
1403            transform_node,
1404            &mut seen_tees,
1405            check_well_formed,
1406        );
1407    });
1408}
1409
1410pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1411    let mut seen_tees = HashMap::new();
1412    ir.iter()
1413        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1414        .collect()
1415}
1416
/// Per-thread bookkeeping used when `Debug`-printing [`TeeNode`]s.
///
/// `None` means de-duplication is off. `Some((next_id, ids))` holds the next id to hand out
/// and the ids already assigned, keyed by the address of the tee's shared cell.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts out as `None`; `dbg_dedup_tee` switches it on around a closure.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1421
1422pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1423    PRINTED_TEES.with(|printed_tees| {
1424        let mut printed_tees_mut = printed_tees.borrow_mut();
1425        *printed_tees_mut = Some((0, HashMap::new()));
1426        drop(printed_tees_mut);
1427
1428        let ret = f();
1429
1430        let mut printed_tees_mut = printed_tees.borrow_mut();
1431        *printed_tees_mut = None;
1432
1433        ret
1434    })
1435}
1436
/// A reference-counted, interior-mutable handle to a [`HydroNode`]; it is the payload of
/// `HydroNode::Tee`. Its `Debug` and `Hash` impls are written by hand rather than derived.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1438
1439impl TeeNode {
1440    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1441        Rc::as_ptr(&self.0)
1442    }
1443}
1444
1445impl Debug for TeeNode {
1446    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447        PRINTED_TEES.with(|printed_tees| {
1448            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1449            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1450
1451            if let Some(printed_tees_mut) = printed_tees_mut {
1452                if let Some(existing) = printed_tees_mut
1453                    .1
1454                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1455                {
1456                    write!(f, "<tee {}>", existing)
1457                } else {
1458                    let next_id = printed_tees_mut.0;
1459                    printed_tees_mut.0 += 1;
1460                    printed_tees_mut
1461                        .1
1462                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1463                    drop(printed_tees_mut_borrow);
1464                    write!(f, "<tee {}>: ", next_id)?;
1465                    Debug::fmt(&self.0.borrow(), f)
1466                }
1467            } else {
1468                drop(printed_tees_mut_borrow);
1469                write!(f, "<tee>: ")?;
1470                Debug::fmt(&self.0.borrow(), f)
1471            }
1472        })
1473    }
1474}
1475
1476impl Hash for TeeNode {
1477    fn hash<H: Hasher>(&self, state: &mut H) {
1478        self.0.borrow_mut().hash(state);
1479    }
1480}
1481
/// Boundedness marker stored in the `bound` field of most [`CollectionKind`] variants.
///
/// [`CollectionKind::is_bounded`] returns `true` only for `Bounded`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1487
/// Ordering marker stored as `order` on `CollectionKind::Stream` and as `value_order` on
/// `CollectionKind::KeyedStream`.
// NOTE(review): presumably `TotalOrder` means element order is deterministic and `NoOrder`
// means it is not — confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1493
/// Retry marker stored as `retry` on `CollectionKind::Stream` and as `value_retry` on
/// `CollectionKind::KeyedStream`.
// NOTE(review): presumably `AtLeastOnce` allows duplicated elements while `ExactlyOnce`
// does not — confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1499
/// Boundedness marker stored in the `bound` field of `CollectionKind::KeyedSingleton`.
///
/// [`CollectionKind::is_bounded`] returns `true` only for `Bounded`; `BoundedValue` is
/// reported as not bounded.
// NOTE(review): presumably `BoundedValue` means each key's value is fixed while the key set
// may still grow — confirm.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1506
/// The kind of collection a node outputs, together with its boundedness marker and element
/// type(s). It is stored in [`HydroIrMetadata::collection_kind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// Unkeyed stream; additionally carries ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// Keyed singleton; uses its own three-state boundedness marker.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1536
1537impl CollectionKind {
1538    pub fn is_bounded(&self) -> bool {
1539        matches!(
1540            self,
1541            CollectionKind::Stream {
1542                bound: BoundKind::Bounded,
1543                ..
1544            } | CollectionKind::Singleton {
1545                bound: BoundKind::Bounded,
1546                ..
1547            } | CollectionKind::Optional {
1548                bound: BoundKind::Bounded,
1549                ..
1550            } | CollectionKind::KeyedStream {
1551                bound: BoundKind::Bounded,
1552                ..
1553            } | CollectionKind::KeyedSingleton {
1554                bound: KeyedSingletonBoundKind::Bounded,
1555                ..
1556            }
1557        )
1558    }
1559}
1560
/// Metadata attached to a node's output.
///
/// The hand-written `Hash`, `PartialEq` and `Eq` impls ignore every field, and the `Debug`
/// impl prints only `location_id` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location the output lives at.
    pub location_id: LocationId,
    /// Kind, boundedness and element type(s) of the output collection.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated/observed element count — confirm with producers.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Metadata about the operator itself rather than its output.
    pub op: HydroIrOpMetadata,
}
1569
// HydroIrMetadata shouldn't be used to hash or compare
impl Hash for HydroIrMetadata {
    /// Intentionally feeds nothing into the hasher, so metadata never affects the hash of a
    /// type that contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1574
impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal, so metadata never affects the
    /// equality of a type that contains it.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

// Marker impl; the always-true `eq` above is trivially reflexive.
impl Eq for HydroIrMetadata {}
1582
1583impl Debug for HydroIrMetadata {
1584    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1585        f.debug_struct("HydroIrMetadata")
1586            .field("location_id", &self.location_id)
1587            .field("collection_kind", &self.collection_kind)
1588            .finish()
1589    }
1590}
1591
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// The hand-written `Hash` impl ignores every field and the `Debug` impl prints no fields.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    // NOTE(review): units and producer of the two usage figures are not visible here;
    // both start out as `None`.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; starts out as `None`.
    pub id: Option<usize>,
}
1601
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every optional field
    /// set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as [`Self::new`], but lets the caller choose how many extra frames to skip when
    /// capturing the backtrace.
    // NOTE(review): the fixed `2 +` presumably accounts for the frames of this constructor
    // and the backtrace helper, so the skip depends on this exact call structure — confirm
    // before refactoring or inlining.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1620
1621impl Debug for HydroIrOpMetadata {
1622    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1623        f.debug_struct("HydroIrOpMetadata").finish()
1624    }
1625}
1626
impl Hash for HydroIrOpMetadata {
    /// Intentionally feeds nothing into the hasher, so operator metadata never affects the
    /// hash of a type that contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1630
1631/// An intermediate node in a Hydro graph, which consumes data
1632/// from upstream nodes and emits data to downstream nodes.
1633#[derive(Debug, Hash)]
1634pub enum HydroNode {
1635    Placeholder,
1636
1637    /// Manually "casts" between two different collection kinds.
1638    ///
1639    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1640    /// correctness checks. In particular, the user must ensure that every possible
1641    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1642    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1643    /// collection. This ensures that the simulator does not miss any possible outputs.
1644    Cast {
1645        inner: Box<HydroNode>,
1646        metadata: HydroIrMetadata,
1647    },
1648
1649    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1650    /// interpretation of the input stream.
1651    ///
1652    /// In production, this simply passes through the input, but in simulation, this operator
1653    /// explicitly selects a randomized interpretation.
1654    ObserveNonDet {
1655        inner: Box<HydroNode>,
1656        trusted: bool, // if true, we do not need to simulate non-determinism
1657        metadata: HydroIrMetadata,
1658    },
1659
1660    Source {
1661        source: HydroSource,
1662        metadata: HydroIrMetadata,
1663    },
1664
1665    SingletonSource {
1666        value: DebugExpr,
1667        first_tick_only: bool,
1668        metadata: HydroIrMetadata,
1669    },
1670
1671    CycleSource {
1672        cycle_id: CycleId,
1673        metadata: HydroIrMetadata,
1674    },
1675
1676    Tee {
1677        inner: TeeNode,
1678        metadata: HydroIrMetadata,
1679    },
1680
1681    BeginAtomic {
1682        inner: Box<HydroNode>,
1683        metadata: HydroIrMetadata,
1684    },
1685
1686    EndAtomic {
1687        inner: Box<HydroNode>,
1688        metadata: HydroIrMetadata,
1689    },
1690
1691    Batch {
1692        inner: Box<HydroNode>,
1693        metadata: HydroIrMetadata,
1694    },
1695
1696    YieldConcat {
1697        inner: Box<HydroNode>,
1698        metadata: HydroIrMetadata,
1699    },
1700
1701    Chain {
1702        first: Box<HydroNode>,
1703        second: Box<HydroNode>,
1704        metadata: HydroIrMetadata,
1705    },
1706
1707    ChainFirst {
1708        first: Box<HydroNode>,
1709        second: Box<HydroNode>,
1710        metadata: HydroIrMetadata,
1711    },
1712
1713    CrossProduct {
1714        left: Box<HydroNode>,
1715        right: Box<HydroNode>,
1716        metadata: HydroIrMetadata,
1717    },
1718
1719    CrossSingleton {
1720        left: Box<HydroNode>,
1721        right: Box<HydroNode>,
1722        metadata: HydroIrMetadata,
1723    },
1724
1725    Join {
1726        left: Box<HydroNode>,
1727        right: Box<HydroNode>,
1728        metadata: HydroIrMetadata,
1729    },
1730
1731    Difference {
1732        pos: Box<HydroNode>,
1733        neg: Box<HydroNode>,
1734        metadata: HydroIrMetadata,
1735    },
1736
1737    AntiJoin {
1738        pos: Box<HydroNode>,
1739        neg: Box<HydroNode>,
1740        metadata: HydroIrMetadata,
1741    },
1742
1743    ResolveFutures {
1744        input: Box<HydroNode>,
1745        metadata: HydroIrMetadata,
1746    },
1747    ResolveFuturesOrdered {
1748        input: Box<HydroNode>,
1749        metadata: HydroIrMetadata,
1750    },
1751
1752    Map {
1753        f: DebugExpr,
1754        input: Box<HydroNode>,
1755        metadata: HydroIrMetadata,
1756    },
1757    FlatMap {
1758        f: DebugExpr,
1759        input: Box<HydroNode>,
1760        metadata: HydroIrMetadata,
1761    },
1762    Filter {
1763        f: DebugExpr,
1764        input: Box<HydroNode>,
1765        metadata: HydroIrMetadata,
1766    },
1767    FilterMap {
1768        f: DebugExpr,
1769        input: Box<HydroNode>,
1770        metadata: HydroIrMetadata,
1771    },
1772
1773    DeferTick {
1774        input: Box<HydroNode>,
1775        metadata: HydroIrMetadata,
1776    },
1777    Enumerate {
1778        input: Box<HydroNode>,
1779        metadata: HydroIrMetadata,
1780    },
1781    Inspect {
1782        f: DebugExpr,
1783        input: Box<HydroNode>,
1784        metadata: HydroIrMetadata,
1785    },
1786
1787    Unique {
1788        input: Box<HydroNode>,
1789        metadata: HydroIrMetadata,
1790    },
1791
1792    Sort {
1793        input: Box<HydroNode>,
1794        metadata: HydroIrMetadata,
1795    },
1796    Fold {
1797        init: DebugExpr,
1798        acc: DebugExpr,
1799        input: Box<HydroNode>,
1800        metadata: HydroIrMetadata,
1801    },
1802
1803    Scan {
1804        init: DebugExpr,
1805        acc: DebugExpr,
1806        input: Box<HydroNode>,
1807        metadata: HydroIrMetadata,
1808    },
1809    FoldKeyed {
1810        init: DebugExpr,
1811        acc: DebugExpr,
1812        input: Box<HydroNode>,
1813        metadata: HydroIrMetadata,
1814    },
1815
1816    Reduce {
1817        f: DebugExpr,
1818        input: Box<HydroNode>,
1819        metadata: HydroIrMetadata,
1820    },
1821    ReduceKeyed {
1822        f: DebugExpr,
1823        input: Box<HydroNode>,
1824        metadata: HydroIrMetadata,
1825    },
1826    ReduceKeyedWatermark {
1827        f: DebugExpr,
1828        input: Box<HydroNode>,
1829        watermark: Box<HydroNode>,
1830        metadata: HydroIrMetadata,
1831    },
1832
1833    Network {
1834        name: Option<String>,
1835        networking_info: crate::networking::NetworkingInfo,
1836        serialize_fn: Option<DebugExpr>,
1837        instantiate_fn: DebugInstantiate,
1838        deserialize_fn: Option<DebugExpr>,
1839        input: Box<HydroNode>,
1840        metadata: HydroIrMetadata,
1841    },
1842
1843    ExternalInput {
1844        from_external_key: LocationKey,
1845        from_port_id: ExternalPortId,
1846        from_many: bool,
1847        codec_type: DebugType,
1848        port_hint: NetworkHint,
1849        instantiate_fn: DebugInstantiate,
1850        deserialize_fn: Option<DebugExpr>,
1851        metadata: HydroIrMetadata,
1852    },
1853
1854    Counter {
1855        tag: String,
1856        duration: DebugExpr,
1857        prefix: String,
1858        input: Box<HydroNode>,
1859        metadata: HydroIrMetadata,
1860    },
1861}
1862
1863pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1864pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1865
1866impl HydroNode {
1867    pub fn transform_bottom_up(
1868        &mut self,
1869        transform: &mut impl FnMut(&mut HydroNode),
1870        seen_tees: &mut SeenTees,
1871        check_well_formed: bool,
1872    ) {
1873        self.transform_children(
1874            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1875            seen_tees,
1876        );
1877
1878        transform(self);
1879
1880        let self_location = self.metadata().location_id.root();
1881
1882        if check_well_formed {
1883            match &*self {
1884                HydroNode::Network { .. } => {}
1885                _ => {
1886                    self.input_metadata().iter().for_each(|i| {
1887                        if i.location_id.root() != self_location {
1888                            panic!(
1889                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1890                                i,
1891                                i.location_id.root(),
1892                                self,
1893                                self_location
1894                            )
1895                        }
1896                    });
1897                }
1898            }
1899        }
1900    }
1901
1902    #[inline(always)]
1903    pub fn transform_children(
1904        &mut self,
1905        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1906        seen_tees: &mut SeenTees,
1907    ) {
1908        match self {
1909            HydroNode::Placeholder => {
1910                panic!();
1911            }
1912
1913            HydroNode::Source { .. }
1914            | HydroNode::SingletonSource { .. }
1915            | HydroNode::CycleSource { .. }
1916            | HydroNode::ExternalInput { .. } => {}
1917
1918            HydroNode::Tee { inner, .. } => {
1919                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1920                    *inner = TeeNode(transformed.clone());
1921                } else {
1922                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1923                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1924                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1925                    transform(&mut orig, seen_tees);
1926                    *transformed_cell.borrow_mut() = orig;
1927                    *inner = TeeNode(transformed_cell);
1928                }
1929            }
1930
1931            HydroNode::Cast { inner, .. }
1932            | HydroNode::ObserveNonDet { inner, .. }
1933            | HydroNode::BeginAtomic { inner, .. }
1934            | HydroNode::EndAtomic { inner, .. }
1935            | HydroNode::Batch { inner, .. }
1936            | HydroNode::YieldConcat { inner, .. } => {
1937                transform(inner.as_mut(), seen_tees);
1938            }
1939
1940            HydroNode::Chain { first, second, .. } => {
1941                transform(first.as_mut(), seen_tees);
1942                transform(second.as_mut(), seen_tees);
1943            }
1944
1945            HydroNode::ChainFirst { first, second, .. } => {
1946                transform(first.as_mut(), seen_tees);
1947                transform(second.as_mut(), seen_tees);
1948            }
1949
1950            HydroNode::CrossSingleton { left, right, .. }
1951            | HydroNode::CrossProduct { left, right, .. }
1952            | HydroNode::Join { left, right, .. } => {
1953                transform(left.as_mut(), seen_tees);
1954                transform(right.as_mut(), seen_tees);
1955            }
1956
1957            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1958                transform(pos.as_mut(), seen_tees);
1959                transform(neg.as_mut(), seen_tees);
1960            }
1961
1962            HydroNode::ReduceKeyedWatermark {
1963                input, watermark, ..
1964            } => {
1965                transform(input.as_mut(), seen_tees);
1966                transform(watermark.as_mut(), seen_tees);
1967            }
1968
1969            HydroNode::Map { input, .. }
1970            | HydroNode::ResolveFutures { input, .. }
1971            | HydroNode::ResolveFuturesOrdered { input, .. }
1972            | HydroNode::FlatMap { input, .. }
1973            | HydroNode::Filter { input, .. }
1974            | HydroNode::FilterMap { input, .. }
1975            | HydroNode::Sort { input, .. }
1976            | HydroNode::DeferTick { input, .. }
1977            | HydroNode::Enumerate { input, .. }
1978            | HydroNode::Inspect { input, .. }
1979            | HydroNode::Unique { input, .. }
1980            | HydroNode::Network { input, .. }
1981            | HydroNode::Fold { input, .. }
1982            | HydroNode::Scan { input, .. }
1983            | HydroNode::FoldKeyed { input, .. }
1984            | HydroNode::Reduce { input, .. }
1985            | HydroNode::ReduceKeyed { input, .. }
1986            | HydroNode::Counter { input, .. } => {
1987                transform(input.as_mut(), seen_tees);
1988            }
1989        }
1990    }
1991
1992    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1993        match self {
1994            HydroNode::Placeholder => HydroNode::Placeholder,
1995            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1996                inner: Box::new(inner.deep_clone(seen_tees)),
1997                metadata: metadata.clone(),
1998            },
1999            HydroNode::ObserveNonDet {
2000                inner,
2001                trusted,
2002                metadata,
2003            } => HydroNode::ObserveNonDet {
2004                inner: Box::new(inner.deep_clone(seen_tees)),
2005                trusted: *trusted,
2006                metadata: metadata.clone(),
2007            },
2008            HydroNode::Source { source, metadata } => HydroNode::Source {
2009                source: source.clone(),
2010                metadata: metadata.clone(),
2011            },
2012            HydroNode::SingletonSource {
2013                value,
2014                first_tick_only,
2015                metadata,
2016            } => HydroNode::SingletonSource {
2017                value: value.clone(),
2018                first_tick_only: *first_tick_only,
2019                metadata: metadata.clone(),
2020            },
2021            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2022                cycle_id: *cycle_id,
2023                metadata: metadata.clone(),
2024            },
2025            HydroNode::Tee { inner, metadata } => {
2026                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2027                    HydroNode::Tee {
2028                        inner: TeeNode(transformed.clone()),
2029                        metadata: metadata.clone(),
2030                    }
2031                } else {
2032                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2033                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2034                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2035                    *new_rc.borrow_mut() = cloned;
2036                    HydroNode::Tee {
2037                        inner: TeeNode(new_rc),
2038                        metadata: metadata.clone(),
2039                    }
2040                }
2041            }
2042            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2043                inner: Box::new(inner.deep_clone(seen_tees)),
2044                metadata: metadata.clone(),
2045            },
2046            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2047                inner: Box::new(inner.deep_clone(seen_tees)),
2048                metadata: metadata.clone(),
2049            },
2050            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2051                inner: Box::new(inner.deep_clone(seen_tees)),
2052                metadata: metadata.clone(),
2053            },
2054            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2055                inner: Box::new(inner.deep_clone(seen_tees)),
2056                metadata: metadata.clone(),
2057            },
2058            HydroNode::Chain {
2059                first,
2060                second,
2061                metadata,
2062            } => HydroNode::Chain {
2063                first: Box::new(first.deep_clone(seen_tees)),
2064                second: Box::new(second.deep_clone(seen_tees)),
2065                metadata: metadata.clone(),
2066            },
2067            HydroNode::ChainFirst {
2068                first,
2069                second,
2070                metadata,
2071            } => HydroNode::ChainFirst {
2072                first: Box::new(first.deep_clone(seen_tees)),
2073                second: Box::new(second.deep_clone(seen_tees)),
2074                metadata: metadata.clone(),
2075            },
2076            HydroNode::CrossProduct {
2077                left,
2078                right,
2079                metadata,
2080            } => HydroNode::CrossProduct {
2081                left: Box::new(left.deep_clone(seen_tees)),
2082                right: Box::new(right.deep_clone(seen_tees)),
2083                metadata: metadata.clone(),
2084            },
2085            HydroNode::CrossSingleton {
2086                left,
2087                right,
2088                metadata,
2089            } => HydroNode::CrossSingleton {
2090                left: Box::new(left.deep_clone(seen_tees)),
2091                right: Box::new(right.deep_clone(seen_tees)),
2092                metadata: metadata.clone(),
2093            },
2094            HydroNode::Join {
2095                left,
2096                right,
2097                metadata,
2098            } => HydroNode::Join {
2099                left: Box::new(left.deep_clone(seen_tees)),
2100                right: Box::new(right.deep_clone(seen_tees)),
2101                metadata: metadata.clone(),
2102            },
2103            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2104                pos: Box::new(pos.deep_clone(seen_tees)),
2105                neg: Box::new(neg.deep_clone(seen_tees)),
2106                metadata: metadata.clone(),
2107            },
2108            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2109                pos: Box::new(pos.deep_clone(seen_tees)),
2110                neg: Box::new(neg.deep_clone(seen_tees)),
2111                metadata: metadata.clone(),
2112            },
2113            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2114                input: Box::new(input.deep_clone(seen_tees)),
2115                metadata: metadata.clone(),
2116            },
2117            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2118                HydroNode::ResolveFuturesOrdered {
2119                    input: Box::new(input.deep_clone(seen_tees)),
2120                    metadata: metadata.clone(),
2121                }
2122            }
2123            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2124                f: f.clone(),
2125                input: Box::new(input.deep_clone(seen_tees)),
2126                metadata: metadata.clone(),
2127            },
2128            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2129                f: f.clone(),
2130                input: Box::new(input.deep_clone(seen_tees)),
2131                metadata: metadata.clone(),
2132            },
2133            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2134                f: f.clone(),
2135                input: Box::new(input.deep_clone(seen_tees)),
2136                metadata: metadata.clone(),
2137            },
2138            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2139                f: f.clone(),
2140                input: Box::new(input.deep_clone(seen_tees)),
2141                metadata: metadata.clone(),
2142            },
2143            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2144                input: Box::new(input.deep_clone(seen_tees)),
2145                metadata: metadata.clone(),
2146            },
2147            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2148                input: Box::new(input.deep_clone(seen_tees)),
2149                metadata: metadata.clone(),
2150            },
2151            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2152                f: f.clone(),
2153                input: Box::new(input.deep_clone(seen_tees)),
2154                metadata: metadata.clone(),
2155            },
2156            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2157                input: Box::new(input.deep_clone(seen_tees)),
2158                metadata: metadata.clone(),
2159            },
2160            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2161                input: Box::new(input.deep_clone(seen_tees)),
2162                metadata: metadata.clone(),
2163            },
2164            HydroNode::Fold {
2165                init,
2166                acc,
2167                input,
2168                metadata,
2169            } => HydroNode::Fold {
2170                init: init.clone(),
2171                acc: acc.clone(),
2172                input: Box::new(input.deep_clone(seen_tees)),
2173                metadata: metadata.clone(),
2174            },
2175            HydroNode::Scan {
2176                init,
2177                acc,
2178                input,
2179                metadata,
2180            } => HydroNode::Scan {
2181                init: init.clone(),
2182                acc: acc.clone(),
2183                input: Box::new(input.deep_clone(seen_tees)),
2184                metadata: metadata.clone(),
2185            },
2186            HydroNode::FoldKeyed {
2187                init,
2188                acc,
2189                input,
2190                metadata,
2191            } => HydroNode::FoldKeyed {
2192                init: init.clone(),
2193                acc: acc.clone(),
2194                input: Box::new(input.deep_clone(seen_tees)),
2195                metadata: metadata.clone(),
2196            },
2197            HydroNode::ReduceKeyedWatermark {
2198                f,
2199                input,
2200                watermark,
2201                metadata,
2202            } => HydroNode::ReduceKeyedWatermark {
2203                f: f.clone(),
2204                input: Box::new(input.deep_clone(seen_tees)),
2205                watermark: Box::new(watermark.deep_clone(seen_tees)),
2206                metadata: metadata.clone(),
2207            },
2208            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2209                f: f.clone(),
2210                input: Box::new(input.deep_clone(seen_tees)),
2211                metadata: metadata.clone(),
2212            },
2213            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2214                f: f.clone(),
2215                input: Box::new(input.deep_clone(seen_tees)),
2216                metadata: metadata.clone(),
2217            },
2218            HydroNode::Network {
2219                name,
2220                networking_info,
2221                serialize_fn,
2222                instantiate_fn,
2223                deserialize_fn,
2224                input,
2225                metadata,
2226            } => HydroNode::Network {
2227                name: name.clone(),
2228                networking_info: networking_info.clone(),
2229                serialize_fn: serialize_fn.clone(),
2230                instantiate_fn: instantiate_fn.clone(),
2231                deserialize_fn: deserialize_fn.clone(),
2232                input: Box::new(input.deep_clone(seen_tees)),
2233                metadata: metadata.clone(),
2234            },
2235            HydroNode::ExternalInput {
2236                from_external_key,
2237                from_port_id,
2238                from_many,
2239                codec_type,
2240                port_hint,
2241                instantiate_fn,
2242                deserialize_fn,
2243                metadata,
2244            } => HydroNode::ExternalInput {
2245                from_external_key: *from_external_key,
2246                from_port_id: *from_port_id,
2247                from_many: *from_many,
2248                codec_type: codec_type.clone(),
2249                port_hint: *port_hint,
2250                instantiate_fn: instantiate_fn.clone(),
2251                deserialize_fn: deserialize_fn.clone(),
2252                metadata: metadata.clone(),
2253            },
2254            HydroNode::Counter {
2255                tag,
2256                duration,
2257                prefix,
2258                input,
2259                metadata,
2260            } => HydroNode::Counter {
2261                tag: tag.clone(),
2262                duration: duration.clone(),
2263                prefix: prefix.clone(),
2264                input: Box::new(input.deep_clone(seen_tees)),
2265                metadata: metadata.clone(),
2266            },
2267        }
2268    }
2269
2270    #[cfg(feature = "build")]
2271    pub fn emit_core(
2272        &mut self,
2273        builders_or_callback: &mut BuildersOrCallback<
2274            impl FnMut(&mut HydroRoot, &mut usize),
2275            impl FnMut(&mut HydroNode, &mut usize),
2276        >,
2277        seen_tees: &mut SeenTees,
2278        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2279        next_stmt_id: &mut usize,
2280    ) -> syn::Ident {
2281        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2282
2283        self.transform_bottom_up(
2284            &mut |node: &mut HydroNode| {
2285                let out_location = node.metadata().location_id.clone();
2286                match node {
2287                    HydroNode::Placeholder => {
2288                        panic!()
2289                    }
2290
2291                    HydroNode::Cast { .. } => {
2292                        // Cast passes through the input ident unchanged
2293                        // The input ident is already on the stack from processing the child
2294                        match builders_or_callback {
2295                            BuildersOrCallback::Builders(_) => {}
2296                            BuildersOrCallback::Callback(_, node_callback) => {
2297                                node_callback(node, next_stmt_id);
2298                            }
2299                        }
2300
2301                        *next_stmt_id += 1;
2302                        // input_ident stays on stack as output
2303                    }
2304
2305                    HydroNode::ObserveNonDet {
2306                        inner,
2307                        trusted,
2308                        metadata,
2309                        ..
2310                    } => {
2311                        let inner_ident = ident_stack.pop().unwrap();
2312
2313                        let observe_ident =
2314                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2315
2316                        match builders_or_callback {
2317                            BuildersOrCallback::Builders(graph_builders) => {
2318                                graph_builders.observe_nondet(
2319                                    *trusted,
2320                                    &inner.metadata().location_id,
2321                                    inner_ident,
2322                                    &inner.metadata().collection_kind,
2323                                    &observe_ident,
2324                                    &metadata.collection_kind,
2325                                    &metadata.op,
2326                                );
2327                            }
2328                            BuildersOrCallback::Callback(_, node_callback) => {
2329                                node_callback(node, next_stmt_id);
2330                            }
2331                        }
2332
2333                        *next_stmt_id += 1;
2334
2335                        ident_stack.push(observe_ident);
2336                    }
2337
2338                    HydroNode::Batch {
2339                        inner, metadata, ..
2340                    } => {
2341                        let inner_ident = ident_stack.pop().unwrap();
2342
2343                        let batch_ident =
2344                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2345
2346                        match builders_or_callback {
2347                            BuildersOrCallback::Builders(graph_builders) => {
2348                                graph_builders.batch(
2349                                    inner_ident,
2350                                    &inner.metadata().location_id,
2351                                    &inner.metadata().collection_kind,
2352                                    &batch_ident,
2353                                    &out_location,
2354                                    &metadata.op,
2355                                );
2356                            }
2357                            BuildersOrCallback::Callback(_, node_callback) => {
2358                                node_callback(node, next_stmt_id);
2359                            }
2360                        }
2361
2362                        *next_stmt_id += 1;
2363
2364                        ident_stack.push(batch_ident);
2365                    }
2366
2367                    HydroNode::YieldConcat { inner, .. } => {
2368                        let inner_ident = ident_stack.pop().unwrap();
2369
2370                        let yield_ident =
2371                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2372
2373                        match builders_or_callback {
2374                            BuildersOrCallback::Builders(graph_builders) => {
2375                                graph_builders.yield_from_tick(
2376                                    inner_ident,
2377                                    &inner.metadata().location_id,
2378                                    &inner.metadata().collection_kind,
2379                                    &yield_ident,
2380                                    &out_location,
2381                                );
2382                            }
2383                            BuildersOrCallback::Callback(_, node_callback) => {
2384                                node_callback(node, next_stmt_id);
2385                            }
2386                        }
2387
2388                        *next_stmt_id += 1;
2389
2390                        ident_stack.push(yield_ident);
2391                    }
2392
2393                    HydroNode::BeginAtomic { inner, metadata } => {
2394                        let inner_ident = ident_stack.pop().unwrap();
2395
2396                        let begin_ident =
2397                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2398
2399                        match builders_or_callback {
2400                            BuildersOrCallback::Builders(graph_builders) => {
2401                                graph_builders.begin_atomic(
2402                                    inner_ident,
2403                                    &inner.metadata().location_id,
2404                                    &inner.metadata().collection_kind,
2405                                    &begin_ident,
2406                                    &out_location,
2407                                    &metadata.op,
2408                                );
2409                            }
2410                            BuildersOrCallback::Callback(_, node_callback) => {
2411                                node_callback(node, next_stmt_id);
2412                            }
2413                        }
2414
2415                        *next_stmt_id += 1;
2416
2417                        ident_stack.push(begin_ident);
2418                    }
2419
2420                    HydroNode::EndAtomic { inner, .. } => {
2421                        let inner_ident = ident_stack.pop().unwrap();
2422
2423                        let end_ident =
2424                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2425
2426                        match builders_or_callback {
2427                            BuildersOrCallback::Builders(graph_builders) => {
2428                                graph_builders.end_atomic(
2429                                    inner_ident,
2430                                    &inner.metadata().location_id,
2431                                    &inner.metadata().collection_kind,
2432                                    &end_ident,
2433                                );
2434                            }
2435                            BuildersOrCallback::Callback(_, node_callback) => {
2436                                node_callback(node, next_stmt_id);
2437                            }
2438                        }
2439
2440                        *next_stmt_id += 1;
2441
2442                        ident_stack.push(end_ident);
2443                    }
2444
2445                    HydroNode::Source {
2446                        source, metadata, ..
2447                    } => {
2448                        if let HydroSource::ExternalNetwork() = source {
2449                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2450                        } else {
2451                            let source_ident =
2452                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2453
2454                            let source_stmt = match source {
2455                                HydroSource::Stream(expr) => {
2456                                    debug_assert!(metadata.location_id.is_top_level());
2457                                    parse_quote! {
2458                                        #source_ident = source_stream(#expr);
2459                                    }
2460                                }
2461
2462                                HydroSource::ExternalNetwork() => {
2463                                    unreachable!()
2464                                }
2465
2466                                HydroSource::Iter(expr) => {
2467                                    if metadata.location_id.is_top_level() {
2468                                        parse_quote! {
2469                                            #source_ident = source_iter(#expr);
2470                                        }
2471                                    } else {
2472                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2473                                        parse_quote! {
2474                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2475                                        }
2476                                    }
2477                                }
2478
2479                                HydroSource::Spin() => {
2480                                    debug_assert!(metadata.location_id.is_top_level());
2481                                    parse_quote! {
2482                                        #source_ident = spin();
2483                                    }
2484                                }
2485
2486                                HydroSource::ClusterMembers(target_loc, state) => {
2487                                    debug_assert!(metadata.location_id.is_top_level());
2488
2489                                    let members_tee_ident = syn::Ident::new(
2490                                        &format!(
2491                                            "__cluster_members_tee_{}_{}",
2492                                            metadata.location_id.root().key(),
2493                                            target_loc.key(),
2494                                        ),
2495                                        Span::call_site(),
2496                                    );
2497
2498                                    match state {
2499                                        ClusterMembersState::Stream(d) => {
2500                                            parse_quote! {
2501                                                #members_tee_ident = source_stream(#d) -> tee();
2502                                                #source_ident = #members_tee_ident;
2503                                            }
2504                                        },
2505                                        ClusterMembersState::Uninit => syn::parse_quote! {
2506                                            #source_ident = source_stream(DUMMY);
2507                                        },
2508                                        ClusterMembersState::Tee(..) => parse_quote! {
2509                                            #source_ident = #members_tee_ident;
2510                                        },
2511                                    }
2512                                }
2513
2514                                HydroSource::Embedded(ident) => {
2515                                    parse_quote! {
2516                                        #source_ident = source_stream(#ident);
2517                                    }
2518                                }
2519                            };
2520
2521                            match builders_or_callback {
2522                                BuildersOrCallback::Builders(graph_builders) => {
2523                                    let builder = graph_builders.get_dfir_mut(&out_location);
2524                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2525                                }
2526                                BuildersOrCallback::Callback(_, node_callback) => {
2527                                    node_callback(node, next_stmt_id);
2528                                }
2529                            }
2530
2531                            *next_stmt_id += 1;
2532
2533                            ident_stack.push(source_ident);
2534                        }
2535                    }
2536
2537                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2538                        let source_ident =
2539                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2540
2541                        match builders_or_callback {
2542                            BuildersOrCallback::Builders(graph_builders) => {
2543                                let builder = graph_builders.get_dfir_mut(&out_location);
2544
2545                                if *first_tick_only {
2546                                    assert!(
2547                                        !metadata.location_id.is_top_level(),
2548                                        "first_tick_only SingletonSource must be inside a tick"
2549                                    );
2550                                }
2551
2552                                if *first_tick_only
2553                                    || (metadata.location_id.is_top_level()
2554                                        && metadata.collection_kind.is_bounded())
2555                                {
2556                                    builder.add_dfir(
2557                                        parse_quote! {
2558                                            #source_ident = source_iter([#value]);
2559                                        },
2560                                        None,
2561                                        Some(&next_stmt_id.to_string()),
2562                                    );
2563                                } else {
2564                                    builder.add_dfir(
2565                                        parse_quote! {
2566                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2567                                        },
2568                                        None,
2569                                        Some(&next_stmt_id.to_string()),
2570                                    );
2571                                }
2572                            }
2573                            BuildersOrCallback::Callback(_, node_callback) => {
2574                                node_callback(node, next_stmt_id);
2575                            }
2576                        }
2577
2578                        *next_stmt_id += 1;
2579
2580                        ident_stack.push(source_ident);
2581                    }
2582
2583                    HydroNode::CycleSource { cycle_id, .. } => {
2584                        let ident = cycle_id.as_ident();
2585
2586                        match builders_or_callback {
2587                            BuildersOrCallback::Builders(_) => {}
2588                            BuildersOrCallback::Callback(_, node_callback) => {
2589                                node_callback(node, next_stmt_id);
2590                            }
2591                        }
2592
2593                        // consume a stmt id even though we did not emit anything so that we can instrument this
2594                        *next_stmt_id += 1;
2595
2596                        ident_stack.push(ident);
2597                    }
2598
2599                    HydroNode::Tee { inner, .. } => {
2600                        let ret_ident = if let Some(teed_from) =
2601                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2602                        {
2603                            match builders_or_callback {
2604                                BuildersOrCallback::Builders(_) => {}
2605                                BuildersOrCallback::Callback(_, node_callback) => {
2606                                    node_callback(node, next_stmt_id);
2607                                }
2608                            }
2609
2610                            teed_from.clone()
2611                        } else {
2612                            // The inner node was already processed by transform_bottom_up,
2613                            // so its ident is on the stack
2614                            let inner_ident = ident_stack.pop().unwrap();
2615
2616                            let tee_ident =
2617                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2618
2619                            built_tees.insert(
2620                                inner.0.as_ref() as *const RefCell<HydroNode>,
2621                                tee_ident.clone(),
2622                            );
2623
2624                            match builders_or_callback {
2625                                BuildersOrCallback::Builders(graph_builders) => {
2626                                    let builder = graph_builders.get_dfir_mut(&out_location);
2627                                    builder.add_dfir(
2628                                        parse_quote! {
2629                                            #tee_ident = #inner_ident -> tee();
2630                                        },
2631                                        None,
2632                                        Some(&next_stmt_id.to_string()),
2633                                    );
2634                                }
2635                                BuildersOrCallback::Callback(_, node_callback) => {
2636                                    node_callback(node, next_stmt_id);
2637                                }
2638                            }
2639
2640                            tee_ident
2641                        };
2642
2643                        // we consume a stmt id regardless of if we emit the tee() operator,
2644                        // so that during rewrites we touch all recipients of the tee()
2645
2646                        *next_stmt_id += 1;
2647                        ident_stack.push(ret_ident);
2648                    }
2649
2650                    HydroNode::Chain { .. } => {
2651                        // Children are processed left-to-right, so second is on top
2652                        let second_ident = ident_stack.pop().unwrap();
2653                        let first_ident = ident_stack.pop().unwrap();
2654
2655                        let chain_ident =
2656                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2657
2658                        match builders_or_callback {
2659                            BuildersOrCallback::Builders(graph_builders) => {
2660                                let builder = graph_builders.get_dfir_mut(&out_location);
2661                                builder.add_dfir(
2662                                    parse_quote! {
2663                                        #chain_ident = chain();
2664                                        #first_ident -> [0]#chain_ident;
2665                                        #second_ident -> [1]#chain_ident;
2666                                    },
2667                                    None,
2668                                    Some(&next_stmt_id.to_string()),
2669                                );
2670                            }
2671                            BuildersOrCallback::Callback(_, node_callback) => {
2672                                node_callback(node, next_stmt_id);
2673                            }
2674                        }
2675
2676                        *next_stmt_id += 1;
2677
2678                        ident_stack.push(chain_ident);
2679                    }
2680
2681                    HydroNode::ChainFirst { .. } => {
2682                        let second_ident = ident_stack.pop().unwrap();
2683                        let first_ident = ident_stack.pop().unwrap();
2684
2685                        let chain_ident =
2686                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2687
2688                        match builders_or_callback {
2689                            BuildersOrCallback::Builders(graph_builders) => {
2690                                let builder = graph_builders.get_dfir_mut(&out_location);
2691                                builder.add_dfir(
2692                                    parse_quote! {
2693                                        #chain_ident = chain_first_n(1);
2694                                        #first_ident -> [0]#chain_ident;
2695                                        #second_ident -> [1]#chain_ident;
2696                                    },
2697                                    None,
2698                                    Some(&next_stmt_id.to_string()),
2699                                );
2700                            }
2701                            BuildersOrCallback::Callback(_, node_callback) => {
2702                                node_callback(node, next_stmt_id);
2703                            }
2704                        }
2705
2706                        *next_stmt_id += 1;
2707
2708                        ident_stack.push(chain_ident);
2709                    }
2710
2711                    HydroNode::CrossSingleton { right, .. } => {
2712                        let right_ident = ident_stack.pop().unwrap();
2713                        let left_ident = ident_stack.pop().unwrap();
2714
2715                        let cross_ident =
2716                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2717
2718                        match builders_or_callback {
2719                            BuildersOrCallback::Builders(graph_builders) => {
2720                                let builder = graph_builders.get_dfir_mut(&out_location);
2721
2722                                if right.metadata().location_id.is_top_level()
2723                                    && right.metadata().collection_kind.is_bounded()
2724                                {
2725                                    builder.add_dfir(
2726                                        parse_quote! {
2727                                            #cross_ident = cross_singleton();
2728                                            #left_ident -> [input]#cross_ident;
2729                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2730                                        },
2731                                        None,
2732                                        Some(&next_stmt_id.to_string()),
2733                                    );
2734                                } else {
2735                                    builder.add_dfir(
2736                                        parse_quote! {
2737                                            #cross_ident = cross_singleton();
2738                                            #left_ident -> [input]#cross_ident;
2739                                            #right_ident -> [single]#cross_ident;
2740                                        },
2741                                        None,
2742                                        Some(&next_stmt_id.to_string()),
2743                                    );
2744                                }
2745                            }
2746                            BuildersOrCallback::Callback(_, node_callback) => {
2747                                node_callback(node, next_stmt_id);
2748                            }
2749                        }
2750
2751                        *next_stmt_id += 1;
2752
2753                        ident_stack.push(cross_ident);
2754                    }
2755
2756                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2757                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2758                            parse_quote!(cross_join_multiset)
2759                        } else {
2760                            parse_quote!(join_multiset)
2761                        };
2762
2763                        let (HydroNode::CrossProduct { left, right, .. }
2764                        | HydroNode::Join { left, right, .. }) = node
2765                        else {
2766                            unreachable!()
2767                        };
2768
2769                        let is_top_level = left.metadata().location_id.is_top_level()
2770                            && right.metadata().location_id.is_top_level();
2771                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2772                            quote!('static)
2773                        } else {
2774                            quote!('tick)
2775                        };
2776
2777                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2778                            quote!('static)
2779                        } else {
2780                            quote!('tick)
2781                        };
2782
2783                        let right_ident = ident_stack.pop().unwrap();
2784                        let left_ident = ident_stack.pop().unwrap();
2785
2786                        let stream_ident =
2787                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2788
2789                        match builders_or_callback {
2790                            BuildersOrCallback::Builders(graph_builders) => {
2791                                let builder = graph_builders.get_dfir_mut(&out_location);
2792                                builder.add_dfir(
2793                                    if is_top_level {
2794                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2795                                        // a multiset_delta() to negate the replay behavior
2796                                        parse_quote! {
2797                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2798                                            #left_ident -> [0]#stream_ident;
2799                                            #right_ident -> [1]#stream_ident;
2800                                        }
2801                                    } else {
2802                                        parse_quote! {
2803                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2804                                            #left_ident -> [0]#stream_ident;
2805                                            #right_ident -> [1]#stream_ident;
2806                                        }
2807                                    }
2808                                    ,
2809                                    None,
2810                                    Some(&next_stmt_id.to_string()),
2811                                );
2812                            }
2813                            BuildersOrCallback::Callback(_, node_callback) => {
2814                                node_callback(node, next_stmt_id);
2815                            }
2816                        }
2817
2818                        *next_stmt_id += 1;
2819
2820                        ident_stack.push(stream_ident);
2821                    }
2822
2823                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2824                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2825                            parse_quote!(difference)
2826                        } else {
2827                            parse_quote!(anti_join)
2828                        };
2829
2830                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2831                            node
2832                        else {
2833                            unreachable!()
2834                        };
2835
2836                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2837                            quote!('static)
2838                        } else {
2839                            quote!('tick)
2840                        };
2841
2842                        let neg_ident = ident_stack.pop().unwrap();
2843                        let pos_ident = ident_stack.pop().unwrap();
2844
2845                        let stream_ident =
2846                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2847
2848                        match builders_or_callback {
2849                            BuildersOrCallback::Builders(graph_builders) => {
2850                                let builder = graph_builders.get_dfir_mut(&out_location);
2851                                builder.add_dfir(
2852                                    parse_quote! {
2853                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2854                                        #pos_ident -> [pos]#stream_ident;
2855                                        #neg_ident -> [neg]#stream_ident;
2856                                    },
2857                                    None,
2858                                    Some(&next_stmt_id.to_string()),
2859                                );
2860                            }
2861                            BuildersOrCallback::Callback(_, node_callback) => {
2862                                node_callback(node, next_stmt_id);
2863                            }
2864                        }
2865
2866                        *next_stmt_id += 1;
2867
2868                        ident_stack.push(stream_ident);
2869                    }
2870
2871                    HydroNode::ResolveFutures { .. } => {
2872                        let input_ident = ident_stack.pop().unwrap();
2873
2874                        let futures_ident =
2875                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2876
2877                        match builders_or_callback {
2878                            BuildersOrCallback::Builders(graph_builders) => {
2879                                let builder = graph_builders.get_dfir_mut(&out_location);
2880                                builder.add_dfir(
2881                                    parse_quote! {
2882                                        #futures_ident = #input_ident -> resolve_futures();
2883                                    },
2884                                    None,
2885                                    Some(&next_stmt_id.to_string()),
2886                                );
2887                            }
2888                            BuildersOrCallback::Callback(_, node_callback) => {
2889                                node_callback(node, next_stmt_id);
2890                            }
2891                        }
2892
2893                        *next_stmt_id += 1;
2894
2895                        ident_stack.push(futures_ident);
2896                    }
2897
2898                    HydroNode::ResolveFuturesOrdered { .. } => {
2899                        let input_ident = ident_stack.pop().unwrap();
2900
2901                        let futures_ident =
2902                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2903
2904                        match builders_or_callback {
2905                            BuildersOrCallback::Builders(graph_builders) => {
2906                                let builder = graph_builders.get_dfir_mut(&out_location);
2907                                builder.add_dfir(
2908                                    parse_quote! {
2909                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2910                                    },
2911                                    None,
2912                                    Some(&next_stmt_id.to_string()),
2913                                );
2914                            }
2915                            BuildersOrCallback::Callback(_, node_callback) => {
2916                                node_callback(node, next_stmt_id);
2917                            }
2918                        }
2919
2920                        *next_stmt_id += 1;
2921
2922                        ident_stack.push(futures_ident);
2923                    }
2924
2925                    HydroNode::Map { f, .. } => {
2926                        let input_ident = ident_stack.pop().unwrap();
2927
2928                        let map_ident =
2929                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2930
2931                        match builders_or_callback {
2932                            BuildersOrCallback::Builders(graph_builders) => {
2933                                let builder = graph_builders.get_dfir_mut(&out_location);
2934                                builder.add_dfir(
2935                                    parse_quote! {
2936                                        #map_ident = #input_ident -> map(#f);
2937                                    },
2938                                    None,
2939                                    Some(&next_stmt_id.to_string()),
2940                                );
2941                            }
2942                            BuildersOrCallback::Callback(_, node_callback) => {
2943                                node_callback(node, next_stmt_id);
2944                            }
2945                        }
2946
2947                        *next_stmt_id += 1;
2948
2949                        ident_stack.push(map_ident);
2950                    }
2951
2952                    HydroNode::FlatMap { f, .. } => {
2953                        let input_ident = ident_stack.pop().unwrap();
2954
2955                        let flat_map_ident =
2956                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2957
2958                        match builders_or_callback {
2959                            BuildersOrCallback::Builders(graph_builders) => {
2960                                let builder = graph_builders.get_dfir_mut(&out_location);
2961                                builder.add_dfir(
2962                                    parse_quote! {
2963                                        #flat_map_ident = #input_ident -> flat_map(#f);
2964                                    },
2965                                    None,
2966                                    Some(&next_stmt_id.to_string()),
2967                                );
2968                            }
2969                            BuildersOrCallback::Callback(_, node_callback) => {
2970                                node_callback(node, next_stmt_id);
2971                            }
2972                        }
2973
2974                        *next_stmt_id += 1;
2975
2976                        ident_stack.push(flat_map_ident);
2977                    }
2978
2979                    HydroNode::Filter { f, .. } => {
2980                        let input_ident = ident_stack.pop().unwrap();
2981
2982                        let filter_ident =
2983                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2984
2985                        match builders_or_callback {
2986                            BuildersOrCallback::Builders(graph_builders) => {
2987                                let builder = graph_builders.get_dfir_mut(&out_location);
2988                                builder.add_dfir(
2989                                    parse_quote! {
2990                                        #filter_ident = #input_ident -> filter(#f);
2991                                    },
2992                                    None,
2993                                    Some(&next_stmt_id.to_string()),
2994                                );
2995                            }
2996                            BuildersOrCallback::Callback(_, node_callback) => {
2997                                node_callback(node, next_stmt_id);
2998                            }
2999                        }
3000
3001                        *next_stmt_id += 1;
3002
3003                        ident_stack.push(filter_ident);
3004                    }
3005
3006                    HydroNode::FilterMap { f, .. } => {
3007                        let input_ident = ident_stack.pop().unwrap();
3008
3009                        let filter_map_ident =
3010                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3011
3012                        match builders_or_callback {
3013                            BuildersOrCallback::Builders(graph_builders) => {
3014                                let builder = graph_builders.get_dfir_mut(&out_location);
3015                                builder.add_dfir(
3016                                    parse_quote! {
3017                                        #filter_map_ident = #input_ident -> filter_map(#f);
3018                                    },
3019                                    None,
3020                                    Some(&next_stmt_id.to_string()),
3021                                );
3022                            }
3023                            BuildersOrCallback::Callback(_, node_callback) => {
3024                                node_callback(node, next_stmt_id);
3025                            }
3026                        }
3027
3028                        *next_stmt_id += 1;
3029
3030                        ident_stack.push(filter_map_ident);
3031                    }
3032
3033                    HydroNode::Sort { .. } => {
3034                        let input_ident = ident_stack.pop().unwrap();
3035
3036                        let sort_ident =
3037                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3038
3039                        match builders_or_callback {
3040                            BuildersOrCallback::Builders(graph_builders) => {
3041                                let builder = graph_builders.get_dfir_mut(&out_location);
3042                                builder.add_dfir(
3043                                    parse_quote! {
3044                                        #sort_ident = #input_ident -> sort();
3045                                    },
3046                                    None,
3047                                    Some(&next_stmt_id.to_string()),
3048                                );
3049                            }
3050                            BuildersOrCallback::Callback(_, node_callback) => {
3051                                node_callback(node, next_stmt_id);
3052                            }
3053                        }
3054
3055                        *next_stmt_id += 1;
3056
3057                        ident_stack.push(sort_ident);
3058                    }
3059
3060                    HydroNode::DeferTick { .. } => {
3061                        let input_ident = ident_stack.pop().unwrap();
3062
3063                        let defer_tick_ident =
3064                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3065
3066                        match builders_or_callback {
3067                            BuildersOrCallback::Builders(graph_builders) => {
3068                                let builder = graph_builders.get_dfir_mut(&out_location);
3069                                builder.add_dfir(
3070                                    parse_quote! {
3071                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3072                                    },
3073                                    None,
3074                                    Some(&next_stmt_id.to_string()),
3075                                );
3076                            }
3077                            BuildersOrCallback::Callback(_, node_callback) => {
3078                                node_callback(node, next_stmt_id);
3079                            }
3080                        }
3081
3082                        *next_stmt_id += 1;
3083
3084                        ident_stack.push(defer_tick_ident);
3085                    }
3086
3087                    HydroNode::Enumerate { input, .. } => {
3088                        let input_ident = ident_stack.pop().unwrap();
3089
3090                        let enumerate_ident =
3091                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3092
3093                        match builders_or_callback {
3094                            BuildersOrCallback::Builders(graph_builders) => {
3095                                let builder = graph_builders.get_dfir_mut(&out_location);
3096                                let lifetime = if input.metadata().location_id.is_top_level() {
3097                                    quote!('static)
3098                                } else {
3099                                    quote!('tick)
3100                                };
3101                                builder.add_dfir(
3102                                    parse_quote! {
3103                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3104                                    },
3105                                    None,
3106                                    Some(&next_stmt_id.to_string()),
3107                                );
3108                            }
3109                            BuildersOrCallback::Callback(_, node_callback) => {
3110                                node_callback(node, next_stmt_id);
3111                            }
3112                        }
3113
3114                        *next_stmt_id += 1;
3115
3116                        ident_stack.push(enumerate_ident);
3117                    }
3118
3119                    HydroNode::Inspect { f, .. } => {
3120                        let input_ident = ident_stack.pop().unwrap();
3121
3122                        let inspect_ident =
3123                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3124
3125                        match builders_or_callback {
3126                            BuildersOrCallback::Builders(graph_builders) => {
3127                                let builder = graph_builders.get_dfir_mut(&out_location);
3128                                builder.add_dfir(
3129                                    parse_quote! {
3130                                        #inspect_ident = #input_ident -> inspect(#f);
3131                                    },
3132                                    None,
3133                                    Some(&next_stmt_id.to_string()),
3134                                );
3135                            }
3136                            BuildersOrCallback::Callback(_, node_callback) => {
3137                                node_callback(node, next_stmt_id);
3138                            }
3139                        }
3140
3141                        *next_stmt_id += 1;
3142
3143                        ident_stack.push(inspect_ident);
3144                    }
3145
3146                    HydroNode::Unique { input, .. } => {
3147                        let input_ident = ident_stack.pop().unwrap();
3148
3149                        let unique_ident =
3150                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3151
3152                        match builders_or_callback {
3153                            BuildersOrCallback::Builders(graph_builders) => {
3154                                let builder = graph_builders.get_dfir_mut(&out_location);
3155                                let lifetime = if input.metadata().location_id.is_top_level() {
3156                                    quote!('static)
3157                                } else {
3158                                    quote!('tick)
3159                                };
3160
3161                                builder.add_dfir(
3162                                    parse_quote! {
3163                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3164                                    },
3165                                    None,
3166                                    Some(&next_stmt_id.to_string()),
3167                                );
3168                            }
3169                            BuildersOrCallback::Callback(_, node_callback) => {
3170                                node_callback(node, next_stmt_id);
3171                            }
3172                        }
3173
3174                        *next_stmt_id += 1;
3175
3176                        ident_stack.push(unique_ident);
3177                    }
3178
3179                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3180                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3181                            if input.metadata().location_id.is_top_level()
3182                                && input.metadata().collection_kind.is_bounded()
3183                            {
3184                                parse_quote!(fold_no_replay)
3185                            } else {
3186                                parse_quote!(fold)
3187                            }
3188                        } else if matches!(node, HydroNode::Scan { .. }) {
3189                            parse_quote!(scan)
3190                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3191                            if input.metadata().location_id.is_top_level()
3192                                && input.metadata().collection_kind.is_bounded()
3193                            {
3194                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3195                            } else {
3196                                parse_quote!(fold_keyed)
3197                            }
3198                        } else {
3199                            unreachable!()
3200                        };
3201
3202                        let (HydroNode::Fold { input, .. }
3203                        | HydroNode::FoldKeyed { input, .. }
3204                        | HydroNode::Scan { input, .. }) = node
3205                        else {
3206                            unreachable!()
3207                        };
3208
3209                        let lifetime = if input.metadata().location_id.is_top_level() {
3210                            quote!('static)
3211                        } else {
3212                            quote!('tick)
3213                        };
3214
3215                        let input_ident = ident_stack.pop().unwrap();
3216
3217                        let (HydroNode::Fold { init, acc, .. }
3218                        | HydroNode::FoldKeyed { init, acc, .. }
3219                        | HydroNode::Scan { init, acc, .. }) = &*node
3220                        else {
3221                            unreachable!()
3222                        };
3223
3224                        let fold_ident =
3225                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3226
3227                        match builders_or_callback {
3228                            BuildersOrCallback::Builders(graph_builders) => {
3229                                if matches!(node, HydroNode::Fold { .. })
3230                                    && node.metadata().location_id.is_top_level()
3231                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3232                                    && graph_builders.singleton_intermediates()
3233                                    && !node.metadata().collection_kind.is_bounded()
3234                                {
3235                                    let builder = graph_builders.get_dfir_mut(&out_location);
3236
3237                                    let acc: syn::Expr = parse_quote!({
3238                                        let mut __inner = #acc;
3239                                        move |__state, __value| {
3240                                            __inner(__state, __value);
3241                                            Some(__state.clone())
3242                                        }
3243                                    });
3244
3245                                    builder.add_dfir(
3246                                        parse_quote! {
3247                                            source_iter([(#init)()]) -> [0]#fold_ident;
3248                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3249                                            #fold_ident = chain();
3250                                        },
3251                                        None,
3252                                        Some(&next_stmt_id.to_string()),
3253                                    );
3254                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3255                                    && node.metadata().location_id.is_top_level()
3256                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3257                                    && graph_builders.singleton_intermediates()
3258                                    && !node.metadata().collection_kind.is_bounded()
3259                                {
3260                                    let builder = graph_builders.get_dfir_mut(&out_location);
3261
3262                                    let acc: syn::Expr = parse_quote!({
3263                                        let mut __init = #init;
3264                                        let mut __inner = #acc;
3265                                        move |__state, __kv: (_, _)| {
3266                                            // TODO(shadaj): we can avoid the clone when the entry exists
3267                                            let __state = __state
3268                                                .entry(::std::clone::Clone::clone(&__kv.0))
3269                                                .or_insert_with(|| (__init)());
3270                                            __inner(__state, __kv.1);
3271                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3272                                        }
3273                                    });
3274
3275                                    builder.add_dfir(
3276                                        parse_quote! {
3277                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3278                                        },
3279                                        None,
3280                                        Some(&next_stmt_id.to_string()),
3281                                    );
3282                                } else {
3283                                    let builder = graph_builders.get_dfir_mut(&out_location);
3284                                    builder.add_dfir(
3285                                        parse_quote! {
3286                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3287                                        },
3288                                        None,
3289                                        Some(&next_stmt_id.to_string()),
3290                                    );
3291                                }
3292                            }
3293                            BuildersOrCallback::Callback(_, node_callback) => {
3294                                node_callback(node, next_stmt_id);
3295                            }
3296                        }
3297
3298                        *next_stmt_id += 1;
3299
3300                        ident_stack.push(fold_ident);
3301                    }
3302
3303                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3304                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3305                            if input.metadata().location_id.is_top_level()
3306                                && input.metadata().collection_kind.is_bounded()
3307                            {
3308                                parse_quote!(reduce_no_replay)
3309                            } else {
3310                                parse_quote!(reduce)
3311                            }
3312                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3313                            if input.metadata().location_id.is_top_level()
3314                                && input.metadata().collection_kind.is_bounded()
3315                            {
3316                                todo!(
3317                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3318                                )
3319                            } else {
3320                                parse_quote!(reduce_keyed)
3321                            }
3322                        } else {
3323                            unreachable!()
3324                        };
3325
3326                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3327                        else {
3328                            unreachable!()
3329                        };
3330
3331                        let lifetime = if input.metadata().location_id.is_top_level() {
3332                            quote!('static)
3333                        } else {
3334                            quote!('tick)
3335                        };
3336
3337                        let input_ident = ident_stack.pop().unwrap();
3338
3339                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3340                        else {
3341                            unreachable!()
3342                        };
3343
3344                        let reduce_ident =
3345                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3346
3347                        match builders_or_callback {
3348                            BuildersOrCallback::Builders(graph_builders) => {
3349                                if matches!(node, HydroNode::Reduce { .. })
3350                                    && node.metadata().location_id.is_top_level()
3351                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3352                                    && graph_builders.singleton_intermediates()
3353                                    && !node.metadata().collection_kind.is_bounded()
3354                                {
3355                                    todo!(
3356                                        "Reduce with optional intermediates is not yet supported in simulator"
3357                                    );
3358                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3359                                    && node.metadata().location_id.is_top_level()
3360                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3361                                    && graph_builders.singleton_intermediates()
3362                                    && !node.metadata().collection_kind.is_bounded()
3363                                {
3364                                    todo!(
3365                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3366                                    );
3367                                } else {
3368                                    let builder = graph_builders.get_dfir_mut(&out_location);
3369                                    builder.add_dfir(
3370                                        parse_quote! {
3371                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3372                                        },
3373                                        None,
3374                                        Some(&next_stmt_id.to_string()),
3375                                    );
3376                                }
3377                            }
3378                            BuildersOrCallback::Callback(_, node_callback) => {
3379                                node_callback(node, next_stmt_id);
3380                            }
3381                        }
3382
3383                        *next_stmt_id += 1;
3384
3385                        ident_stack.push(reduce_ident);
3386                    }
3387
3388                    HydroNode::ReduceKeyedWatermark {
3389                        f,
3390                        input,
3391                        metadata,
3392                        ..
3393                    } => {
3394                        let lifetime = if input.metadata().location_id.is_top_level() {
3395                            quote!('static)
3396                        } else {
3397                            quote!('tick)
3398                        };
3399
3400                        // watermark is processed second, so it's on top
3401                        let watermark_ident = ident_stack.pop().unwrap();
3402                        let input_ident = ident_stack.pop().unwrap();
3403
3404                        let chain_ident = syn::Ident::new(
3405                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3406                            Span::call_site(),
3407                        );
3408
3409                        let fold_ident =
3410                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3411
3412                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3413                            && input.metadata().collection_kind.is_bounded()
3414                        {
3415                            parse_quote!(fold_no_replay)
3416                        } else {
3417                            parse_quote!(fold)
3418                        };
3419
3420                        match builders_or_callback {
3421                            BuildersOrCallback::Builders(graph_builders) => {
3422                                if metadata.location_id.is_top_level()
3423                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3424                                    && graph_builders.singleton_intermediates()
3425                                    && !metadata.collection_kind.is_bounded()
3426                                {
3427                                    todo!(
3428                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3429                                    )
3430                                } else {
3431                                    let builder = graph_builders.get_dfir_mut(&out_location);
3432                                    builder.add_dfir(
3433                                        parse_quote! {
3434                                            #chain_ident = chain();
3435                                            #input_ident
3436                                                -> map(|x| (Some(x), None))
3437                                                -> [0]#chain_ident;
3438                                            #watermark_ident
3439                                                -> map(|watermark| (None, Some(watermark)))
3440                                                -> [1]#chain_ident;
3441
3442                                            #fold_ident = #chain_ident
3443                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3444                                                    let __reduce_keyed_fn = #f;
3445                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3446                                                        if let Some((k, v)) = opt_payload {
3447                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3448                                                                if k < curr_watermark {
3449                                                                    return;
3450                                                                }
3451                                                            }
3452                                                            match map.entry(k) {
3453                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3454                                                                    e.insert(v);
3455                                                                }
3456                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3457                                                                    __reduce_keyed_fn(e.get_mut(), v);
3458                                                                }
3459                                                            }
3460                                                        } else {
3461                                                            let watermark = opt_watermark.unwrap();
3462                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3463                                                                if watermark <= curr_watermark {
3464                                                                    return;
3465                                                                }
3466                                                            }
3467                                                            *opt_curr_watermark = opt_watermark;
3468                                                            map.retain(|k, _| *k >= watermark);
3469                                                        }
3470                                                    }
3471                                                })
3472                                                -> flat_map(|(map, _curr_watermark)| map);
3473                                        },
3474                                        None,
3475                                        Some(&next_stmt_id.to_string()),
3476                                    );
3477                                }
3478                            }
3479                            BuildersOrCallback::Callback(_, node_callback) => {
3480                                node_callback(node, next_stmt_id);
3481                            }
3482                        }
3483
3484                        *next_stmt_id += 1;
3485
3486                        ident_stack.push(fold_ident);
3487                    }
3488
3489                    HydroNode::Network {
3490                        networking_info,
3491                        serialize_fn: serialize_pipeline,
3492                        instantiate_fn,
3493                        deserialize_fn: deserialize_pipeline,
3494                        input,
3495                        ..
3496                    } => {
3497                        let input_ident = ident_stack.pop().unwrap();
3498
3499                        let receiver_stream_ident =
3500                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3501
3502                        match builders_or_callback {
3503                            BuildersOrCallback::Builders(graph_builders) => {
3504                                let (sink_expr, source_expr) = match instantiate_fn {
3505                                    DebugInstantiate::Building => (
3506                                        syn::parse_quote!(DUMMY_SINK),
3507                                        syn::parse_quote!(DUMMY_SOURCE),
3508                                    ),
3509
3510                                    DebugInstantiate::Finalized(finalized) => {
3511                                        (finalized.sink.clone(), finalized.source.clone())
3512                                    }
3513                                };
3514
3515                                graph_builders.create_network(
3516                                    &input.metadata().location_id,
3517                                    &out_location,
3518                                    input_ident,
3519                                    &receiver_stream_ident,
3520                                    serialize_pipeline.as_ref(),
3521                                    sink_expr,
3522                                    source_expr,
3523                                    deserialize_pipeline.as_ref(),
3524                                    *next_stmt_id,
3525                                    networking_info,
3526                                );
3527                            }
3528                            BuildersOrCallback::Callback(_, node_callback) => {
3529                                node_callback(node, next_stmt_id);
3530                            }
3531                        }
3532
3533                        *next_stmt_id += 1;
3534
3535                        ident_stack.push(receiver_stream_ident);
3536                    }
3537
3538                    HydroNode::ExternalInput {
3539                        instantiate_fn,
3540                        deserialize_fn: deserialize_pipeline,
3541                        ..
3542                    } => {
3543                        let receiver_stream_ident =
3544                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3545
3546                        match builders_or_callback {
3547                            BuildersOrCallback::Builders(graph_builders) => {
3548                                let (_, source_expr) = match instantiate_fn {
3549                                    DebugInstantiate::Building => (
3550                                        syn::parse_quote!(DUMMY_SINK),
3551                                        syn::parse_quote!(DUMMY_SOURCE),
3552                                    ),
3553
3554                                    DebugInstantiate::Finalized(finalized) => {
3555                                        (finalized.sink.clone(), finalized.source.clone())
3556                                    }
3557                                };
3558
3559                                graph_builders.create_external_source(
3560                                    &out_location,
3561                                    source_expr,
3562                                    &receiver_stream_ident,
3563                                    deserialize_pipeline.as_ref(),
3564                                    *next_stmt_id,
3565                                );
3566                            }
3567                            BuildersOrCallback::Callback(_, node_callback) => {
3568                                node_callback(node, next_stmt_id);
3569                            }
3570                        }
3571
3572                        *next_stmt_id += 1;
3573
3574                        ident_stack.push(receiver_stream_ident);
3575                    }
3576
3577                    HydroNode::Counter {
3578                        tag,
3579                        duration,
3580                        prefix,
3581                        ..
3582                    } => {
3583                        let input_ident = ident_stack.pop().unwrap();
3584
3585                        let counter_ident =
3586                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3587
3588                        match builders_or_callback {
3589                            BuildersOrCallback::Builders(graph_builders) => {
3590                                let arg = format!("{}({})", prefix, tag);
3591                                let builder = graph_builders.get_dfir_mut(&out_location);
3592                                builder.add_dfir(
3593                                    parse_quote! {
3594                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3595                                    },
3596                                    None,
3597                                    Some(&next_stmt_id.to_string()),
3598                                );
3599                            }
3600                            BuildersOrCallback::Callback(_, node_callback) => {
3601                                node_callback(node, next_stmt_id);
3602                            }
3603                        }
3604
3605                        *next_stmt_id += 1;
3606
3607                        ident_stack.push(counter_ident);
3608                    }
3609                }
3610            },
3611            seen_tees,
3612            false,
3613        );
3614
3615        ident_stack
3616            .pop()
3617            .expect("ident_stack should have exactly one element after traversal")
3618    }
3619
3620    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3621        match self {
3622            HydroNode::Placeholder => {
3623                panic!()
3624            }
3625            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3626            HydroNode::Source { source, .. } => match source {
3627                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3628                HydroSource::ExternalNetwork()
3629                | HydroSource::Spin()
3630                | HydroSource::ClusterMembers(_, _)
3631                | HydroSource::Embedded(_) => {} // TODO: what goes here?
3632            },
3633            HydroNode::SingletonSource { value, .. } => {
3634                transform(value);
3635            }
3636            HydroNode::CycleSource { .. }
3637            | HydroNode::Tee { .. }
3638            | HydroNode::YieldConcat { .. }
3639            | HydroNode::BeginAtomic { .. }
3640            | HydroNode::EndAtomic { .. }
3641            | HydroNode::Batch { .. }
3642            | HydroNode::Chain { .. }
3643            | HydroNode::ChainFirst { .. }
3644            | HydroNode::CrossProduct { .. }
3645            | HydroNode::CrossSingleton { .. }
3646            | HydroNode::ResolveFutures { .. }
3647            | HydroNode::ResolveFuturesOrdered { .. }
3648            | HydroNode::Join { .. }
3649            | HydroNode::Difference { .. }
3650            | HydroNode::AntiJoin { .. }
3651            | HydroNode::DeferTick { .. }
3652            | HydroNode::Enumerate { .. }
3653            | HydroNode::Unique { .. }
3654            | HydroNode::Sort { .. } => {}
3655            HydroNode::Map { f, .. }
3656            | HydroNode::FlatMap { f, .. }
3657            | HydroNode::Filter { f, .. }
3658            | HydroNode::FilterMap { f, .. }
3659            | HydroNode::Inspect { f, .. }
3660            | HydroNode::Reduce { f, .. }
3661            | HydroNode::ReduceKeyed { f, .. }
3662            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3663                transform(f);
3664            }
3665            HydroNode::Fold { init, acc, .. }
3666            | HydroNode::Scan { init, acc, .. }
3667            | HydroNode::FoldKeyed { init, acc, .. } => {
3668                transform(init);
3669                transform(acc);
3670            }
3671            HydroNode::Network {
3672                serialize_fn,
3673                deserialize_fn,
3674                ..
3675            } => {
3676                if let Some(serialize_fn) = serialize_fn {
3677                    transform(serialize_fn);
3678                }
3679                if let Some(deserialize_fn) = deserialize_fn {
3680                    transform(deserialize_fn);
3681                }
3682            }
3683            HydroNode::ExternalInput { deserialize_fn, .. } => {
3684                if let Some(deserialize_fn) = deserialize_fn {
3685                    transform(deserialize_fn);
3686                }
3687            }
3688            HydroNode::Counter { duration, .. } => {
3689                transform(duration);
3690            }
3691        }
3692    }
3693
3694    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3695        &self.metadata().op
3696    }
3697
3698    pub fn metadata(&self) -> &HydroIrMetadata {
3699        match self {
3700            HydroNode::Placeholder => {
3701                panic!()
3702            }
3703            HydroNode::Cast { metadata, .. } => metadata,
3704            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3705            HydroNode::Source { metadata, .. } => metadata,
3706            HydroNode::SingletonSource { metadata, .. } => metadata,
3707            HydroNode::CycleSource { metadata, .. } => metadata,
3708            HydroNode::Tee { metadata, .. } => metadata,
3709            HydroNode::YieldConcat { metadata, .. } => metadata,
3710            HydroNode::BeginAtomic { metadata, .. } => metadata,
3711            HydroNode::EndAtomic { metadata, .. } => metadata,
3712            HydroNode::Batch { metadata, .. } => metadata,
3713            HydroNode::Chain { metadata, .. } => metadata,
3714            HydroNode::ChainFirst { metadata, .. } => metadata,
3715            HydroNode::CrossProduct { metadata, .. } => metadata,
3716            HydroNode::CrossSingleton { metadata, .. } => metadata,
3717            HydroNode::Join { metadata, .. } => metadata,
3718            HydroNode::Difference { metadata, .. } => metadata,
3719            HydroNode::AntiJoin { metadata, .. } => metadata,
3720            HydroNode::ResolveFutures { metadata, .. } => metadata,
3721            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3722            HydroNode::Map { metadata, .. } => metadata,
3723            HydroNode::FlatMap { metadata, .. } => metadata,
3724            HydroNode::Filter { metadata, .. } => metadata,
3725            HydroNode::FilterMap { metadata, .. } => metadata,
3726            HydroNode::DeferTick { metadata, .. } => metadata,
3727            HydroNode::Enumerate { metadata, .. } => metadata,
3728            HydroNode::Inspect { metadata, .. } => metadata,
3729            HydroNode::Unique { metadata, .. } => metadata,
3730            HydroNode::Sort { metadata, .. } => metadata,
3731            HydroNode::Scan { metadata, .. } => metadata,
3732            HydroNode::Fold { metadata, .. } => metadata,
3733            HydroNode::FoldKeyed { metadata, .. } => metadata,
3734            HydroNode::Reduce { metadata, .. } => metadata,
3735            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3736            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3737            HydroNode::ExternalInput { metadata, .. } => metadata,
3738            HydroNode::Network { metadata, .. } => metadata,
3739            HydroNode::Counter { metadata, .. } => metadata,
3740        }
3741    }
3742
3743    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3744        &mut self.metadata_mut().op
3745    }
3746
3747    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3748        match self {
3749            HydroNode::Placeholder => {
3750                panic!()
3751            }
3752            HydroNode::Cast { metadata, .. } => metadata,
3753            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3754            HydroNode::Source { metadata, .. } => metadata,
3755            HydroNode::SingletonSource { metadata, .. } => metadata,
3756            HydroNode::CycleSource { metadata, .. } => metadata,
3757            HydroNode::Tee { metadata, .. } => metadata,
3758            HydroNode::YieldConcat { metadata, .. } => metadata,
3759            HydroNode::BeginAtomic { metadata, .. } => metadata,
3760            HydroNode::EndAtomic { metadata, .. } => metadata,
3761            HydroNode::Batch { metadata, .. } => metadata,
3762            HydroNode::Chain { metadata, .. } => metadata,
3763            HydroNode::ChainFirst { metadata, .. } => metadata,
3764            HydroNode::CrossProduct { metadata, .. } => metadata,
3765            HydroNode::CrossSingleton { metadata, .. } => metadata,
3766            HydroNode::Join { metadata, .. } => metadata,
3767            HydroNode::Difference { metadata, .. } => metadata,
3768            HydroNode::AntiJoin { metadata, .. } => metadata,
3769            HydroNode::ResolveFutures { metadata, .. } => metadata,
3770            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3771            HydroNode::Map { metadata, .. } => metadata,
3772            HydroNode::FlatMap { metadata, .. } => metadata,
3773            HydroNode::Filter { metadata, .. } => metadata,
3774            HydroNode::FilterMap { metadata, .. } => metadata,
3775            HydroNode::DeferTick { metadata, .. } => metadata,
3776            HydroNode::Enumerate { metadata, .. } => metadata,
3777            HydroNode::Inspect { metadata, .. } => metadata,
3778            HydroNode::Unique { metadata, .. } => metadata,
3779            HydroNode::Sort { metadata, .. } => metadata,
3780            HydroNode::Scan { metadata, .. } => metadata,
3781            HydroNode::Fold { metadata, .. } => metadata,
3782            HydroNode::FoldKeyed { metadata, .. } => metadata,
3783            HydroNode::Reduce { metadata, .. } => metadata,
3784            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3785            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3786            HydroNode::ExternalInput { metadata, .. } => metadata,
3787            HydroNode::Network { metadata, .. } => metadata,
3788            HydroNode::Counter { metadata, .. } => metadata,
3789        }
3790    }
3791
3792    pub fn input(&self) -> Vec<&HydroNode> {
3793        match self {
3794            HydroNode::Placeholder => {
3795                panic!()
3796            }
3797            HydroNode::Source { .. }
3798            | HydroNode::SingletonSource { .. }
3799            | HydroNode::ExternalInput { .. }
3800            | HydroNode::CycleSource { .. }
3801            | HydroNode::Tee { .. } => {
3802                // Tee should find its input in separate special ways
3803                vec![]
3804            }
3805            HydroNode::Cast { inner, .. }
3806            | HydroNode::ObserveNonDet { inner, .. }
3807            | HydroNode::YieldConcat { inner, .. }
3808            | HydroNode::BeginAtomic { inner, .. }
3809            | HydroNode::EndAtomic { inner, .. }
3810            | HydroNode::Batch { inner, .. } => {
3811                vec![inner]
3812            }
3813            HydroNode::Chain { first, second, .. } => {
3814                vec![first, second]
3815            }
3816            HydroNode::ChainFirst { first, second, .. } => {
3817                vec![first, second]
3818            }
3819            HydroNode::CrossProduct { left, right, .. }
3820            | HydroNode::CrossSingleton { left, right, .. }
3821            | HydroNode::Join { left, right, .. } => {
3822                vec![left, right]
3823            }
3824            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3825                vec![pos, neg]
3826            }
3827            HydroNode::Map { input, .. }
3828            | HydroNode::FlatMap { input, .. }
3829            | HydroNode::Filter { input, .. }
3830            | HydroNode::FilterMap { input, .. }
3831            | HydroNode::Sort { input, .. }
3832            | HydroNode::DeferTick { input, .. }
3833            | HydroNode::Enumerate { input, .. }
3834            | HydroNode::Inspect { input, .. }
3835            | HydroNode::Unique { input, .. }
3836            | HydroNode::Network { input, .. }
3837            | HydroNode::Counter { input, .. }
3838            | HydroNode::ResolveFutures { input, .. }
3839            | HydroNode::ResolveFuturesOrdered { input, .. }
3840            | HydroNode::Fold { input, .. }
3841            | HydroNode::FoldKeyed { input, .. }
3842            | HydroNode::Reduce { input, .. }
3843            | HydroNode::ReduceKeyed { input, .. }
3844            | HydroNode::Scan { input, .. } => {
3845                vec![input]
3846            }
3847            HydroNode::ReduceKeyedWatermark {
3848                input, watermark, ..
3849            } => {
3850                vec![input, watermark]
3851            }
3852        }
3853    }
3854
3855    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3856        self.input()
3857            .iter()
3858            .map(|input_node| input_node.metadata())
3859            .collect()
3860    }
3861
3862    pub fn print_root(&self) -> String {
3863        match self {
3864            HydroNode::Placeholder => {
3865                panic!()
3866            }
3867            HydroNode::Cast { .. } => "Cast()".to_owned(),
3868            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3869            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3870            HydroNode::SingletonSource {
3871                value,
3872                first_tick_only,
3873                ..
3874            } => format!(
3875                "SingletonSource({:?}, first_tick_only={})",
3876                value, first_tick_only
3877            ),
3878            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3879            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3880            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3881            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3882            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3883            HydroNode::Batch { .. } => "Batch()".to_owned(),
3884            HydroNode::Chain { first, second, .. } => {
3885                format!("Chain({}, {})", first.print_root(), second.print_root())
3886            }
3887            HydroNode::ChainFirst { first, second, .. } => {
3888                format!(
3889                    "ChainFirst({}, {})",
3890                    first.print_root(),
3891                    second.print_root()
3892                )
3893            }
3894            HydroNode::CrossProduct { left, right, .. } => {
3895                format!(
3896                    "CrossProduct({}, {})",
3897                    left.print_root(),
3898                    right.print_root()
3899                )
3900            }
3901            HydroNode::CrossSingleton { left, right, .. } => {
3902                format!(
3903                    "CrossSingleton({}, {})",
3904                    left.print_root(),
3905                    right.print_root()
3906                )
3907            }
3908            HydroNode::Join { left, right, .. } => {
3909                format!("Join({}, {})", left.print_root(), right.print_root())
3910            }
3911            HydroNode::Difference { pos, neg, .. } => {
3912                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3913            }
3914            HydroNode::AntiJoin { pos, neg, .. } => {
3915                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3916            }
3917            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3918            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3919            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3920            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3921            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3922            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3923            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3924            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3925            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3926            HydroNode::Unique { .. } => "Unique()".to_owned(),
3927            HydroNode::Sort { .. } => "Sort()".to_owned(),
3928            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3929            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3930            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3931            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3932            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3933            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3934            HydroNode::Network { .. } => "Network()".to_owned(),
3935            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3936            HydroNode::Counter { tag, duration, .. } => {
3937                format!("Counter({:?}, {:?})", tag, duration)
3938            }
3939        }
3940    }
3941}
3942
/// Sets up one network edge between two instantiated locations.
///
/// The pair `(from_location, to_location)` selects which family of `Deploy` hooks is used:
/// process→process (`o2o_*`), process→cluster (`o2m_*`), cluster→process (`m2o_*`) or
/// cluster→cluster (`m2m_*`). In every case a fresh port is taken from the sending node
/// first and from the receiving node second, then the `*_sink_source` hook is called,
/// followed by the `*_connect` hook.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook and the closure produced by the `*_connect` hook.
///
/// # Panics
/// - if a referenced process or cluster has no entry in `processes` / `clusters`;
/// - if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Shared lookups: fetch (and clone) the instantiated node for a key, or panic with the
    // same message regardless of which side of the edge the key came from.
    let process_at = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    let cluster_at = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_at(from);
            let receiver = process_at(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_at(from);
            let receiver = cluster_at(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_at(from);
            let receiver = process_at(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_at(from);
            let receiver = cluster_at(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and atomic locations are not handled as network endpoints here.
        (LocationId::Tick(_, _), _)
        | (_, LocationId::Tick(_, _))
        | (LocationId::Atomic(_), _)
        | (_, LocationId::Atomic(_)) => panic!(),
    };
    (sink, source, connect_fn)
}
4084
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth of the enum is noticed.
    /// Ignored without the `build` feature because the size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = std::mem::size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the in-memory size of `HydroRoot`, with the same feature caveat as above.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = std::mem::size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression that contains no `q!` expansion must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let simplified = simplify_q_macro(plain_expr.clone());
        assert_eq!(simplified, plain_expr);
    }

    /// Feeds the expression produced by splicing a real `q!` closure through
    /// `simplify_q_macro` and compares the resulting tokens against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced_closure = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced_closure);
        // The visitor is expected to recognise the stageleft::runtime_support::fn_* pattern
        // and collapse it to q!(...).
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted code is a block that ends in a closure rather than a
    /// closure that starts with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced_block = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced_block);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}