1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so the expression can carry custom formatting: its
/// [`Debug`] impl prints the expression's token stream, and its [`Display`]
/// impl prints a simplified `q!(...)` form.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42 fn from(expr: syn::Expr) -> Self {
43 Self(Box::new(expr))
44 }
45}
46
47impl Deref for DebugExpr {
48 type Target = syn::Expr;
49
50 fn deref(&self) -> &Self::Target {
51 &self.0
52 }
53}
54
55impl ToTokens for DebugExpr {
56 fn to_tokens(&self, tokens: &mut TokenStream) {
57 self.0.to_tokens(tokens);
58 }
59}
60
61impl Debug for DebugExpr {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0.to_token_stream())
64 }
65}
66
67impl Display for DebugExpr {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let original = self.0.as_ref().clone();
70 let simplified = simplify_q_macro(original);
71
72 write!(f, "q!({})", quote::quote!(#simplified))
75 }
76}
77
78fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80 let mut simplifier = QMacroSimplifier::new();
83 simplifier.visit_expr_mut(&mut expr);
84
85 if let Some(simplified) = simplifier.simplified_result {
87 simplified
88 } else {
89 expr
90 }
91}
92
/// Mutable syntax-tree visitor that looks for a
/// `stageleft::runtime_support::*_type_hint*(...)` call and records the
/// closure found among that call's arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted expression, if one has been found. Once set, further
    /// visiting is skipped.
    pub simplified_result: Option<syn::Expr>,
}
98
99impl QMacroSimplifier {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105impl VisitMut for QMacroSimplifier {
106 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107 if self.simplified_result.is_some() {
109 return;
110 }
111
112 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113 && self.is_stageleft_runtime_support_call(&path_expr.path)
115 && let Some(closure) = self.extract_closure_from_args(&call.args)
117 {
118 self.simplified_result = Some(closure);
119 return;
120 }
121
122 syn::visit_mut::visit_expr_mut(self, expr);
125 }
126}
127
128impl QMacroSimplifier {
129 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130 if let Some(last_segment) = path.segments.last() {
132 let fn_name = last_segment.ident.to_string();
133 fn_name.contains("_type_hint")
135 && path.segments.len() > 2
136 && path.segments[0].ident == "stageleft"
137 && path.segments[1].ident == "runtime_support"
138 } else {
139 false
140 }
141 }
142
143 fn extract_closure_from_args(
144 &self,
145 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146 ) -> Option<syn::Expr> {
147 for arg in args {
149 if let syn::Expr::Closure(_) = arg {
150 return Some(arg.clone());
151 }
152 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154 return Some(closure_expr);
155 }
156 }
157 None
158 }
159
160 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161 let mut visitor = ClosureFinder {
162 found_closure: None,
163 prefer_inner_blocks: true,
164 };
165 visitor.visit_expr(expr);
166 visitor.found_closure
167 }
168}
169
/// Read-only syntax-tree visitor that records the first closure-bearing
/// expression it encounters.
struct ClosureFinder {
    /// The expression that was found, if any.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression is first checked for a directly nested
    /// block statement that contains a closure; if one exists, that nested
    /// block (not the bare closure) is recorded.
    prefer_inner_blocks: bool,
}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178 if self.found_closure.is_some() {
180 return;
181 }
182
183 match expr {
184 syn::Expr::Closure(_) => {
185 self.found_closure = Some(expr.clone());
186 }
187 syn::Expr::Block(block) if self.prefer_inner_blocks => {
188 for stmt in &block.block.stmts {
190 if let syn::Stmt::Expr(stmt_expr, _) = stmt
191 && let syn::Expr::Block(_) = stmt_expr
192 {
193 let mut inner_visitor = ClosureFinder {
195 found_closure: None,
196 prefer_inner_blocks: false, };
198 inner_visitor.visit_expr(stmt_expr);
199 if inner_visitor.found_closure.is_some() {
200 self.found_closure = Some(stmt_expr.clone());
202 return;
203 }
204 }
205 }
206
207 visit::visit_expr(self, expr);
209
210 if self.found_closure.is_some() {
213 }
215 }
216 _ => {
217 visit::visit_expr(self, expr);
219 }
220 }
221 }
222}
223
/// Newtype around a boxed [`syn::Type`] whose [`Debug`] impl prints the type's
/// token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231 fn from(t: syn::Type) -> Self {
232 Self(Box::new(t))
233 }
234}
235
236impl Deref for DebugType {
237 type Target = syn::Type;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0
241 }
242}
243
244impl ToTokens for DebugType {
245 fn to_tokens(&self, tokens: &mut TokenStream) {
246 self.0.to_tokens(tokens);
247 }
248}
249
250impl Debug for DebugType {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.to_token_stream())
253 }
254}
255
/// Instantiation state attached to network-facing IR nodes.
pub enum DebugInstantiate {
    /// The node has not been instantiated yet.
    Building,
    /// The node has been instantiated; holds the resulting expressions and
    /// the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
260
/// Data stored once a network-facing node has been instantiated.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// Deferred connection step. `HydroRoot::connect_network` takes it out of
    /// the option and calls it, so it runs at most once.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275 fn from(f: DebugInstantiateFinalized) -> Self {
276 Self::Finalized(Box::new(f))
277 }
278}
279
280impl Debug for DebugInstantiate {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "<network instantiate>")
283 }
284}
285
286impl Hash for DebugInstantiate {
287 fn hash<H: Hasher>(&self, _state: &mut H) {
288 }
290}
291
292impl Clone for DebugInstantiate {
293 fn clone(&self) -> Self {
294 match self {
295 DebugInstantiate::Building => DebugInstantiate::Building,
296 DebugInstantiate::Finalized(_) => {
297 panic!("DebugInstantiate::Finalized should not be cloned")
298 }
299 }
300 }
301}
302
/// Resolution state of a [`HydroSource::ClusterMembers`] source.
///
/// `HydroRoot::compile_network` moves a source out of `Uninit`; it panics if
/// the source is already in one of the other states.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Not yet resolved.
    Uninit,
    /// Resolved to a membership-stream expression. Assigned the first time a
    /// given (observing root location, cluster) pair is encountered.
    Stream(DebugExpr),
    /// Assigned when the same (observing root location, cluster) pair was
    /// already encountered; stores the observing root location and the
    /// cluster's location id.
    // NOTE(review): presumably reuses the stream created for the first
    // occurrence — confirm against the code that consumes this variant.
    Tee(LocationId, LocationId),
}
323
/// The kinds of source that can feed an IR graph.
// NOTE(review): the consumers of most variants are outside this part of the
// file; the per-variant notes below state only what is visible here.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression.
    Stream(DebugExpr),
    /// Source with no payload.
    ExternalNetwork(),
    /// Source described by an expression.
    Iter(DebugExpr),
    /// Source with no payload.
    Spin(),
    /// Cluster-membership source: the cluster's location id and the current
    /// resolution state (see [`ClusterMembersState`]).
    ClusterMembers(LocationId, ClusterMembersState),
    /// Source identified by `ident`; `HydroRoot::compile_network` registers it
    /// with the deployment as an embedded input.
    Embedded(syn::Ident),
}
334
/// Sink for the DFIR statements produced while emitting an IR graph.
///
/// Each method receives the identifiers and locations involved in one IR
/// operation and is responsible for adding the corresponding DFIR. An
/// implementation for `SecondaryMap<LocationKey, FlatGraphBuilder>` lives in
/// this file.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    // NOTE(review): the exact meaning of this flag is not visible here; the
    // `SecondaryMap` implementation always returns `false` — confirm intent.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR connecting `in_ident` (of kind `in_kind`, at
    /// `in_location`) to `out_ident` at `out_location` for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR connecting `in_ident` to `out_ident` for a
    /// yield-from-tick operation.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR connecting `in_ident` to `out_ident` for a begin-atomic
    /// operation.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR connecting `in_ident` to `out_ident` for an end-atomic
    /// operation.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR connecting `in_ident` to `out_ident` at `location` for
    /// an observe-nondeterminism operation.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network edge: a sender at `from` that feeds
    /// `input_ident` into `sink`, and a receiver at `to` that defines
    /// `out_ident` from `source`. `serialize` / `deserialize`, when present,
    /// are applied on the respective side.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`,
    /// optionally applying `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sender at `on` that feeds `input_ident` into `sink_expr`,
    /// optionally applying `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
428
429#[cfg(feature = "build")]
430impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this builder.
    fn singleton_intermediates(&self) -> bool {
        false
    }
434
435 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
436 self.entry(location.root().key())
437 .expect("location was removed")
438 .or_default()
439 }
440
441 fn batch(
442 &mut self,
443 in_ident: syn::Ident,
444 in_location: &LocationId,
445 in_kind: &CollectionKind,
446 out_ident: &syn::Ident,
447 _out_location: &LocationId,
448 _op_meta: &HydroIrOpMetadata,
449 ) {
450 let builder = self.get_dfir_mut(in_location.root());
451 if in_kind.is_bounded()
452 && matches!(
453 in_kind,
454 CollectionKind::Singleton { .. }
455 | CollectionKind::Optional { .. }
456 | CollectionKind::KeyedSingleton { .. }
457 )
458 {
459 assert!(in_location.is_top_level());
460 builder.add_dfir(
461 parse_quote! {
462 #out_ident = #in_ident -> persist::<'static>();
463 },
464 None,
465 None,
466 );
467 } else {
468 builder.add_dfir(
469 parse_quote! {
470 #out_ident = #in_ident;
471 },
472 None,
473 None,
474 );
475 }
476 }
477
478 fn yield_from_tick(
479 &mut self,
480 in_ident: syn::Ident,
481 in_location: &LocationId,
482 _in_kind: &CollectionKind,
483 out_ident: &syn::Ident,
484 _out_location: &LocationId,
485 ) {
486 let builder = self.get_dfir_mut(in_location.root());
487 builder.add_dfir(
488 parse_quote! {
489 #out_ident = #in_ident;
490 },
491 None,
492 None,
493 );
494 }
495
496 fn begin_atomic(
497 &mut self,
498 in_ident: syn::Ident,
499 in_location: &LocationId,
500 _in_kind: &CollectionKind,
501 out_ident: &syn::Ident,
502 _out_location: &LocationId,
503 _op_meta: &HydroIrOpMetadata,
504 ) {
505 let builder = self.get_dfir_mut(in_location.root());
506 builder.add_dfir(
507 parse_quote! {
508 #out_ident = #in_ident;
509 },
510 None,
511 None,
512 );
513 }
514
515 fn end_atomic(
516 &mut self,
517 in_ident: syn::Ident,
518 in_location: &LocationId,
519 _in_kind: &CollectionKind,
520 out_ident: &syn::Ident,
521 ) {
522 let builder = self.get_dfir_mut(in_location.root());
523 builder.add_dfir(
524 parse_quote! {
525 #out_ident = #in_ident;
526 },
527 None,
528 None,
529 );
530 }
531
532 fn observe_nondet(
533 &mut self,
534 _trusted: bool,
535 location: &LocationId,
536 in_ident: syn::Ident,
537 _in_kind: &CollectionKind,
538 out_ident: &syn::Ident,
539 _out_kind: &CollectionKind,
540 _op_meta: &HydroIrOpMetadata,
541 ) {
542 let builder = self.get_dfir_mut(location);
543 builder.add_dfir(
544 parse_quote! {
545 #out_ident = #in_ident;
546 },
547 None,
548 None,
549 );
550 }
551
552 fn create_network(
553 &mut self,
554 from: &LocationId,
555 to: &LocationId,
556 input_ident: syn::Ident,
557 out_ident: &syn::Ident,
558 serialize: Option<&DebugExpr>,
559 sink: syn::Expr,
560 source: syn::Expr,
561 deserialize: Option<&DebugExpr>,
562 tag_id: usize,
563 _networking_info: &crate::networking::NetworkingInfo,
564 ) {
565 let sender_builder = self.get_dfir_mut(from);
566 if let Some(serialize_pipeline) = serialize {
567 sender_builder.add_dfir(
568 parse_quote! {
569 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
570 },
571 None,
572 Some(&format!("send{}", tag_id)),
574 );
575 } else {
576 sender_builder.add_dfir(
577 parse_quote! {
578 #input_ident -> dest_sink(#sink);
579 },
580 None,
581 Some(&format!("send{}", tag_id)),
582 );
583 }
584
585 let receiver_builder = self.get_dfir_mut(to);
586 if let Some(deserialize_pipeline) = deserialize {
587 receiver_builder.add_dfir(
588 parse_quote! {
589 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
590 },
591 None,
592 Some(&format!("recv{}", tag_id)),
593 );
594 } else {
595 receiver_builder.add_dfir(
596 parse_quote! {
597 #out_ident = source_stream(#source);
598 },
599 None,
600 Some(&format!("recv{}", tag_id)),
601 );
602 }
603 }
604
605 fn create_external_source(
606 &mut self,
607 on: &LocationId,
608 source_expr: syn::Expr,
609 out_ident: &syn::Ident,
610 deserialize: Option<&DebugExpr>,
611 tag_id: usize,
612 ) {
613 let receiver_builder = self.get_dfir_mut(on);
614 if let Some(deserialize_pipeline) = deserialize {
615 receiver_builder.add_dfir(
616 parse_quote! {
617 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
618 },
619 None,
620 Some(&format!("recv{}", tag_id)),
621 );
622 } else {
623 receiver_builder.add_dfir(
624 parse_quote! {
625 #out_ident = source_stream(#source_expr);
626 },
627 None,
628 Some(&format!("recv{}", tag_id)),
629 );
630 }
631 }
632
633 fn create_external_output(
634 &mut self,
635 on: &LocationId,
636 sink_expr: syn::Expr,
637 input_ident: &syn::Ident,
638 serialize: Option<&DebugExpr>,
639 tag_id: usize,
640 ) {
641 let sender_builder = self.get_dfir_mut(on);
642 if let Some(serialize_fn) = serialize {
643 sender_builder.add_dfir(
644 parse_quote! {
645 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
646 },
647 None,
648 Some(&format!("send{}", tag_id)),
650 );
651 } else {
652 sender_builder.add_dfir(
653 parse_quote! {
654 #input_ident -> dest_sink(#sink_expr);
655 },
656 None,
657 Some(&format!("send{}", tag_id)),
658 );
659 }
660 }
661}
662
/// Selects what `emit_core` does at each IR element: add DFIR to a
/// [`DfirBuilder`], or invoke user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes; the `usize` argument is the
    /// current statement id counter.
    Callback(L, N),
}
672
/// A root of the IR graph. Every variant consumes one upstream `input` node
/// and carries per-operator metadata.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to an external location.
    SendExternal {
        /// Key of the destination external.
        to_external_key: LocationKey,
        /// Port on the destination external.
        to_port_id: ExternalPortId,
        // NOTE(review): `to_many` / `unpaired` select different deployment
        // calls in `compile_network`; their precise semantics are defined by
        // the `Deploy` implementation — confirm there.
        to_many: bool,
        unpaired: bool,
        /// Optional serializer applied before the sink.
        serialize_fn: Option<DebugExpr>,
        /// `Building` until `compile_network` finalizes it.
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Binds `input` to the identifier derived from `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> for_each(&mut ident)`; registered with the
    /// deployment as an embedded output during `compile_network`.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
709
710impl HydroRoot {
711 #[cfg(feature = "build")]
712 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
713 pub fn compile_network<'a, D>(
714 &mut self,
715 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
716 seen_tees: &mut SeenTees,
717 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
718 processes: &SparseSecondaryMap<LocationKey, D::Process>,
719 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
720 externals: &SparseSecondaryMap<LocationKey, D::External>,
721 env: &mut D::InstantiateEnv,
722 ) where
723 D: Deploy<'a>,
724 {
725 let refcell_extra_stmts = RefCell::new(extra_stmts);
726 let refcell_env = RefCell::new(env);
727 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
728 self.transform_bottom_up(
729 &mut |l| {
730 if let HydroRoot::SendExternal {
731 input,
732 to_external_key,
733 to_port_id,
734 to_many,
735 unpaired,
736 instantiate_fn,
737 ..
738 } = l
739 {
740 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
741 DebugInstantiate::Building => {
742 let to_node = externals
743 .get(*to_external_key)
744 .unwrap_or_else(|| {
745 panic!("A external used in the graph was not instantiated: {}", to_external_key)
746 })
747 .clone();
748
749 match input.metadata().location_id.root() {
750 &LocationId::Process(process_key) => {
751 if *to_many {
752 (
753 (
754 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
755 parse_quote!(DUMMY),
756 ),
757 Box::new(|| {}) as Box<dyn FnOnce()>,
758 )
759 } else {
760 let from_node = processes
761 .get(process_key)
762 .unwrap_or_else(|| {
763 panic!("A process used in the graph was not instantiated: {}", process_key)
764 })
765 .clone();
766
767 let sink_port = from_node.next_port();
768 let source_port = to_node.next_port();
769
770 if *unpaired {
771 use stageleft::quote_type;
772 use tokio_util::codec::LengthDelimitedCodec;
773
774 to_node.register(*to_port_id, source_port.clone());
775
776 let _ = D::e2o_source(
777 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
778 &to_node, &source_port,
779 &from_node, &sink_port,
780 "e_type::<LengthDelimitedCodec>(),
781 format!("{}_{}", *to_external_key, *to_port_id)
782 );
783 }
784
785 (
786 (
787 D::o2e_sink(
788 &from_node,
789 &sink_port,
790 &to_node,
791 &source_port,
792 format!("{}_{}", *to_external_key, *to_port_id)
793 ),
794 parse_quote!(DUMMY),
795 ),
796 if *unpaired {
797 D::e2o_connect(
798 &to_node,
799 &source_port,
800 &from_node,
801 &sink_port,
802 *to_many,
803 NetworkHint::Auto,
804 )
805 } else {
806 Box::new(|| {}) as Box<dyn FnOnce()>
807 },
808 )
809 }
810 }
811 LocationId::Cluster(_) => todo!(),
812 _ => panic!()
813 }
814 },
815
816 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
817 };
818
819 *instantiate_fn = DebugInstantiateFinalized {
820 sink: sink_expr,
821 source: source_expr,
822 connect_fn: Some(connect_fn),
823 }
824 .into();
825 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
826 let element_type = match &input.metadata().collection_kind {
827 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
828 _ => panic!("Embedded output must have Stream collection kind"),
829 };
830 let location_key = match input.metadata().location_id.root() {
831 LocationId::Process(key) | LocationId::Cluster(key) => *key,
832 _ => panic!("Embedded output must be on a process or cluster"),
833 };
834 D::register_embedded_output(
835 &mut refcell_env.borrow_mut(),
836 location_key,
837 ident,
838 &element_type,
839 );
840 }
841 },
842 &mut |n| {
843 if let HydroNode::Network {
844 name,
845 networking_info,
846 input,
847 instantiate_fn,
848 metadata,
849 ..
850 } = n
851 {
852 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
853 DebugInstantiate::Building => instantiate_network::<D>(
854 &mut refcell_env.borrow_mut(),
855 input.metadata().location_id.root(),
856 metadata.location_id.root(),
857 processes,
858 clusters,
859 name.as_deref(),
860 networking_info,
861 ),
862
863 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
864 };
865
866 *instantiate_fn = DebugInstantiateFinalized {
867 sink: sink_expr,
868 source: source_expr,
869 connect_fn: Some(connect_fn),
870 }
871 .into();
872 } else if let HydroNode::ExternalInput {
873 from_external_key,
874 from_port_id,
875 from_many,
876 codec_type,
877 port_hint,
878 instantiate_fn,
879 metadata,
880 ..
881 } = n
882 {
883 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
884 DebugInstantiate::Building => {
885 let from_node = externals
886 .get(*from_external_key)
887 .unwrap_or_else(|| {
888 panic!(
889 "A external used in the graph was not instantiated: {}",
890 from_external_key,
891 )
892 })
893 .clone();
894
895 match metadata.location_id.root() {
896 &LocationId::Process(process_key) => {
897 let to_node = processes
898 .get(process_key)
899 .unwrap_or_else(|| {
900 panic!("A process used in the graph was not instantiated: {}", process_key)
901 })
902 .clone();
903
904 let sink_port = from_node.next_port();
905 let source_port = to_node.next_port();
906
907 from_node.register(*from_port_id, sink_port.clone());
908
909 (
910 (
911 parse_quote!(DUMMY),
912 if *from_many {
913 D::e2o_many_source(
914 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
915 &to_node, &source_port,
916 codec_type.0.as_ref(),
917 format!("{}_{}", *from_external_key, *from_port_id)
918 )
919 } else {
920 D::e2o_source(
921 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922 &from_node, &sink_port,
923 &to_node, &source_port,
924 codec_type.0.as_ref(),
925 format!("{}_{}", *from_external_key, *from_port_id)
926 )
927 },
928 ),
929 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
930 )
931 }
932 LocationId::Cluster(_) => todo!(),
933 _ => panic!()
934 }
935 },
936
937 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
938 };
939
940 *instantiate_fn = DebugInstantiateFinalized {
941 sink: sink_expr,
942 source: source_expr,
943 connect_fn: Some(connect_fn),
944 }
945 .into();
946 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
947 let element_type = match &metadata.collection_kind {
948 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
949 _ => panic!("Embedded source must have Stream collection kind"),
950 };
951 let location_key = match metadata.location_id.root() {
952 LocationId::Process(key) | LocationId::Cluster(key) => *key,
953 _ => panic!("Embedded source must be on a process or cluster"),
954 };
955 D::register_embedded_input(
956 &mut refcell_env.borrow_mut(),
957 location_key,
958 ident,
959 &element_type,
960 );
961 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
962 match state {
963 ClusterMembersState::Uninit => {
964 let at_location = metadata.location_id.root().clone();
965 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
966 if refcell_seen_cluster_members.borrow_mut().insert(key) {
967 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
969 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
970 &(),
971 );
972 *state = ClusterMembersState::Stream(expr.into());
973 } else {
974 *state = ClusterMembersState::Tee(at_location, location_id.clone());
976 }
977 }
978 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
979 panic!("cluster members already finalized");
980 }
981 }
982 }
983 },
984 seen_tees,
985 false,
986 );
987 }
988
989 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
990 self.transform_bottom_up(
991 &mut |l| {
992 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
993 match instantiate_fn {
994 DebugInstantiate::Building => panic!("network not built"),
995
996 DebugInstantiate::Finalized(finalized) => {
997 (finalized.connect_fn.take().unwrap())();
998 }
999 }
1000 }
1001 },
1002 &mut |n| {
1003 if let HydroNode::Network { instantiate_fn, .. }
1004 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1005 {
1006 match instantiate_fn {
1007 DebugInstantiate::Building => panic!("network not built"),
1008
1009 DebugInstantiate::Finalized(finalized) => {
1010 (finalized.connect_fn.take().unwrap())();
1011 }
1012 }
1013 }
1014 },
1015 seen_tees,
1016 false,
1017 );
1018 }
1019
1020 pub fn transform_bottom_up(
1021 &mut self,
1022 transform_root: &mut impl FnMut(&mut HydroRoot),
1023 transform_node: &mut impl FnMut(&mut HydroNode),
1024 seen_tees: &mut SeenTees,
1025 check_well_formed: bool,
1026 ) {
1027 self.transform_children(
1028 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1029 seen_tees,
1030 );
1031
1032 transform_root(self);
1033 }
1034
1035 pub fn transform_children(
1036 &mut self,
1037 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1038 seen_tees: &mut SeenTees,
1039 ) {
1040 match self {
1041 HydroRoot::ForEach { input, .. }
1042 | HydroRoot::SendExternal { input, .. }
1043 | HydroRoot::DestSink { input, .. }
1044 | HydroRoot::CycleSink { input, .. }
1045 | HydroRoot::EmbeddedOutput { input, .. } => {
1046 transform(input, seen_tees);
1047 }
1048 }
1049 }
1050
1051 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1052 match self {
1053 HydroRoot::ForEach {
1054 f,
1055 input,
1056 op_metadata,
1057 } => HydroRoot::ForEach {
1058 f: f.clone(),
1059 input: Box::new(input.deep_clone(seen_tees)),
1060 op_metadata: op_metadata.clone(),
1061 },
1062 HydroRoot::SendExternal {
1063 to_external_key,
1064 to_port_id,
1065 to_many,
1066 unpaired,
1067 serialize_fn,
1068 instantiate_fn,
1069 input,
1070 op_metadata,
1071 } => HydroRoot::SendExternal {
1072 to_external_key: *to_external_key,
1073 to_port_id: *to_port_id,
1074 to_many: *to_many,
1075 unpaired: *unpaired,
1076 serialize_fn: serialize_fn.clone(),
1077 instantiate_fn: instantiate_fn.clone(),
1078 input: Box::new(input.deep_clone(seen_tees)),
1079 op_metadata: op_metadata.clone(),
1080 },
1081 HydroRoot::DestSink {
1082 sink,
1083 input,
1084 op_metadata,
1085 } => HydroRoot::DestSink {
1086 sink: sink.clone(),
1087 input: Box::new(input.deep_clone(seen_tees)),
1088 op_metadata: op_metadata.clone(),
1089 },
1090 HydroRoot::CycleSink {
1091 cycle_id,
1092 input,
1093 op_metadata,
1094 } => HydroRoot::CycleSink {
1095 cycle_id: *cycle_id,
1096 input: Box::new(input.deep_clone(seen_tees)),
1097 op_metadata: op_metadata.clone(),
1098 },
1099 HydroRoot::EmbeddedOutput {
1100 ident,
1101 input,
1102 op_metadata,
1103 } => HydroRoot::EmbeddedOutput {
1104 ident: ident.clone(),
1105 input: Box::new(input.deep_clone(seen_tees)),
1106 op_metadata: op_metadata.clone(),
1107 },
1108 }
1109 }
1110
1111 #[cfg(feature = "build")]
1112 pub fn emit(
1113 &mut self,
1114 graph_builders: &mut dyn DfirBuilder,
1115 seen_tees: &mut SeenTees,
1116 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1117 next_stmt_id: &mut usize,
1118 ) {
1119 self.emit_core(
1120 &mut BuildersOrCallback::<
1121 fn(&mut HydroRoot, &mut usize),
1122 fn(&mut HydroNode, &mut usize),
1123 >::Builders(graph_builders),
1124 seen_tees,
1125 built_tees,
1126 next_stmt_id,
1127 );
1128 }
1129
1130 #[cfg(feature = "build")]
1131 pub fn emit_core(
1132 &mut self,
1133 builders_or_callback: &mut BuildersOrCallback<
1134 impl FnMut(&mut HydroRoot, &mut usize),
1135 impl FnMut(&mut HydroNode, &mut usize),
1136 >,
1137 seen_tees: &mut SeenTees,
1138 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1139 next_stmt_id: &mut usize,
1140 ) {
1141 match self {
1142 HydroRoot::ForEach { f, input, .. } => {
1143 let input_ident =
1144 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1145
1146 match builders_or_callback {
1147 BuildersOrCallback::Builders(graph_builders) => {
1148 graph_builders
1149 .get_dfir_mut(&input.metadata().location_id)
1150 .add_dfir(
1151 parse_quote! {
1152 #input_ident -> for_each(#f);
1153 },
1154 None,
1155 Some(&next_stmt_id.to_string()),
1156 );
1157 }
1158 BuildersOrCallback::Callback(leaf_callback, _) => {
1159 leaf_callback(self, next_stmt_id);
1160 }
1161 }
1162
1163 *next_stmt_id += 1;
1164 }
1165
1166 HydroRoot::SendExternal {
1167 serialize_fn,
1168 instantiate_fn,
1169 input,
1170 ..
1171 } => {
1172 let input_ident =
1173 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1174
1175 match builders_or_callback {
1176 BuildersOrCallback::Builders(graph_builders) => {
1177 let (sink_expr, _) = match instantiate_fn {
1178 DebugInstantiate::Building => (
1179 syn::parse_quote!(DUMMY_SINK),
1180 syn::parse_quote!(DUMMY_SOURCE),
1181 ),
1182
1183 DebugInstantiate::Finalized(finalized) => {
1184 (finalized.sink.clone(), finalized.source.clone())
1185 }
1186 };
1187
1188 graph_builders.create_external_output(
1189 &input.metadata().location_id,
1190 sink_expr,
1191 &input_ident,
1192 serialize_fn.as_ref(),
1193 *next_stmt_id,
1194 );
1195 }
1196 BuildersOrCallback::Callback(leaf_callback, _) => {
1197 leaf_callback(self, next_stmt_id);
1198 }
1199 }
1200
1201 *next_stmt_id += 1;
1202 }
1203
1204 HydroRoot::DestSink { sink, input, .. } => {
1205 let input_ident =
1206 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1207
1208 match builders_or_callback {
1209 BuildersOrCallback::Builders(graph_builders) => {
1210 graph_builders
1211 .get_dfir_mut(&input.metadata().location_id)
1212 .add_dfir(
1213 parse_quote! {
1214 #input_ident -> dest_sink(#sink);
1215 },
1216 None,
1217 Some(&next_stmt_id.to_string()),
1218 );
1219 }
1220 BuildersOrCallback::Callback(leaf_callback, _) => {
1221 leaf_callback(self, next_stmt_id);
1222 }
1223 }
1224
1225 *next_stmt_id += 1;
1226 }
1227
1228 HydroRoot::CycleSink {
1229 cycle_id, input, ..
1230 } => {
1231 let input_ident =
1232 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1233
1234 match builders_or_callback {
1235 BuildersOrCallback::Builders(graph_builders) => {
1236 let elem_type: syn::Type = match &input.metadata().collection_kind {
1237 CollectionKind::KeyedSingleton {
1238 key_type,
1239 value_type,
1240 ..
1241 }
1242 | CollectionKind::KeyedStream {
1243 key_type,
1244 value_type,
1245 ..
1246 } => {
1247 parse_quote!((#key_type, #value_type))
1248 }
1249 CollectionKind::Stream { element_type, .. }
1250 | CollectionKind::Singleton { element_type, .. }
1251 | CollectionKind::Optional { element_type, .. } => {
1252 parse_quote!(#element_type)
1253 }
1254 };
1255
1256 let cycle_id_ident = cycle_id.as_ident();
1257 graph_builders
1258 .get_dfir_mut(&input.metadata().location_id)
1259 .add_dfir(
1260 parse_quote! {
1261 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1262 },
1263 None,
1264 None,
1265 );
1266 }
1267 BuildersOrCallback::Callback(_, _) => {}
1269 }
1270 }
1271
1272 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1273 let input_ident =
1274 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1275
1276 match builders_or_callback {
1277 BuildersOrCallback::Builders(graph_builders) => {
1278 graph_builders
1279 .get_dfir_mut(&input.metadata().location_id)
1280 .add_dfir(
1281 parse_quote! {
1282 #input_ident -> for_each(&mut #ident);
1283 },
1284 None,
1285 Some(&next_stmt_id.to_string()),
1286 );
1287 }
1288 BuildersOrCallback::Callback(leaf_callback, _) => {
1289 leaf_callback(self, next_stmt_id);
1290 }
1291 }
1292
1293 *next_stmt_id += 1;
1294 }
1295 }
1296 }
1297
1298 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1299 match self {
1300 HydroRoot::ForEach { op_metadata, .. }
1301 | HydroRoot::SendExternal { op_metadata, .. }
1302 | HydroRoot::DestSink { op_metadata, .. }
1303 | HydroRoot::CycleSink { op_metadata, .. }
1304 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1305 }
1306 }
1307
1308 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1309 match self {
1310 HydroRoot::ForEach { op_metadata, .. }
1311 | HydroRoot::SendExternal { op_metadata, .. }
1312 | HydroRoot::DestSink { op_metadata, .. }
1313 | HydroRoot::CycleSink { op_metadata, .. }
1314 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1315 }
1316 }
1317
1318 pub fn input(&self) -> &HydroNode {
1319 match self {
1320 HydroRoot::ForEach { input, .. }
1321 | HydroRoot::SendExternal { input, .. }
1322 | HydroRoot::DestSink { input, .. }
1323 | HydroRoot::CycleSink { input, .. }
1324 | HydroRoot::EmbeddedOutput { input, .. } => input,
1325 }
1326 }
1327
1328 pub fn input_metadata(&self) -> &HydroIrMetadata {
1329 self.input().metadata()
1330 }
1331
1332 pub fn print_root(&self) -> String {
1333 match self {
1334 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1335 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1336 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1337 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1338 HydroRoot::EmbeddedOutput { ident, .. } => {
1339 format!("EmbeddedOutput({})", ident)
1340 }
1341 }
1342 }
1343
1344 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1345 match self {
1346 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1347 transform(f);
1348 }
1349 HydroRoot::SendExternal { .. }
1350 | HydroRoot::CycleSink { .. }
1351 | HydroRoot::EmbeddedOutput { .. } => {}
1352 }
1353 }
1354}
1355
/// Emits every root in `ir` into per-location DFIR graph builders and returns them.
///
/// The tee bookkeeping maps and the statement-id counter (starting at 0) are shared across
/// all roots, so they are threaded through each `HydroRoot::emit` call in slice order.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1372
/// Walks the IR using the same traversal as DFIR emission, but calls back instead of building.
///
/// Each root is driven through `emit_core` in callback mode, so `transform_root` and
/// `transform_node` are handed the root/node together with the running statement-id counter.
/// The counter starts at 0 and, like the tee bookkeeping maps, is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut next_stmt_id = 0;
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1392
1393pub fn transform_bottom_up(
1394 ir: &mut [HydroRoot],
1395 transform_root: &mut impl FnMut(&mut HydroRoot),
1396 transform_node: &mut impl FnMut(&mut HydroNode),
1397 check_well_formed: bool,
1398) {
1399 let mut seen_tees = HashMap::new();
1400 ir.iter_mut().for_each(|leaf| {
1401 leaf.transform_bottom_up(
1402 transform_root,
1403 transform_node,
1404 &mut seen_tees,
1405 check_well_formed,
1406 );
1407 });
1408}
1409
1410pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1411 let mut seen_tees = HashMap::new();
1412 ir.iter()
1413 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1414 .collect()
1415}
1416
// Per-thread registry used while `Debug`-printing tees.
//
// `None` means deduplication is off. `Some((next_id, ids))` holds the next id to hand out
// and the id already assigned to each tee cell (keyed by the cell's address).
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts as `None`; `dbg_dedup_tee` switches it on for the duration of its closure.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1421
1422pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1423 PRINTED_TEES.with(|printed_tees| {
1424 let mut printed_tees_mut = printed_tees.borrow_mut();
1425 *printed_tees_mut = Some((0, HashMap::new()));
1426 drop(printed_tees_mut);
1427
1428 let ret = f();
1429
1430 let mut printed_tees_mut = printed_tees.borrow_mut();
1431 *printed_tees_mut = None;
1432
1433 ret
1434 })
1435}
1436
/// Shared handle to a node referenced from one or more `HydroNode::Tee` variants.
///
/// Traversals identify the shared cell by its address (see `TeeNode::as_ptr`).
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1438
1439impl TeeNode {
1440 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1441 Rc::as_ptr(&self.0)
1442 }
1443}
1444
1445impl Debug for TeeNode {
1446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447 PRINTED_TEES.with(|printed_tees| {
1448 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1449 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1450
1451 if let Some(printed_tees_mut) = printed_tees_mut {
1452 if let Some(existing) = printed_tees_mut
1453 .1
1454 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1455 {
1456 write!(f, "<tee {}>", existing)
1457 } else {
1458 let next_id = printed_tees_mut.0;
1459 printed_tees_mut.0 += 1;
1460 printed_tees_mut
1461 .1
1462 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1463 drop(printed_tees_mut_borrow);
1464 write!(f, "<tee {}>: ", next_id)?;
1465 Debug::fmt(&self.0.borrow(), f)
1466 }
1467 } else {
1468 drop(printed_tees_mut_borrow);
1469 write!(f, "<tee>: ")?;
1470 Debug::fmt(&self.0.borrow(), f)
1471 }
1472 })
1473 }
1474}
1475
1476impl Hash for TeeNode {
1477 fn hash<H: Hasher>(&self, state: &mut H) {
1478 self.0.borrow_mut().hash(state);
1479 }
1480}
1481
/// Boundedness tag used by most `CollectionKind` variants.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1487
/// Ordering tag stored on `CollectionKind::Stream` (`order`) and
/// `CollectionKind::KeyedStream` (`value_order`).
// NOTE(review): presumably `NoOrder` means element order is not guaranteed and `TotalOrder`
// means it is — confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1493
/// Retry tag stored on `CollectionKind::Stream` (`retry`) and
/// `CollectionKind::KeyedStream` (`value_retry`).
// NOTE(review): presumably describes delivery guarantees (possible duplicates vs. none) —
// confirm against the stream API.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1499
/// Boundedness tag for `CollectionKind::KeyedSingleton`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`; `BoundedValue` is
/// treated as not bounded there.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1506
/// Describes which kind of collection a node produces, along with its boundedness tag and
/// element (or key/value) types.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// Stream of `element_type` values, tagged with order and retry.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// Singleton holding an `element_type` value.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Optional holding an `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// Keyed stream; the order and retry tags apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// Keyed singleton; uses its own three-state boundedness tag.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1536
1537impl CollectionKind {
1538 pub fn is_bounded(&self) -> bool {
1539 matches!(
1540 self,
1541 CollectionKind::Stream {
1542 bound: BoundKind::Bounded,
1543 ..
1544 } | CollectionKind::Singleton {
1545 bound: BoundKind::Bounded,
1546 ..
1547 } | CollectionKind::Optional {
1548 bound: BoundKind::Bounded,
1549 ..
1550 } | CollectionKind::KeyedStream {
1551 bound: BoundKind::Bounded,
1552 ..
1553 } | CollectionKind::KeyedSingleton {
1554 bound: KeyedSingletonBoundKind::Bounded,
1555 ..
1556 }
1557 )
1558 }
1559}
1560
/// Metadata attached to every `HydroNode` variant except `Placeholder`.
///
/// The hand-written `Hash` and `PartialEq` impls ignore every field, and `Debug` prints
/// only `location_id` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location this node belongs to; `transform_bottom_up` compares its `root()` against
    /// that of the node's inputs when checking well-formedness.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): `cardinality` and `tag` are not read anywhere in this part of the file —
    // presumably filled in by analysis/profiling passes; confirm.
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata.
    pub op: HydroIrOpMetadata,
}
1569
1570impl Hash for HydroIrMetadata {
1572 fn hash<H: Hasher>(&self, _: &mut H) {}
1573}
1574
1575impl PartialEq for HydroIrMetadata {
1576 fn eq(&self, _: &Self) -> bool {
1577 true
1578 }
1579}
1580
// `PartialEq::eq` is unconditionally `true`, which is trivially reflexive, so `Eq` holds.
impl Eq for HydroIrMetadata {}
1582
1583impl Debug for HydroIrMetadata {
1584 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1585 f.debug_struct("HydroIrMetadata")
1586 .field("location_id", &self.location_id)
1587 .field("collection_kind", &self.collection_kind)
1588 .finish()
1589 }
1590}
1591
/// Operator-level metadata, stored on every `HydroRoot` and inside `HydroIrMetadata::op`.
///
/// `Debug` prints only the type name, and `Hash` contributes nothing.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed (see `HydroIrOpMetadata::new`).
    pub backtrace: Backtrace,
    // NOTE(review): the fields below are initialised to `None` by the constructor and not
    // written in this part of the file — presumably populated by later passes; confirm.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
1601
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields `None`.
    ///
    /// Delegates to `new_with_skip(1)`.
    // NOTE(review): the extra `1` presumably accounts for this wrapper's own stack frame —
    // confirm against `Backtrace::get_backtrace`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is captured with `2 + skip_count` passed to
    /// `Backtrace::get_backtrace`.
    // NOTE(review): the argument looks like a number of leading frames to skip, which would
    // make the result depend on the exact call depth here; avoid adding or removing
    // intermediate calls without re-checking it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1620
1621impl Debug for HydroIrOpMetadata {
1622 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1623 f.debug_struct("HydroIrOpMetadata").finish()
1624 }
1625}
1626
1627impl Hash for HydroIrOpMetadata {
1628 fn hash<H: Hasher>(&self, _: &mut H) {}
1629}
1630
/// A node of the Hydro IR.
///
/// Every variant except `Placeholder` carries a `metadata` field. Upstream nodes are owned
/// through `Box<HydroNode>`, except for `Tee`, whose upstream is a shared `TeeNode`.
/// The child groupings noted below follow the order used by `transform_children`.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in swapped into a tee's shared cell while the real node is being
    /// transformed or cloned. `transform_children` and `emit_core` panic if they reach it.
    Placeholder,

    /// Single-child wrapper. In builder mode `emit_core` emits no DFIR statement for it and
    /// leaves the child's stream identifier in place.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-child node lowered through the graph builders' `observe_nondet`, which
    /// receives the `trusted` flag.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node; the DFIR statement emitted depends on the `HydroSource` variant.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node emitted as `source_iter([value])`. A `persist::<'static>()` is appended
    /// unless `first_tick_only` is set or the node is a bounded, top-level collection.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node whose stream identifier comes from `cycle_id.as_ident()`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Reference to a shared upstream node. Several `Tee` nodes may point at the same cell;
    /// traversals handle each cell once, keyed by its address.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Single-child node lowered through the graph builders' `begin_atomic`.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-child node lowered through the graph builders' `end_atomic`.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-child node lowered through the graph builders' `batch`.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-child node lowered through the graph builders' `yield_from_tick`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-child nodes: children are visited in declaration order
    // (`first`/`left`/`pos` before `second`/`right`/`neg`).
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-input nodes: the only child is `input`; `f`, `init` and `acc` hold the
    // expressions attached to the operator.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Two-child node: `input` is visited before `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node. It is the only variant exempt from the location check in
    /// `transform_bottom_up`, so its input may live at a different root location.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child nodes).
    // NOTE(review): presumably represents data arriving from an external location/port —
    // confirm against the lowering code, which is outside this part of the file.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1862
/// Maps the address of an original tee cell to the replacement cell created for it, so
/// every `Tee` pointing at the same original ends up sharing the same replacement.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a `LocationId`.
// NOTE(review): not used in this part of the file — confirm its role at the use sites.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1865
1866impl HydroNode {
1867 pub fn transform_bottom_up(
1868 &mut self,
1869 transform: &mut impl FnMut(&mut HydroNode),
1870 seen_tees: &mut SeenTees,
1871 check_well_formed: bool,
1872 ) {
1873 self.transform_children(
1874 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1875 seen_tees,
1876 );
1877
1878 transform(self);
1879
1880 let self_location = self.metadata().location_id.root();
1881
1882 if check_well_formed {
1883 match &*self {
1884 HydroNode::Network { .. } => {}
1885 _ => {
1886 self.input_metadata().iter().for_each(|i| {
1887 if i.location_id.root() != self_location {
1888 panic!(
1889 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1890 i,
1891 i.location_id.root(),
1892 self,
1893 self_location
1894 )
1895 }
1896 });
1897 }
1898 }
1899 }
1900 }
1901
1902 #[inline(always)]
1903 pub fn transform_children(
1904 &mut self,
1905 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1906 seen_tees: &mut SeenTees,
1907 ) {
1908 match self {
1909 HydroNode::Placeholder => {
1910 panic!();
1911 }
1912
1913 HydroNode::Source { .. }
1914 | HydroNode::SingletonSource { .. }
1915 | HydroNode::CycleSource { .. }
1916 | HydroNode::ExternalInput { .. } => {}
1917
1918 HydroNode::Tee { inner, .. } => {
1919 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1920 *inner = TeeNode(transformed.clone());
1921 } else {
1922 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1923 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1924 let mut orig = inner.0.replace(HydroNode::Placeholder);
1925 transform(&mut orig, seen_tees);
1926 *transformed_cell.borrow_mut() = orig;
1927 *inner = TeeNode(transformed_cell);
1928 }
1929 }
1930
1931 HydroNode::Cast { inner, .. }
1932 | HydroNode::ObserveNonDet { inner, .. }
1933 | HydroNode::BeginAtomic { inner, .. }
1934 | HydroNode::EndAtomic { inner, .. }
1935 | HydroNode::Batch { inner, .. }
1936 | HydroNode::YieldConcat { inner, .. } => {
1937 transform(inner.as_mut(), seen_tees);
1938 }
1939
1940 HydroNode::Chain { first, second, .. } => {
1941 transform(first.as_mut(), seen_tees);
1942 transform(second.as_mut(), seen_tees);
1943 }
1944
1945 HydroNode::ChainFirst { first, second, .. } => {
1946 transform(first.as_mut(), seen_tees);
1947 transform(second.as_mut(), seen_tees);
1948 }
1949
1950 HydroNode::CrossSingleton { left, right, .. }
1951 | HydroNode::CrossProduct { left, right, .. }
1952 | HydroNode::Join { left, right, .. } => {
1953 transform(left.as_mut(), seen_tees);
1954 transform(right.as_mut(), seen_tees);
1955 }
1956
1957 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1958 transform(pos.as_mut(), seen_tees);
1959 transform(neg.as_mut(), seen_tees);
1960 }
1961
1962 HydroNode::ReduceKeyedWatermark {
1963 input, watermark, ..
1964 } => {
1965 transform(input.as_mut(), seen_tees);
1966 transform(watermark.as_mut(), seen_tees);
1967 }
1968
1969 HydroNode::Map { input, .. }
1970 | HydroNode::ResolveFutures { input, .. }
1971 | HydroNode::ResolveFuturesOrdered { input, .. }
1972 | HydroNode::FlatMap { input, .. }
1973 | HydroNode::Filter { input, .. }
1974 | HydroNode::FilterMap { input, .. }
1975 | HydroNode::Sort { input, .. }
1976 | HydroNode::DeferTick { input, .. }
1977 | HydroNode::Enumerate { input, .. }
1978 | HydroNode::Inspect { input, .. }
1979 | HydroNode::Unique { input, .. }
1980 | HydroNode::Network { input, .. }
1981 | HydroNode::Fold { input, .. }
1982 | HydroNode::Scan { input, .. }
1983 | HydroNode::FoldKeyed { input, .. }
1984 | HydroNode::Reduce { input, .. }
1985 | HydroNode::ReduceKeyed { input, .. }
1986 | HydroNode::Counter { input, .. } => {
1987 transform(input.as_mut(), seen_tees);
1988 }
1989 }
1990 }
1991
1992 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1993 match self {
1994 HydroNode::Placeholder => HydroNode::Placeholder,
1995 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1996 inner: Box::new(inner.deep_clone(seen_tees)),
1997 metadata: metadata.clone(),
1998 },
1999 HydroNode::ObserveNonDet {
2000 inner,
2001 trusted,
2002 metadata,
2003 } => HydroNode::ObserveNonDet {
2004 inner: Box::new(inner.deep_clone(seen_tees)),
2005 trusted: *trusted,
2006 metadata: metadata.clone(),
2007 },
2008 HydroNode::Source { source, metadata } => HydroNode::Source {
2009 source: source.clone(),
2010 metadata: metadata.clone(),
2011 },
2012 HydroNode::SingletonSource {
2013 value,
2014 first_tick_only,
2015 metadata,
2016 } => HydroNode::SingletonSource {
2017 value: value.clone(),
2018 first_tick_only: *first_tick_only,
2019 metadata: metadata.clone(),
2020 },
2021 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2022 cycle_id: *cycle_id,
2023 metadata: metadata.clone(),
2024 },
2025 HydroNode::Tee { inner, metadata } => {
2026 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2027 HydroNode::Tee {
2028 inner: TeeNode(transformed.clone()),
2029 metadata: metadata.clone(),
2030 }
2031 } else {
2032 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2033 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2034 let cloned = inner.0.borrow().deep_clone(seen_tees);
2035 *new_rc.borrow_mut() = cloned;
2036 HydroNode::Tee {
2037 inner: TeeNode(new_rc),
2038 metadata: metadata.clone(),
2039 }
2040 }
2041 }
2042 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2043 inner: Box::new(inner.deep_clone(seen_tees)),
2044 metadata: metadata.clone(),
2045 },
2046 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2047 inner: Box::new(inner.deep_clone(seen_tees)),
2048 metadata: metadata.clone(),
2049 },
2050 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2051 inner: Box::new(inner.deep_clone(seen_tees)),
2052 metadata: metadata.clone(),
2053 },
2054 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2055 inner: Box::new(inner.deep_clone(seen_tees)),
2056 metadata: metadata.clone(),
2057 },
2058 HydroNode::Chain {
2059 first,
2060 second,
2061 metadata,
2062 } => HydroNode::Chain {
2063 first: Box::new(first.deep_clone(seen_tees)),
2064 second: Box::new(second.deep_clone(seen_tees)),
2065 metadata: metadata.clone(),
2066 },
2067 HydroNode::ChainFirst {
2068 first,
2069 second,
2070 metadata,
2071 } => HydroNode::ChainFirst {
2072 first: Box::new(first.deep_clone(seen_tees)),
2073 second: Box::new(second.deep_clone(seen_tees)),
2074 metadata: metadata.clone(),
2075 },
2076 HydroNode::CrossProduct {
2077 left,
2078 right,
2079 metadata,
2080 } => HydroNode::CrossProduct {
2081 left: Box::new(left.deep_clone(seen_tees)),
2082 right: Box::new(right.deep_clone(seen_tees)),
2083 metadata: metadata.clone(),
2084 },
2085 HydroNode::CrossSingleton {
2086 left,
2087 right,
2088 metadata,
2089 } => HydroNode::CrossSingleton {
2090 left: Box::new(left.deep_clone(seen_tees)),
2091 right: Box::new(right.deep_clone(seen_tees)),
2092 metadata: metadata.clone(),
2093 },
2094 HydroNode::Join {
2095 left,
2096 right,
2097 metadata,
2098 } => HydroNode::Join {
2099 left: Box::new(left.deep_clone(seen_tees)),
2100 right: Box::new(right.deep_clone(seen_tees)),
2101 metadata: metadata.clone(),
2102 },
2103 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2104 pos: Box::new(pos.deep_clone(seen_tees)),
2105 neg: Box::new(neg.deep_clone(seen_tees)),
2106 metadata: metadata.clone(),
2107 },
2108 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2109 pos: Box::new(pos.deep_clone(seen_tees)),
2110 neg: Box::new(neg.deep_clone(seen_tees)),
2111 metadata: metadata.clone(),
2112 },
2113 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2114 input: Box::new(input.deep_clone(seen_tees)),
2115 metadata: metadata.clone(),
2116 },
2117 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2118 HydroNode::ResolveFuturesOrdered {
2119 input: Box::new(input.deep_clone(seen_tees)),
2120 metadata: metadata.clone(),
2121 }
2122 }
2123 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2124 f: f.clone(),
2125 input: Box::new(input.deep_clone(seen_tees)),
2126 metadata: metadata.clone(),
2127 },
2128 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2129 f: f.clone(),
2130 input: Box::new(input.deep_clone(seen_tees)),
2131 metadata: metadata.clone(),
2132 },
2133 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2134 f: f.clone(),
2135 input: Box::new(input.deep_clone(seen_tees)),
2136 metadata: metadata.clone(),
2137 },
2138 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2139 f: f.clone(),
2140 input: Box::new(input.deep_clone(seen_tees)),
2141 metadata: metadata.clone(),
2142 },
2143 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2144 input: Box::new(input.deep_clone(seen_tees)),
2145 metadata: metadata.clone(),
2146 },
2147 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2148 input: Box::new(input.deep_clone(seen_tees)),
2149 metadata: metadata.clone(),
2150 },
2151 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2152 f: f.clone(),
2153 input: Box::new(input.deep_clone(seen_tees)),
2154 metadata: metadata.clone(),
2155 },
2156 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2157 input: Box::new(input.deep_clone(seen_tees)),
2158 metadata: metadata.clone(),
2159 },
2160 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2161 input: Box::new(input.deep_clone(seen_tees)),
2162 metadata: metadata.clone(),
2163 },
2164 HydroNode::Fold {
2165 init,
2166 acc,
2167 input,
2168 metadata,
2169 } => HydroNode::Fold {
2170 init: init.clone(),
2171 acc: acc.clone(),
2172 input: Box::new(input.deep_clone(seen_tees)),
2173 metadata: metadata.clone(),
2174 },
2175 HydroNode::Scan {
2176 init,
2177 acc,
2178 input,
2179 metadata,
2180 } => HydroNode::Scan {
2181 init: init.clone(),
2182 acc: acc.clone(),
2183 input: Box::new(input.deep_clone(seen_tees)),
2184 metadata: metadata.clone(),
2185 },
2186 HydroNode::FoldKeyed {
2187 init,
2188 acc,
2189 input,
2190 metadata,
2191 } => HydroNode::FoldKeyed {
2192 init: init.clone(),
2193 acc: acc.clone(),
2194 input: Box::new(input.deep_clone(seen_tees)),
2195 metadata: metadata.clone(),
2196 },
2197 HydroNode::ReduceKeyedWatermark {
2198 f,
2199 input,
2200 watermark,
2201 metadata,
2202 } => HydroNode::ReduceKeyedWatermark {
2203 f: f.clone(),
2204 input: Box::new(input.deep_clone(seen_tees)),
2205 watermark: Box::new(watermark.deep_clone(seen_tees)),
2206 metadata: metadata.clone(),
2207 },
2208 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2209 f: f.clone(),
2210 input: Box::new(input.deep_clone(seen_tees)),
2211 metadata: metadata.clone(),
2212 },
2213 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2214 f: f.clone(),
2215 input: Box::new(input.deep_clone(seen_tees)),
2216 metadata: metadata.clone(),
2217 },
2218 HydroNode::Network {
2219 name,
2220 networking_info,
2221 serialize_fn,
2222 instantiate_fn,
2223 deserialize_fn,
2224 input,
2225 metadata,
2226 } => HydroNode::Network {
2227 name: name.clone(),
2228 networking_info: networking_info.clone(),
2229 serialize_fn: serialize_fn.clone(),
2230 instantiate_fn: instantiate_fn.clone(),
2231 deserialize_fn: deserialize_fn.clone(),
2232 input: Box::new(input.deep_clone(seen_tees)),
2233 metadata: metadata.clone(),
2234 },
2235 HydroNode::ExternalInput {
2236 from_external_key,
2237 from_port_id,
2238 from_many,
2239 codec_type,
2240 port_hint,
2241 instantiate_fn,
2242 deserialize_fn,
2243 metadata,
2244 } => HydroNode::ExternalInput {
2245 from_external_key: *from_external_key,
2246 from_port_id: *from_port_id,
2247 from_many: *from_many,
2248 codec_type: codec_type.clone(),
2249 port_hint: *port_hint,
2250 instantiate_fn: instantiate_fn.clone(),
2251 deserialize_fn: deserialize_fn.clone(),
2252 metadata: metadata.clone(),
2253 },
2254 HydroNode::Counter {
2255 tag,
2256 duration,
2257 prefix,
2258 input,
2259 metadata,
2260 } => HydroNode::Counter {
2261 tag: tag.clone(),
2262 duration: duration.clone(),
2263 prefix: prefix.clone(),
2264 input: Box::new(input.deep_clone(seen_tees)),
2265 metadata: metadata.clone(),
2266 },
2267 }
2268 }
2269
2270 #[cfg(feature = "build")]
2271 pub fn emit_core(
2272 &mut self,
2273 builders_or_callback: &mut BuildersOrCallback<
2274 impl FnMut(&mut HydroRoot, &mut usize),
2275 impl FnMut(&mut HydroNode, &mut usize),
2276 >,
2277 seen_tees: &mut SeenTees,
2278 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2279 next_stmt_id: &mut usize,
2280 ) -> syn::Ident {
2281 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2282
2283 self.transform_bottom_up(
2284 &mut |node: &mut HydroNode| {
2285 let out_location = node.metadata().location_id.clone();
2286 match node {
2287 HydroNode::Placeholder => {
2288 panic!()
2289 }
2290
2291 HydroNode::Cast { .. } => {
2292 match builders_or_callback {
2295 BuildersOrCallback::Builders(_) => {}
2296 BuildersOrCallback::Callback(_, node_callback) => {
2297 node_callback(node, next_stmt_id);
2298 }
2299 }
2300
2301 *next_stmt_id += 1;
2302 }
2304
2305 HydroNode::ObserveNonDet {
2306 inner,
2307 trusted,
2308 metadata,
2309 ..
2310 } => {
2311 let inner_ident = ident_stack.pop().unwrap();
2312
2313 let observe_ident =
2314 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2315
2316 match builders_or_callback {
2317 BuildersOrCallback::Builders(graph_builders) => {
2318 graph_builders.observe_nondet(
2319 *trusted,
2320 &inner.metadata().location_id,
2321 inner_ident,
2322 &inner.metadata().collection_kind,
2323 &observe_ident,
2324 &metadata.collection_kind,
2325 &metadata.op,
2326 );
2327 }
2328 BuildersOrCallback::Callback(_, node_callback) => {
2329 node_callback(node, next_stmt_id);
2330 }
2331 }
2332
2333 *next_stmt_id += 1;
2334
2335 ident_stack.push(observe_ident);
2336 }
2337
2338 HydroNode::Batch {
2339 inner, metadata, ..
2340 } => {
2341 let inner_ident = ident_stack.pop().unwrap();
2342
2343 let batch_ident =
2344 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2345
2346 match builders_or_callback {
2347 BuildersOrCallback::Builders(graph_builders) => {
2348 graph_builders.batch(
2349 inner_ident,
2350 &inner.metadata().location_id,
2351 &inner.metadata().collection_kind,
2352 &batch_ident,
2353 &out_location,
2354 &metadata.op,
2355 );
2356 }
2357 BuildersOrCallback::Callback(_, node_callback) => {
2358 node_callback(node, next_stmt_id);
2359 }
2360 }
2361
2362 *next_stmt_id += 1;
2363
2364 ident_stack.push(batch_ident);
2365 }
2366
2367 HydroNode::YieldConcat { inner, .. } => {
2368 let inner_ident = ident_stack.pop().unwrap();
2369
2370 let yield_ident =
2371 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2372
2373 match builders_or_callback {
2374 BuildersOrCallback::Builders(graph_builders) => {
2375 graph_builders.yield_from_tick(
2376 inner_ident,
2377 &inner.metadata().location_id,
2378 &inner.metadata().collection_kind,
2379 &yield_ident,
2380 &out_location,
2381 );
2382 }
2383 BuildersOrCallback::Callback(_, node_callback) => {
2384 node_callback(node, next_stmt_id);
2385 }
2386 }
2387
2388 *next_stmt_id += 1;
2389
2390 ident_stack.push(yield_ident);
2391 }
2392
2393 HydroNode::BeginAtomic { inner, metadata } => {
2394 let inner_ident = ident_stack.pop().unwrap();
2395
2396 let begin_ident =
2397 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2398
2399 match builders_or_callback {
2400 BuildersOrCallback::Builders(graph_builders) => {
2401 graph_builders.begin_atomic(
2402 inner_ident,
2403 &inner.metadata().location_id,
2404 &inner.metadata().collection_kind,
2405 &begin_ident,
2406 &out_location,
2407 &metadata.op,
2408 );
2409 }
2410 BuildersOrCallback::Callback(_, node_callback) => {
2411 node_callback(node, next_stmt_id);
2412 }
2413 }
2414
2415 *next_stmt_id += 1;
2416
2417 ident_stack.push(begin_ident);
2418 }
2419
2420 HydroNode::EndAtomic { inner, .. } => {
2421 let inner_ident = ident_stack.pop().unwrap();
2422
2423 let end_ident =
2424 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2425
2426 match builders_or_callback {
2427 BuildersOrCallback::Builders(graph_builders) => {
2428 graph_builders.end_atomic(
2429 inner_ident,
2430 &inner.metadata().location_id,
2431 &inner.metadata().collection_kind,
2432 &end_ident,
2433 );
2434 }
2435 BuildersOrCallback::Callback(_, node_callback) => {
2436 node_callback(node, next_stmt_id);
2437 }
2438 }
2439
2440 *next_stmt_id += 1;
2441
2442 ident_stack.push(end_ident);
2443 }
2444
2445 HydroNode::Source {
2446 source, metadata, ..
2447 } => {
2448 if let HydroSource::ExternalNetwork() = source {
2449 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2450 } else {
2451 let source_ident =
2452 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2453
2454 let source_stmt = match source {
2455 HydroSource::Stream(expr) => {
2456 debug_assert!(metadata.location_id.is_top_level());
2457 parse_quote! {
2458 #source_ident = source_stream(#expr);
2459 }
2460 }
2461
2462 HydroSource::ExternalNetwork() => {
2463 unreachable!()
2464 }
2465
2466 HydroSource::Iter(expr) => {
2467 if metadata.location_id.is_top_level() {
2468 parse_quote! {
2469 #source_ident = source_iter(#expr);
2470 }
2471 } else {
2472 parse_quote! {
2474 #source_ident = source_iter(#expr) -> persist::<'static>();
2475 }
2476 }
2477 }
2478
2479 HydroSource::Spin() => {
2480 debug_assert!(metadata.location_id.is_top_level());
2481 parse_quote! {
2482 #source_ident = spin();
2483 }
2484 }
2485
2486 HydroSource::ClusterMembers(target_loc, state) => {
2487 debug_assert!(metadata.location_id.is_top_level());
2488
2489 let members_tee_ident = syn::Ident::new(
2490 &format!(
2491 "__cluster_members_tee_{}_{}",
2492 metadata.location_id.root().key(),
2493 target_loc.key(),
2494 ),
2495 Span::call_site(),
2496 );
2497
2498 match state {
2499 ClusterMembersState::Stream(d) => {
2500 parse_quote! {
2501 #members_tee_ident = source_stream(#d) -> tee();
2502 #source_ident = #members_tee_ident;
2503 }
2504 },
2505 ClusterMembersState::Uninit => syn::parse_quote! {
2506 #source_ident = source_stream(DUMMY);
2507 },
2508 ClusterMembersState::Tee(..) => parse_quote! {
2509 #source_ident = #members_tee_ident;
2510 },
2511 }
2512 }
2513
2514 HydroSource::Embedded(ident) => {
2515 parse_quote! {
2516 #source_ident = source_stream(#ident);
2517 }
2518 }
2519 };
2520
2521 match builders_or_callback {
2522 BuildersOrCallback::Builders(graph_builders) => {
2523 let builder = graph_builders.get_dfir_mut(&out_location);
2524 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2525 }
2526 BuildersOrCallback::Callback(_, node_callback) => {
2527 node_callback(node, next_stmt_id);
2528 }
2529 }
2530
2531 *next_stmt_id += 1;
2532
2533 ident_stack.push(source_ident);
2534 }
2535 }
2536
2537 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2538 let source_ident =
2539 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2540
2541 match builders_or_callback {
2542 BuildersOrCallback::Builders(graph_builders) => {
2543 let builder = graph_builders.get_dfir_mut(&out_location);
2544
2545 if *first_tick_only {
2546 assert!(
2547 !metadata.location_id.is_top_level(),
2548 "first_tick_only SingletonSource must be inside a tick"
2549 );
2550 }
2551
2552 if *first_tick_only
2553 || (metadata.location_id.is_top_level()
2554 && metadata.collection_kind.is_bounded())
2555 {
2556 builder.add_dfir(
2557 parse_quote! {
2558 #source_ident = source_iter([#value]);
2559 },
2560 None,
2561 Some(&next_stmt_id.to_string()),
2562 );
2563 } else {
2564 builder.add_dfir(
2565 parse_quote! {
2566 #source_ident = source_iter([#value]) -> persist::<'static>();
2567 },
2568 None,
2569 Some(&next_stmt_id.to_string()),
2570 );
2571 }
2572 }
2573 BuildersOrCallback::Callback(_, node_callback) => {
2574 node_callback(node, next_stmt_id);
2575 }
2576 }
2577
2578 *next_stmt_id += 1;
2579
2580 ident_stack.push(source_ident);
2581 }
2582
2583 HydroNode::CycleSource { cycle_id, .. } => {
2584 let ident = cycle_id.as_ident();
2585
2586 match builders_or_callback {
2587 BuildersOrCallback::Builders(_) => {}
2588 BuildersOrCallback::Callback(_, node_callback) => {
2589 node_callback(node, next_stmt_id);
2590 }
2591 }
2592
2593 *next_stmt_id += 1;
2595
2596 ident_stack.push(ident);
2597 }
2598
2599 HydroNode::Tee { inner, .. } => {
2600 let ret_ident = if let Some(teed_from) =
2601 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2602 {
2603 match builders_or_callback {
2604 BuildersOrCallback::Builders(_) => {}
2605 BuildersOrCallback::Callback(_, node_callback) => {
2606 node_callback(node, next_stmt_id);
2607 }
2608 }
2609
2610 teed_from.clone()
2611 } else {
2612 let inner_ident = ident_stack.pop().unwrap();
2615
2616 let tee_ident =
2617 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2618
2619 built_tees.insert(
2620 inner.0.as_ref() as *const RefCell<HydroNode>,
2621 tee_ident.clone(),
2622 );
2623
2624 match builders_or_callback {
2625 BuildersOrCallback::Builders(graph_builders) => {
2626 let builder = graph_builders.get_dfir_mut(&out_location);
2627 builder.add_dfir(
2628 parse_quote! {
2629 #tee_ident = #inner_ident -> tee();
2630 },
2631 None,
2632 Some(&next_stmt_id.to_string()),
2633 );
2634 }
2635 BuildersOrCallback::Callback(_, node_callback) => {
2636 node_callback(node, next_stmt_id);
2637 }
2638 }
2639
2640 tee_ident
2641 };
2642
2643 *next_stmt_id += 1;
2647 ident_stack.push(ret_ident);
2648 }
2649
2650 HydroNode::Chain { .. } => {
2651 let second_ident = ident_stack.pop().unwrap();
2653 let first_ident = ident_stack.pop().unwrap();
2654
2655 let chain_ident =
2656 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2657
2658 match builders_or_callback {
2659 BuildersOrCallback::Builders(graph_builders) => {
2660 let builder = graph_builders.get_dfir_mut(&out_location);
2661 builder.add_dfir(
2662 parse_quote! {
2663 #chain_ident = chain();
2664 #first_ident -> [0]#chain_ident;
2665 #second_ident -> [1]#chain_ident;
2666 },
2667 None,
2668 Some(&next_stmt_id.to_string()),
2669 );
2670 }
2671 BuildersOrCallback::Callback(_, node_callback) => {
2672 node_callback(node, next_stmt_id);
2673 }
2674 }
2675
2676 *next_stmt_id += 1;
2677
2678 ident_stack.push(chain_ident);
2679 }
2680
2681 HydroNode::ChainFirst { .. } => {
2682 let second_ident = ident_stack.pop().unwrap();
2683 let first_ident = ident_stack.pop().unwrap();
2684
2685 let chain_ident =
2686 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2687
2688 match builders_or_callback {
2689 BuildersOrCallback::Builders(graph_builders) => {
2690 let builder = graph_builders.get_dfir_mut(&out_location);
2691 builder.add_dfir(
2692 parse_quote! {
2693 #chain_ident = chain_first_n(1);
2694 #first_ident -> [0]#chain_ident;
2695 #second_ident -> [1]#chain_ident;
2696 },
2697 None,
2698 Some(&next_stmt_id.to_string()),
2699 );
2700 }
2701 BuildersOrCallback::Callback(_, node_callback) => {
2702 node_callback(node, next_stmt_id);
2703 }
2704 }
2705
2706 *next_stmt_id += 1;
2707
2708 ident_stack.push(chain_ident);
2709 }
2710
2711 HydroNode::CrossSingleton { right, .. } => {
2712 let right_ident = ident_stack.pop().unwrap();
2713 let left_ident = ident_stack.pop().unwrap();
2714
2715 let cross_ident =
2716 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2717
2718 match builders_or_callback {
2719 BuildersOrCallback::Builders(graph_builders) => {
2720 let builder = graph_builders.get_dfir_mut(&out_location);
2721
2722 if right.metadata().location_id.is_top_level()
2723 && right.metadata().collection_kind.is_bounded()
2724 {
2725 builder.add_dfir(
2726 parse_quote! {
2727 #cross_ident = cross_singleton();
2728 #left_ident -> [input]#cross_ident;
2729 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2730 },
2731 None,
2732 Some(&next_stmt_id.to_string()),
2733 );
2734 } else {
2735 builder.add_dfir(
2736 parse_quote! {
2737 #cross_ident = cross_singleton();
2738 #left_ident -> [input]#cross_ident;
2739 #right_ident -> [single]#cross_ident;
2740 },
2741 None,
2742 Some(&next_stmt_id.to_string()),
2743 );
2744 }
2745 }
2746 BuildersOrCallback::Callback(_, node_callback) => {
2747 node_callback(node, next_stmt_id);
2748 }
2749 }
2750
2751 *next_stmt_id += 1;
2752
2753 ident_stack.push(cross_ident);
2754 }
2755
2756 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2757 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2758 parse_quote!(cross_join_multiset)
2759 } else {
2760 parse_quote!(join_multiset)
2761 };
2762
2763 let (HydroNode::CrossProduct { left, right, .. }
2764 | HydroNode::Join { left, right, .. }) = node
2765 else {
2766 unreachable!()
2767 };
2768
2769 let is_top_level = left.metadata().location_id.is_top_level()
2770 && right.metadata().location_id.is_top_level();
2771 let left_lifetime = if left.metadata().location_id.is_top_level() {
2772 quote!('static)
2773 } else {
2774 quote!('tick)
2775 };
2776
2777 let right_lifetime = if right.metadata().location_id.is_top_level() {
2778 quote!('static)
2779 } else {
2780 quote!('tick)
2781 };
2782
2783 let right_ident = ident_stack.pop().unwrap();
2784 let left_ident = ident_stack.pop().unwrap();
2785
2786 let stream_ident =
2787 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2788
2789 match builders_or_callback {
2790 BuildersOrCallback::Builders(graph_builders) => {
2791 let builder = graph_builders.get_dfir_mut(&out_location);
2792 builder.add_dfir(
2793 if is_top_level {
2794 parse_quote! {
2797 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2798 #left_ident -> [0]#stream_ident;
2799 #right_ident -> [1]#stream_ident;
2800 }
2801 } else {
2802 parse_quote! {
2803 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2804 #left_ident -> [0]#stream_ident;
2805 #right_ident -> [1]#stream_ident;
2806 }
2807 }
2808 ,
2809 None,
2810 Some(&next_stmt_id.to_string()),
2811 );
2812 }
2813 BuildersOrCallback::Callback(_, node_callback) => {
2814 node_callback(node, next_stmt_id);
2815 }
2816 }
2817
2818 *next_stmt_id += 1;
2819
2820 ident_stack.push(stream_ident);
2821 }
2822
2823 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2824 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2825 parse_quote!(difference)
2826 } else {
2827 parse_quote!(anti_join)
2828 };
2829
2830 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2831 node
2832 else {
2833 unreachable!()
2834 };
2835
2836 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2837 quote!('static)
2838 } else {
2839 quote!('tick)
2840 };
2841
2842 let neg_ident = ident_stack.pop().unwrap();
2843 let pos_ident = ident_stack.pop().unwrap();
2844
2845 let stream_ident =
2846 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2847
2848 match builders_or_callback {
2849 BuildersOrCallback::Builders(graph_builders) => {
2850 let builder = graph_builders.get_dfir_mut(&out_location);
2851 builder.add_dfir(
2852 parse_quote! {
2853 #stream_ident = #operator::<'tick, #neg_lifetime>();
2854 #pos_ident -> [pos]#stream_ident;
2855 #neg_ident -> [neg]#stream_ident;
2856 },
2857 None,
2858 Some(&next_stmt_id.to_string()),
2859 );
2860 }
2861 BuildersOrCallback::Callback(_, node_callback) => {
2862 node_callback(node, next_stmt_id);
2863 }
2864 }
2865
2866 *next_stmt_id += 1;
2867
2868 ident_stack.push(stream_ident);
2869 }
2870
2871 HydroNode::ResolveFutures { .. } => {
2872 let input_ident = ident_stack.pop().unwrap();
2873
2874 let futures_ident =
2875 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2876
2877 match builders_or_callback {
2878 BuildersOrCallback::Builders(graph_builders) => {
2879 let builder = graph_builders.get_dfir_mut(&out_location);
2880 builder.add_dfir(
2881 parse_quote! {
2882 #futures_ident = #input_ident -> resolve_futures();
2883 },
2884 None,
2885 Some(&next_stmt_id.to_string()),
2886 );
2887 }
2888 BuildersOrCallback::Callback(_, node_callback) => {
2889 node_callback(node, next_stmt_id);
2890 }
2891 }
2892
2893 *next_stmt_id += 1;
2894
2895 ident_stack.push(futures_ident);
2896 }
2897
2898 HydroNode::ResolveFuturesOrdered { .. } => {
2899 let input_ident = ident_stack.pop().unwrap();
2900
2901 let futures_ident =
2902 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2903
2904 match builders_or_callback {
2905 BuildersOrCallback::Builders(graph_builders) => {
2906 let builder = graph_builders.get_dfir_mut(&out_location);
2907 builder.add_dfir(
2908 parse_quote! {
2909 #futures_ident = #input_ident -> resolve_futures_ordered();
2910 },
2911 None,
2912 Some(&next_stmt_id.to_string()),
2913 );
2914 }
2915 BuildersOrCallback::Callback(_, node_callback) => {
2916 node_callback(node, next_stmt_id);
2917 }
2918 }
2919
2920 *next_stmt_id += 1;
2921
2922 ident_stack.push(futures_ident);
2923 }
2924
2925 HydroNode::Map { f, .. } => {
2926 let input_ident = ident_stack.pop().unwrap();
2927
2928 let map_ident =
2929 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2930
2931 match builders_or_callback {
2932 BuildersOrCallback::Builders(graph_builders) => {
2933 let builder = graph_builders.get_dfir_mut(&out_location);
2934 builder.add_dfir(
2935 parse_quote! {
2936 #map_ident = #input_ident -> map(#f);
2937 },
2938 None,
2939 Some(&next_stmt_id.to_string()),
2940 );
2941 }
2942 BuildersOrCallback::Callback(_, node_callback) => {
2943 node_callback(node, next_stmt_id);
2944 }
2945 }
2946
2947 *next_stmt_id += 1;
2948
2949 ident_stack.push(map_ident);
2950 }
2951
2952 HydroNode::FlatMap { f, .. } => {
2953 let input_ident = ident_stack.pop().unwrap();
2954
2955 let flat_map_ident =
2956 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2957
2958 match builders_or_callback {
2959 BuildersOrCallback::Builders(graph_builders) => {
2960 let builder = graph_builders.get_dfir_mut(&out_location);
2961 builder.add_dfir(
2962 parse_quote! {
2963 #flat_map_ident = #input_ident -> flat_map(#f);
2964 },
2965 None,
2966 Some(&next_stmt_id.to_string()),
2967 );
2968 }
2969 BuildersOrCallback::Callback(_, node_callback) => {
2970 node_callback(node, next_stmt_id);
2971 }
2972 }
2973
2974 *next_stmt_id += 1;
2975
2976 ident_stack.push(flat_map_ident);
2977 }
2978
2979 HydroNode::Filter { f, .. } => {
2980 let input_ident = ident_stack.pop().unwrap();
2981
2982 let filter_ident =
2983 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2984
2985 match builders_or_callback {
2986 BuildersOrCallback::Builders(graph_builders) => {
2987 let builder = graph_builders.get_dfir_mut(&out_location);
2988 builder.add_dfir(
2989 parse_quote! {
2990 #filter_ident = #input_ident -> filter(#f);
2991 },
2992 None,
2993 Some(&next_stmt_id.to_string()),
2994 );
2995 }
2996 BuildersOrCallback::Callback(_, node_callback) => {
2997 node_callback(node, next_stmt_id);
2998 }
2999 }
3000
3001 *next_stmt_id += 1;
3002
3003 ident_stack.push(filter_ident);
3004 }
3005
3006 HydroNode::FilterMap { f, .. } => {
3007 let input_ident = ident_stack.pop().unwrap();
3008
3009 let filter_map_ident =
3010 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3011
3012 match builders_or_callback {
3013 BuildersOrCallback::Builders(graph_builders) => {
3014 let builder = graph_builders.get_dfir_mut(&out_location);
3015 builder.add_dfir(
3016 parse_quote! {
3017 #filter_map_ident = #input_ident -> filter_map(#f);
3018 },
3019 None,
3020 Some(&next_stmt_id.to_string()),
3021 );
3022 }
3023 BuildersOrCallback::Callback(_, node_callback) => {
3024 node_callback(node, next_stmt_id);
3025 }
3026 }
3027
3028 *next_stmt_id += 1;
3029
3030 ident_stack.push(filter_map_ident);
3031 }
3032
3033 HydroNode::Sort { .. } => {
3034 let input_ident = ident_stack.pop().unwrap();
3035
3036 let sort_ident =
3037 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3038
3039 match builders_or_callback {
3040 BuildersOrCallback::Builders(graph_builders) => {
3041 let builder = graph_builders.get_dfir_mut(&out_location);
3042 builder.add_dfir(
3043 parse_quote! {
3044 #sort_ident = #input_ident -> sort();
3045 },
3046 None,
3047 Some(&next_stmt_id.to_string()),
3048 );
3049 }
3050 BuildersOrCallback::Callback(_, node_callback) => {
3051 node_callback(node, next_stmt_id);
3052 }
3053 }
3054
3055 *next_stmt_id += 1;
3056
3057 ident_stack.push(sort_ident);
3058 }
3059
3060 HydroNode::DeferTick { .. } => {
3061 let input_ident = ident_stack.pop().unwrap();
3062
3063 let defer_tick_ident =
3064 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3065
3066 match builders_or_callback {
3067 BuildersOrCallback::Builders(graph_builders) => {
3068 let builder = graph_builders.get_dfir_mut(&out_location);
3069 builder.add_dfir(
3070 parse_quote! {
3071 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3072 },
3073 None,
3074 Some(&next_stmt_id.to_string()),
3075 );
3076 }
3077 BuildersOrCallback::Callback(_, node_callback) => {
3078 node_callback(node, next_stmt_id);
3079 }
3080 }
3081
3082 *next_stmt_id += 1;
3083
3084 ident_stack.push(defer_tick_ident);
3085 }
3086
3087 HydroNode::Enumerate { input, .. } => {
3088 let input_ident = ident_stack.pop().unwrap();
3089
3090 let enumerate_ident =
3091 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3092
3093 match builders_or_callback {
3094 BuildersOrCallback::Builders(graph_builders) => {
3095 let builder = graph_builders.get_dfir_mut(&out_location);
3096 let lifetime = if input.metadata().location_id.is_top_level() {
3097 quote!('static)
3098 } else {
3099 quote!('tick)
3100 };
3101 builder.add_dfir(
3102 parse_quote! {
3103 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3104 },
3105 None,
3106 Some(&next_stmt_id.to_string()),
3107 );
3108 }
3109 BuildersOrCallback::Callback(_, node_callback) => {
3110 node_callback(node, next_stmt_id);
3111 }
3112 }
3113
3114 *next_stmt_id += 1;
3115
3116 ident_stack.push(enumerate_ident);
3117 }
3118
3119 HydroNode::Inspect { f, .. } => {
3120 let input_ident = ident_stack.pop().unwrap();
3121
3122 let inspect_ident =
3123 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3124
3125 match builders_or_callback {
3126 BuildersOrCallback::Builders(graph_builders) => {
3127 let builder = graph_builders.get_dfir_mut(&out_location);
3128 builder.add_dfir(
3129 parse_quote! {
3130 #inspect_ident = #input_ident -> inspect(#f);
3131 },
3132 None,
3133 Some(&next_stmt_id.to_string()),
3134 );
3135 }
3136 BuildersOrCallback::Callback(_, node_callback) => {
3137 node_callback(node, next_stmt_id);
3138 }
3139 }
3140
3141 *next_stmt_id += 1;
3142
3143 ident_stack.push(inspect_ident);
3144 }
3145
3146 HydroNode::Unique { input, .. } => {
3147 let input_ident = ident_stack.pop().unwrap();
3148
3149 let unique_ident =
3150 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3151
3152 match builders_or_callback {
3153 BuildersOrCallback::Builders(graph_builders) => {
3154 let builder = graph_builders.get_dfir_mut(&out_location);
3155 let lifetime = if input.metadata().location_id.is_top_level() {
3156 quote!('static)
3157 } else {
3158 quote!('tick)
3159 };
3160
3161 builder.add_dfir(
3162 parse_quote! {
3163 #unique_ident = #input_ident -> unique::<#lifetime>();
3164 },
3165 None,
3166 Some(&next_stmt_id.to_string()),
3167 );
3168 }
3169 BuildersOrCallback::Callback(_, node_callback) => {
3170 node_callback(node, next_stmt_id);
3171 }
3172 }
3173
3174 *next_stmt_id += 1;
3175
3176 ident_stack.push(unique_ident);
3177 }
3178
3179 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3180 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3181 if input.metadata().location_id.is_top_level()
3182 && input.metadata().collection_kind.is_bounded()
3183 {
3184 parse_quote!(fold_no_replay)
3185 } else {
3186 parse_quote!(fold)
3187 }
3188 } else if matches!(node, HydroNode::Scan { .. }) {
3189 parse_quote!(scan)
3190 } else if let HydroNode::FoldKeyed { input, .. } = node {
3191 if input.metadata().location_id.is_top_level()
3192 && input.metadata().collection_kind.is_bounded()
3193 {
3194 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3195 } else {
3196 parse_quote!(fold_keyed)
3197 }
3198 } else {
3199 unreachable!()
3200 };
3201
3202 let (HydroNode::Fold { input, .. }
3203 | HydroNode::FoldKeyed { input, .. }
3204 | HydroNode::Scan { input, .. }) = node
3205 else {
3206 unreachable!()
3207 };
3208
3209 let lifetime = if input.metadata().location_id.is_top_level() {
3210 quote!('static)
3211 } else {
3212 quote!('tick)
3213 };
3214
3215 let input_ident = ident_stack.pop().unwrap();
3216
3217 let (HydroNode::Fold { init, acc, .. }
3218 | HydroNode::FoldKeyed { init, acc, .. }
3219 | HydroNode::Scan { init, acc, .. }) = &*node
3220 else {
3221 unreachable!()
3222 };
3223
3224 let fold_ident =
3225 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3226
3227 match builders_or_callback {
3228 BuildersOrCallback::Builders(graph_builders) => {
3229 if matches!(node, HydroNode::Fold { .. })
3230 && node.metadata().location_id.is_top_level()
3231 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3232 && graph_builders.singleton_intermediates()
3233 && !node.metadata().collection_kind.is_bounded()
3234 {
3235 let builder = graph_builders.get_dfir_mut(&out_location);
3236
3237 let acc: syn::Expr = parse_quote!({
3238 let mut __inner = #acc;
3239 move |__state, __value| {
3240 __inner(__state, __value);
3241 Some(__state.clone())
3242 }
3243 });
3244
3245 builder.add_dfir(
3246 parse_quote! {
3247 source_iter([(#init)()]) -> [0]#fold_ident;
3248 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3249 #fold_ident = chain();
3250 },
3251 None,
3252 Some(&next_stmt_id.to_string()),
3253 );
3254 } else if matches!(node, HydroNode::FoldKeyed { .. })
3255 && node.metadata().location_id.is_top_level()
3256 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3257 && graph_builders.singleton_intermediates()
3258 && !node.metadata().collection_kind.is_bounded()
3259 {
3260 let builder = graph_builders.get_dfir_mut(&out_location);
3261
3262 let acc: syn::Expr = parse_quote!({
3263 let mut __init = #init;
3264 let mut __inner = #acc;
3265 move |__state, __kv: (_, _)| {
3266 let __state = __state
3268 .entry(::std::clone::Clone::clone(&__kv.0))
3269 .or_insert_with(|| (__init)());
3270 __inner(__state, __kv.1);
3271 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3272 }
3273 });
3274
3275 builder.add_dfir(
3276 parse_quote! {
3277 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3278 },
3279 None,
3280 Some(&next_stmt_id.to_string()),
3281 );
3282 } else {
3283 let builder = graph_builders.get_dfir_mut(&out_location);
3284 builder.add_dfir(
3285 parse_quote! {
3286 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3287 },
3288 None,
3289 Some(&next_stmt_id.to_string()),
3290 );
3291 }
3292 }
3293 BuildersOrCallback::Callback(_, node_callback) => {
3294 node_callback(node, next_stmt_id);
3295 }
3296 }
3297
3298 *next_stmt_id += 1;
3299
3300 ident_stack.push(fold_ident);
3301 }
3302
3303 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3304 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3305 if input.metadata().location_id.is_top_level()
3306 && input.metadata().collection_kind.is_bounded()
3307 {
3308 parse_quote!(reduce_no_replay)
3309 } else {
3310 parse_quote!(reduce)
3311 }
3312 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3313 if input.metadata().location_id.is_top_level()
3314 && input.metadata().collection_kind.is_bounded()
3315 {
3316 todo!(
3317 "Calling keyed reduce on a top-level bounded collection is not supported"
3318 )
3319 } else {
3320 parse_quote!(reduce_keyed)
3321 }
3322 } else {
3323 unreachable!()
3324 };
3325
3326 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3327 else {
3328 unreachable!()
3329 };
3330
3331 let lifetime = if input.metadata().location_id.is_top_level() {
3332 quote!('static)
3333 } else {
3334 quote!('tick)
3335 };
3336
3337 let input_ident = ident_stack.pop().unwrap();
3338
3339 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3340 else {
3341 unreachable!()
3342 };
3343
3344 let reduce_ident =
3345 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3346
3347 match builders_or_callback {
3348 BuildersOrCallback::Builders(graph_builders) => {
3349 if matches!(node, HydroNode::Reduce { .. })
3350 && node.metadata().location_id.is_top_level()
3351 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3352 && graph_builders.singleton_intermediates()
3353 && !node.metadata().collection_kind.is_bounded()
3354 {
3355 todo!(
3356 "Reduce with optional intermediates is not yet supported in simulator"
3357 );
3358 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3359 && node.metadata().location_id.is_top_level()
3360 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3361 && graph_builders.singleton_intermediates()
3362 && !node.metadata().collection_kind.is_bounded()
3363 {
3364 todo!(
3365 "Reduce keyed with optional intermediates is not yet supported in simulator"
3366 );
3367 } else {
3368 let builder = graph_builders.get_dfir_mut(&out_location);
3369 builder.add_dfir(
3370 parse_quote! {
3371 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3372 },
3373 None,
3374 Some(&next_stmt_id.to_string()),
3375 );
3376 }
3377 }
3378 BuildersOrCallback::Callback(_, node_callback) => {
3379 node_callback(node, next_stmt_id);
3380 }
3381 }
3382
3383 *next_stmt_id += 1;
3384
3385 ident_stack.push(reduce_ident);
3386 }
3387
3388 HydroNode::ReduceKeyedWatermark {
3389 f,
3390 input,
3391 metadata,
3392 ..
3393 } => {
3394 let lifetime = if input.metadata().location_id.is_top_level() {
3395 quote!('static)
3396 } else {
3397 quote!('tick)
3398 };
3399
3400 let watermark_ident = ident_stack.pop().unwrap();
3402 let input_ident = ident_stack.pop().unwrap();
3403
3404 let chain_ident = syn::Ident::new(
3405 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3406 Span::call_site(),
3407 );
3408
3409 let fold_ident =
3410 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3411
3412 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3413 && input.metadata().collection_kind.is_bounded()
3414 {
3415 parse_quote!(fold_no_replay)
3416 } else {
3417 parse_quote!(fold)
3418 };
3419
3420 match builders_or_callback {
3421 BuildersOrCallback::Builders(graph_builders) => {
3422 if metadata.location_id.is_top_level()
3423 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3424 && graph_builders.singleton_intermediates()
3425 && !metadata.collection_kind.is_bounded()
3426 {
3427 todo!(
3428 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3429 )
3430 } else {
3431 let builder = graph_builders.get_dfir_mut(&out_location);
3432 builder.add_dfir(
3433 parse_quote! {
3434 #chain_ident = chain();
3435 #input_ident
3436 -> map(|x| (Some(x), None))
3437 -> [0]#chain_ident;
3438 #watermark_ident
3439 -> map(|watermark| (None, Some(watermark)))
3440 -> [1]#chain_ident;
3441
3442 #fold_ident = #chain_ident
3443 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3444 let __reduce_keyed_fn = #f;
3445 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3446 if let Some((k, v)) = opt_payload {
3447 if let Some(curr_watermark) = *opt_curr_watermark {
3448 if k < curr_watermark {
3449 return;
3450 }
3451 }
3452 match map.entry(k) {
3453 ::std::collections::hash_map::Entry::Vacant(e) => {
3454 e.insert(v);
3455 }
3456 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3457 __reduce_keyed_fn(e.get_mut(), v);
3458 }
3459 }
3460 } else {
3461 let watermark = opt_watermark.unwrap();
3462 if let Some(curr_watermark) = *opt_curr_watermark {
3463 if watermark <= curr_watermark {
3464 return;
3465 }
3466 }
3467 *opt_curr_watermark = opt_watermark;
3468 map.retain(|k, _| *k >= watermark);
3469 }
3470 }
3471 })
3472 -> flat_map(|(map, _curr_watermark)| map);
3473 },
3474 None,
3475 Some(&next_stmt_id.to_string()),
3476 );
3477 }
3478 }
3479 BuildersOrCallback::Callback(_, node_callback) => {
3480 node_callback(node, next_stmt_id);
3481 }
3482 }
3483
3484 *next_stmt_id += 1;
3485
3486 ident_stack.push(fold_ident);
3487 }
3488
3489 HydroNode::Network {
3490 networking_info,
3491 serialize_fn: serialize_pipeline,
3492 instantiate_fn,
3493 deserialize_fn: deserialize_pipeline,
3494 input,
3495 ..
3496 } => {
3497 let input_ident = ident_stack.pop().unwrap();
3498
3499 let receiver_stream_ident =
3500 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3501
3502 match builders_or_callback {
3503 BuildersOrCallback::Builders(graph_builders) => {
3504 let (sink_expr, source_expr) = match instantiate_fn {
3505 DebugInstantiate::Building => (
3506 syn::parse_quote!(DUMMY_SINK),
3507 syn::parse_quote!(DUMMY_SOURCE),
3508 ),
3509
3510 DebugInstantiate::Finalized(finalized) => {
3511 (finalized.sink.clone(), finalized.source.clone())
3512 }
3513 };
3514
3515 graph_builders.create_network(
3516 &input.metadata().location_id,
3517 &out_location,
3518 input_ident,
3519 &receiver_stream_ident,
3520 serialize_pipeline.as_ref(),
3521 sink_expr,
3522 source_expr,
3523 deserialize_pipeline.as_ref(),
3524 *next_stmt_id,
3525 networking_info,
3526 );
3527 }
3528 BuildersOrCallback::Callback(_, node_callback) => {
3529 node_callback(node, next_stmt_id);
3530 }
3531 }
3532
3533 *next_stmt_id += 1;
3534
3535 ident_stack.push(receiver_stream_ident);
3536 }
3537
3538 HydroNode::ExternalInput {
3539 instantiate_fn,
3540 deserialize_fn: deserialize_pipeline,
3541 ..
3542 } => {
3543 let receiver_stream_ident =
3544 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3545
3546 match builders_or_callback {
3547 BuildersOrCallback::Builders(graph_builders) => {
3548 let (_, source_expr) = match instantiate_fn {
3549 DebugInstantiate::Building => (
3550 syn::parse_quote!(DUMMY_SINK),
3551 syn::parse_quote!(DUMMY_SOURCE),
3552 ),
3553
3554 DebugInstantiate::Finalized(finalized) => {
3555 (finalized.sink.clone(), finalized.source.clone())
3556 }
3557 };
3558
3559 graph_builders.create_external_source(
3560 &out_location,
3561 source_expr,
3562 &receiver_stream_ident,
3563 deserialize_pipeline.as_ref(),
3564 *next_stmt_id,
3565 );
3566 }
3567 BuildersOrCallback::Callback(_, node_callback) => {
3568 node_callback(node, next_stmt_id);
3569 }
3570 }
3571
3572 *next_stmt_id += 1;
3573
3574 ident_stack.push(receiver_stream_ident);
3575 }
3576
3577 HydroNode::Counter {
3578 tag,
3579 duration,
3580 prefix,
3581 ..
3582 } => {
3583 let input_ident = ident_stack.pop().unwrap();
3584
3585 let counter_ident =
3586 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3587
3588 match builders_or_callback {
3589 BuildersOrCallback::Builders(graph_builders) => {
3590 let arg = format!("{}({})", prefix, tag);
3591 let builder = graph_builders.get_dfir_mut(&out_location);
3592 builder.add_dfir(
3593 parse_quote! {
3594 #counter_ident = #input_ident -> _counter(#arg, #duration);
3595 },
3596 None,
3597 Some(&next_stmt_id.to_string()),
3598 );
3599 }
3600 BuildersOrCallback::Callback(_, node_callback) => {
3601 node_callback(node, next_stmt_id);
3602 }
3603 }
3604
3605 *next_stmt_id += 1;
3606
3607 ident_stack.push(counter_ident);
3608 }
3609 }
3610 },
3611 seen_tees,
3612 false,
3613 );
3614
3615 ident_stack
3616 .pop()
3617 .expect("ident_stack should have exactly one element after traversal")
3618 }
3619
3620 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3621 match self {
3622 HydroNode::Placeholder => {
3623 panic!()
3624 }
3625 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3626 HydroNode::Source { source, .. } => match source {
3627 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3628 HydroSource::ExternalNetwork()
3629 | HydroSource::Spin()
3630 | HydroSource::ClusterMembers(_, _)
3631 | HydroSource::Embedded(_) => {} },
3633 HydroNode::SingletonSource { value, .. } => {
3634 transform(value);
3635 }
3636 HydroNode::CycleSource { .. }
3637 | HydroNode::Tee { .. }
3638 | HydroNode::YieldConcat { .. }
3639 | HydroNode::BeginAtomic { .. }
3640 | HydroNode::EndAtomic { .. }
3641 | HydroNode::Batch { .. }
3642 | HydroNode::Chain { .. }
3643 | HydroNode::ChainFirst { .. }
3644 | HydroNode::CrossProduct { .. }
3645 | HydroNode::CrossSingleton { .. }
3646 | HydroNode::ResolveFutures { .. }
3647 | HydroNode::ResolveFuturesOrdered { .. }
3648 | HydroNode::Join { .. }
3649 | HydroNode::Difference { .. }
3650 | HydroNode::AntiJoin { .. }
3651 | HydroNode::DeferTick { .. }
3652 | HydroNode::Enumerate { .. }
3653 | HydroNode::Unique { .. }
3654 | HydroNode::Sort { .. } => {}
3655 HydroNode::Map { f, .. }
3656 | HydroNode::FlatMap { f, .. }
3657 | HydroNode::Filter { f, .. }
3658 | HydroNode::FilterMap { f, .. }
3659 | HydroNode::Inspect { f, .. }
3660 | HydroNode::Reduce { f, .. }
3661 | HydroNode::ReduceKeyed { f, .. }
3662 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3663 transform(f);
3664 }
3665 HydroNode::Fold { init, acc, .. }
3666 | HydroNode::Scan { init, acc, .. }
3667 | HydroNode::FoldKeyed { init, acc, .. } => {
3668 transform(init);
3669 transform(acc);
3670 }
3671 HydroNode::Network {
3672 serialize_fn,
3673 deserialize_fn,
3674 ..
3675 } => {
3676 if let Some(serialize_fn) = serialize_fn {
3677 transform(serialize_fn);
3678 }
3679 if let Some(deserialize_fn) = deserialize_fn {
3680 transform(deserialize_fn);
3681 }
3682 }
3683 HydroNode::ExternalInput { deserialize_fn, .. } => {
3684 if let Some(deserialize_fn) = deserialize_fn {
3685 transform(deserialize_fn);
3686 }
3687 }
3688 HydroNode::Counter { duration, .. } => {
3689 transform(duration);
3690 }
3691 }
3692 }
3693
3694 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3695 &self.metadata().op
3696 }
3697
3698 pub fn metadata(&self) -> &HydroIrMetadata {
3699 match self {
3700 HydroNode::Placeholder => {
3701 panic!()
3702 }
3703 HydroNode::Cast { metadata, .. } => metadata,
3704 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3705 HydroNode::Source { metadata, .. } => metadata,
3706 HydroNode::SingletonSource { metadata, .. } => metadata,
3707 HydroNode::CycleSource { metadata, .. } => metadata,
3708 HydroNode::Tee { metadata, .. } => metadata,
3709 HydroNode::YieldConcat { metadata, .. } => metadata,
3710 HydroNode::BeginAtomic { metadata, .. } => metadata,
3711 HydroNode::EndAtomic { metadata, .. } => metadata,
3712 HydroNode::Batch { metadata, .. } => metadata,
3713 HydroNode::Chain { metadata, .. } => metadata,
3714 HydroNode::ChainFirst { metadata, .. } => metadata,
3715 HydroNode::CrossProduct { metadata, .. } => metadata,
3716 HydroNode::CrossSingleton { metadata, .. } => metadata,
3717 HydroNode::Join { metadata, .. } => metadata,
3718 HydroNode::Difference { metadata, .. } => metadata,
3719 HydroNode::AntiJoin { metadata, .. } => metadata,
3720 HydroNode::ResolveFutures { metadata, .. } => metadata,
3721 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3722 HydroNode::Map { metadata, .. } => metadata,
3723 HydroNode::FlatMap { metadata, .. } => metadata,
3724 HydroNode::Filter { metadata, .. } => metadata,
3725 HydroNode::FilterMap { metadata, .. } => metadata,
3726 HydroNode::DeferTick { metadata, .. } => metadata,
3727 HydroNode::Enumerate { metadata, .. } => metadata,
3728 HydroNode::Inspect { metadata, .. } => metadata,
3729 HydroNode::Unique { metadata, .. } => metadata,
3730 HydroNode::Sort { metadata, .. } => metadata,
3731 HydroNode::Scan { metadata, .. } => metadata,
3732 HydroNode::Fold { metadata, .. } => metadata,
3733 HydroNode::FoldKeyed { metadata, .. } => metadata,
3734 HydroNode::Reduce { metadata, .. } => metadata,
3735 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3736 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3737 HydroNode::ExternalInput { metadata, .. } => metadata,
3738 HydroNode::Network { metadata, .. } => metadata,
3739 HydroNode::Counter { metadata, .. } => metadata,
3740 }
3741 }
3742
3743 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3744 &mut self.metadata_mut().op
3745 }
3746
3747 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3748 match self {
3749 HydroNode::Placeholder => {
3750 panic!()
3751 }
3752 HydroNode::Cast { metadata, .. } => metadata,
3753 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3754 HydroNode::Source { metadata, .. } => metadata,
3755 HydroNode::SingletonSource { metadata, .. } => metadata,
3756 HydroNode::CycleSource { metadata, .. } => metadata,
3757 HydroNode::Tee { metadata, .. } => metadata,
3758 HydroNode::YieldConcat { metadata, .. } => metadata,
3759 HydroNode::BeginAtomic { metadata, .. } => metadata,
3760 HydroNode::EndAtomic { metadata, .. } => metadata,
3761 HydroNode::Batch { metadata, .. } => metadata,
3762 HydroNode::Chain { metadata, .. } => metadata,
3763 HydroNode::ChainFirst { metadata, .. } => metadata,
3764 HydroNode::CrossProduct { metadata, .. } => metadata,
3765 HydroNode::CrossSingleton { metadata, .. } => metadata,
3766 HydroNode::Join { metadata, .. } => metadata,
3767 HydroNode::Difference { metadata, .. } => metadata,
3768 HydroNode::AntiJoin { metadata, .. } => metadata,
3769 HydroNode::ResolveFutures { metadata, .. } => metadata,
3770 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3771 HydroNode::Map { metadata, .. } => metadata,
3772 HydroNode::FlatMap { metadata, .. } => metadata,
3773 HydroNode::Filter { metadata, .. } => metadata,
3774 HydroNode::FilterMap { metadata, .. } => metadata,
3775 HydroNode::DeferTick { metadata, .. } => metadata,
3776 HydroNode::Enumerate { metadata, .. } => metadata,
3777 HydroNode::Inspect { metadata, .. } => metadata,
3778 HydroNode::Unique { metadata, .. } => metadata,
3779 HydroNode::Sort { metadata, .. } => metadata,
3780 HydroNode::Scan { metadata, .. } => metadata,
3781 HydroNode::Fold { metadata, .. } => metadata,
3782 HydroNode::FoldKeyed { metadata, .. } => metadata,
3783 HydroNode::Reduce { metadata, .. } => metadata,
3784 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3785 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3786 HydroNode::ExternalInput { metadata, .. } => metadata,
3787 HydroNode::Network { metadata, .. } => metadata,
3788 HydroNode::Counter { metadata, .. } => metadata,
3789 }
3790 }
3791
3792 pub fn input(&self) -> Vec<&HydroNode> {
3793 match self {
3794 HydroNode::Placeholder => {
3795 panic!()
3796 }
3797 HydroNode::Source { .. }
3798 | HydroNode::SingletonSource { .. }
3799 | HydroNode::ExternalInput { .. }
3800 | HydroNode::CycleSource { .. }
3801 | HydroNode::Tee { .. } => {
3802 vec![]
3804 }
3805 HydroNode::Cast { inner, .. }
3806 | HydroNode::ObserveNonDet { inner, .. }
3807 | HydroNode::YieldConcat { inner, .. }
3808 | HydroNode::BeginAtomic { inner, .. }
3809 | HydroNode::EndAtomic { inner, .. }
3810 | HydroNode::Batch { inner, .. } => {
3811 vec![inner]
3812 }
3813 HydroNode::Chain { first, second, .. } => {
3814 vec![first, second]
3815 }
3816 HydroNode::ChainFirst { first, second, .. } => {
3817 vec![first, second]
3818 }
3819 HydroNode::CrossProduct { left, right, .. }
3820 | HydroNode::CrossSingleton { left, right, .. }
3821 | HydroNode::Join { left, right, .. } => {
3822 vec![left, right]
3823 }
3824 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3825 vec![pos, neg]
3826 }
3827 HydroNode::Map { input, .. }
3828 | HydroNode::FlatMap { input, .. }
3829 | HydroNode::Filter { input, .. }
3830 | HydroNode::FilterMap { input, .. }
3831 | HydroNode::Sort { input, .. }
3832 | HydroNode::DeferTick { input, .. }
3833 | HydroNode::Enumerate { input, .. }
3834 | HydroNode::Inspect { input, .. }
3835 | HydroNode::Unique { input, .. }
3836 | HydroNode::Network { input, .. }
3837 | HydroNode::Counter { input, .. }
3838 | HydroNode::ResolveFutures { input, .. }
3839 | HydroNode::ResolveFuturesOrdered { input, .. }
3840 | HydroNode::Fold { input, .. }
3841 | HydroNode::FoldKeyed { input, .. }
3842 | HydroNode::Reduce { input, .. }
3843 | HydroNode::ReduceKeyed { input, .. }
3844 | HydroNode::Scan { input, .. } => {
3845 vec![input]
3846 }
3847 HydroNode::ReduceKeyedWatermark {
3848 input, watermark, ..
3849 } => {
3850 vec![input, watermark]
3851 }
3852 }
3853 }
3854
3855 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3856 self.input()
3857 .iter()
3858 .map(|input_node| input_node.metadata())
3859 .collect()
3860 }
3861
3862 pub fn print_root(&self) -> String {
3863 match self {
3864 HydroNode::Placeholder => {
3865 panic!()
3866 }
3867 HydroNode::Cast { .. } => "Cast()".to_owned(),
3868 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3869 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3870 HydroNode::SingletonSource {
3871 value,
3872 first_tick_only,
3873 ..
3874 } => format!(
3875 "SingletonSource({:?}, first_tick_only={})",
3876 value, first_tick_only
3877 ),
3878 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3879 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3880 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3881 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3882 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3883 HydroNode::Batch { .. } => "Batch()".to_owned(),
3884 HydroNode::Chain { first, second, .. } => {
3885 format!("Chain({}, {})", first.print_root(), second.print_root())
3886 }
3887 HydroNode::ChainFirst { first, second, .. } => {
3888 format!(
3889 "ChainFirst({}, {})",
3890 first.print_root(),
3891 second.print_root()
3892 )
3893 }
3894 HydroNode::CrossProduct { left, right, .. } => {
3895 format!(
3896 "CrossProduct({}, {})",
3897 left.print_root(),
3898 right.print_root()
3899 )
3900 }
3901 HydroNode::CrossSingleton { left, right, .. } => {
3902 format!(
3903 "CrossSingleton({}, {})",
3904 left.print_root(),
3905 right.print_root()
3906 )
3907 }
3908 HydroNode::Join { left, right, .. } => {
3909 format!("Join({}, {})", left.print_root(), right.print_root())
3910 }
3911 HydroNode::Difference { pos, neg, .. } => {
3912 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3913 }
3914 HydroNode::AntiJoin { pos, neg, .. } => {
3915 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3916 }
3917 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3918 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3919 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3920 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3921 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3922 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3923 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3924 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3925 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3926 HydroNode::Unique { .. } => "Unique()".to_owned(),
3927 HydroNode::Sort { .. } => "Sort()".to_owned(),
3928 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3929 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3930 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3931 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3932 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3933 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3934 HydroNode::Network { .. } => "Network()".to_owned(),
3935 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3936 HydroNode::Counter { tag, duration, .. } => {
3937 format!("Counter({:?}, {:?})", tag, duration)
3938 }
3939 }
3940 }
3941}
3942
/// Resolves both endpoints of a network edge and asks the deploy backend `D` for the
/// expressions and wiring needed to connect them.
///
/// The combination of `from_location` and `to_location` selects which family of [`Deploy`]
/// hooks is used:
///
/// | from    | to      | hooks                                |
/// |---------|---------|--------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`    |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`    |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`    |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`    |
///
/// In every case a fresh port is taken from each endpoint with `next_port()` (sender first,
/// then receiver) before the hooks are invoked. `name` and `networking_info` are forwarded
/// unchanged to the `*_sink_source` hook.
///
/// # Returns
/// `(sink, source, connect_fn)`: the two expressions returned by the `*_sink_source` hook and
/// the closure returned by the matching `*_connect` hook.
///
/// # Panics
/// - If a process or cluster key referenced by either location is missing from `processes` /
///   `clusters`.
/// - If either location is a `Tick` or `Atomic` location; only process and cluster endpoints
///   are handled here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = processes
                .get(from)
                .unwrap_or_else(|| {
                    panic!("A process used in the graph was not instantiated: {}", from)
                })
                .clone();
            let to_node = processes
                .get(to)
                .unwrap_or_else(|| {
                    panic!("A process used in the graph was not instantiated: {}", to)
                })
                .clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = processes
                .get(from)
                .unwrap_or_else(|| {
                    panic!("A process used in the graph was not instantiated: {}", from)
                })
                .clone();
            let to_node = clusters
                .get(to)
                .unwrap_or_else(|| {
                    panic!("A cluster used in the graph was not instantiated: {}", to)
                })
                .clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = clusters
                .get(from)
                .unwrap_or_else(|| {
                    panic!("A cluster used in the graph was not instantiated: {}", from)
                })
                .clone();
            let to_node = processes
                .get(to)
                .unwrap_or_else(|| {
                    panic!("A process used in the graph was not instantiated: {}", to)
                })
                .clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = clusters
                .get(from)
                .unwrap_or_else(|| {
                    panic!("A cluster used in the graph was not instantiated: {}", from)
                })
                .clone();
            let to_node = clusters
                .get(to)
                .unwrap_or_else(|| {
                    panic!("A cluster used in the graph was not instantiated: {}", to)
                })
                .clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster endpoints can be wired up. The arm order is kept so the
        // sender is reported first when both endpoints are unsupported.
        (LocationId::Tick(_, _), _) => panic!(
            "cannot instantiate a network whose sender is a tick location; \
             expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) => panic!(
            "cannot instantiate a network whose receiver is a tick location; \
             expected a process or cluster"
        ),
        (LocationId::Atomic(_), _) => panic!(
            "cannot instantiate a network whose sender is an atomic location; \
             expected a process or cluster"
        ),
        (_, LocationId::Atomic(_)) => panic!(
            "cannot instantiate a network whose receiver is an atomic location; \
             expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
4084
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins the in-memory size of `HydroNode` to 248 bytes so that a change to the enum's
    // layout shows up as a test failure. Ignored without the `build` feature because the
    // expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 248);
    }

    // Pins the in-memory size of `HydroRoot` to 136 bytes; ignored without the `build`
    // feature for the same reason as `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // A plain expression (`x + y`) must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Runs `simplify_q_macro` on the expression spliced from a real `q!` closure and compares
    // the resulting token string against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same as above, but the quoted code is a block whose closure comes after a `let`
    // statement, i.e. the quoted tokens do not start with the closure's `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}