1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// A closure expression together with the singleton nodes it refers to.
///
/// Each entry of `singleton_refs` pairs a local identifier with a [`HydroNode`];
/// `Clone` for this type expects every such node to be `HydroNode::Singleton`.
pub struct ClosureExpr {
    /// The expression itself.
    pub expr: DebugExpr,
    /// `(local identifier, referenced node)` pairs used by `expr`.
    pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
}
46
47impl Clone for ClosureExpr {
48 fn clone(&self) -> Self {
49 Self {
50 expr: self.expr.clone(),
51 singleton_refs: self
52 .singleton_refs
53 .iter()
54 .map(|(ident, node)| {
55 let cloned_node = match node {
56 HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
57 inner: SharedNode(inner.0.clone()),
58 metadata: metadata.clone(),
59 },
60 _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
61 };
62 (ident.clone(), cloned_node)
63 })
64 .collect(),
65 }
66 }
67}
68
69impl Hash for ClosureExpr {
70 fn hash<H: Hasher>(&self, state: &mut H) {
71 self.expr.hash(state);
72 }
76}
77
78impl serde::Serialize for ClosureExpr {
79 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
80 use serde::ser::SerializeStruct;
81 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
82 s.serialize_field("expr", &self.expr)?;
83 s.serialize_field(
84 "singleton_refs",
85 &SerializableSingletonRefs(&self.singleton_refs),
86 )?;
87 s.end()
88 }
89}
90
91struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
92
93impl serde::Serialize for SerializableSingletonRefs<'_> {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeSeq;
96 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
97 for (ident, node) in self.0 {
98 seq.serialize_element(&(ident.to_string(), node))?;
99 }
100 seq.end()
101 }
102}
103
104impl Debug for ClosureExpr {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 Debug::fmt(&self.expr, f)
107 }
108}
109
110impl Display for ClosureExpr {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 Display::fmt(&self.expr, f)
113 }
114}
115
116impl From<syn::Expr> for ClosureExpr {
117 fn from(expr: syn::Expr) -> Self {
118 Self {
119 expr: DebugExpr(Box::new(expr)),
120 singleton_refs: Vec::new(),
121 }
122 }
123}
124
125impl From<DebugExpr> for ClosureExpr {
126 fn from(expr: DebugExpr) -> Self {
127 Self {
128 expr,
129 singleton_refs: Vec::new(),
130 }
131 }
132}
133
134impl ClosureExpr {
135 pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
136 Self {
137 expr,
138 singleton_refs,
139 }
140 }
141
142 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
143 Self {
144 expr: self.expr.clone(),
145 singleton_refs: self
146 .singleton_refs
147 .iter()
148 .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
149 .collect(),
150 }
151 }
152
153 pub fn transform_children(
154 &mut self,
155 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
156 seen_tees: &mut SeenSharedNodes,
157 ) {
158 for (_ident, ref_node) in self.singleton_refs.iter_mut() {
159 transform(ref_node, seen_tees);
160 }
161 }
162
163 #[cfg(feature = "build")]
166 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
167 if self.singleton_refs.is_empty() {
168 self.expr.0.to_token_stream()
169 } else {
170 let ref_idents = (0..self.singleton_refs.len())
171 .map(|_| ident_stack.pop().unwrap())
172 .collect::<Vec<_>>()
173 .into_iter()
174 .rev()
175 .collect::<Vec<_>>();
176 let local_idents = self
177 .singleton_refs
178 .iter()
179 .map(|(local_ident, _)| local_ident);
180 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
181 let expr = &self.expr.0;
182 quote! {
183 {
184 #(
185 let #local_idents = #hash #ref_idents;
186 )*
187 #expr
188 }
189 }
190 }
191 }
192}
193
/// Boxed [`syn::Expr`] wrapper with custom formatting and serialization:
/// `Debug` prints the raw token stream, `Display` prints a simplified
/// `q!(...)` form, and `Serialize` writes the `Display` string.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
199
200impl serde::Serialize for DebugExpr {
201 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
202 serializer.serialize_str(&self.to_string())
203 }
204}
205
206impl From<syn::Expr> for DebugExpr {
207 fn from(expr: syn::Expr) -> Self {
208 Self(Box::new(expr))
209 }
210}
211
212impl Deref for DebugExpr {
213 type Target = syn::Expr;
214
215 fn deref(&self) -> &Self::Target {
216 &self.0
217 }
218}
219
220impl ToTokens for DebugExpr {
221 fn to_tokens(&self, tokens: &mut TokenStream) {
222 self.0.to_tokens(tokens);
223 }
224}
225
226impl Debug for DebugExpr {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 write!(f, "{}", self.0.to_token_stream())
229 }
230}
231
232impl Display for DebugExpr {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 let original = self.0.as_ref().clone();
235 let simplified = simplify_q_macro(original);
236
237 write!(f, "q!({})", quote::quote!(#simplified))
240 }
241}
242
243fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
245 let mut simplifier = QMacroSimplifier::new();
248 simplifier.visit_expr_mut(&mut expr);
249
250 if let Some(simplified) = simplifier.simplified_result {
252 simplified
253 } else {
254 expr
255 }
256}
257
/// Visitor that searches an expression for a
/// `stageleft::runtime_support::*_type_hint*` call and records the closure
/// found among its arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure, once one has been found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
263
264impl QMacroSimplifier {
265 pub fn new() -> Self {
266 Self::default()
267 }
268}
269
270impl VisitMut for QMacroSimplifier {
271 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
272 if self.simplified_result.is_some() {
274 return;
275 }
276
277 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
278 && self.is_stageleft_runtime_support_call(&path_expr.path)
280 && let Some(closure) = self.extract_closure_from_args(&call.args)
282 {
283 self.simplified_result = Some(closure);
284 return;
285 }
286
287 syn::visit_mut::visit_expr_mut(self, expr);
290 }
291}
292
293impl QMacroSimplifier {
294 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
295 if let Some(last_segment) = path.segments.last() {
297 let fn_name = last_segment.ident.to_string();
298 fn_name.contains("_type_hint")
300 && path.segments.len() > 2
301 && path.segments[0].ident == "stageleft"
302 && path.segments[1].ident == "runtime_support"
303 } else {
304 false
305 }
306 }
307
308 fn extract_closure_from_args(
309 &self,
310 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
311 ) -> Option<syn::Expr> {
312 for arg in args {
314 if let syn::Expr::Closure(_) = arg {
315 return Some(arg.clone());
316 }
317 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
319 return Some(closure_expr);
320 }
321 }
322 None
323 }
324
325 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
326 let mut visitor = ClosureFinder {
327 found_closure: None,
328 prefer_inner_blocks: true,
329 };
330 visitor.visit_expr(expr);
331 visitor.found_closure
332 }
333}
334
335struct ClosureFinder {
337 found_closure: Option<syn::Expr>,
338 prefer_inner_blocks: bool,
339}
340
341impl<'ast> Visit<'ast> for ClosureFinder {
342 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
343 if self.found_closure.is_some() {
345 return;
346 }
347
348 match expr {
349 syn::Expr::Closure(_) => {
350 self.found_closure = Some(expr.clone());
351 }
352 syn::Expr::Block(block) if self.prefer_inner_blocks => {
353 for stmt in &block.block.stmts {
355 if let syn::Stmt::Expr(stmt_expr, _) = stmt
356 && let syn::Expr::Block(_) = stmt_expr
357 {
358 let mut inner_visitor = ClosureFinder {
360 found_closure: None,
361 prefer_inner_blocks: false, };
363 inner_visitor.visit_expr(stmt_expr);
364 if inner_visitor.found_closure.is_some() {
365 self.found_closure = Some(stmt_expr.clone());
367 return;
368 }
369 }
370 }
371
372 visit::visit_expr(self, expr);
374
375 if self.found_closure.is_some() {
378 }
380 }
381 _ => {
382 visit::visit_expr(self, expr);
384 }
385 }
386 }
387}
388
/// Boxed [`syn::Type`] wrapper whose `Debug` and `Serialize` output is the
/// type's token stream rendered as a string.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
394
395impl From<syn::Type> for DebugType {
396 fn from(t: syn::Type) -> Self {
397 Self(Box::new(t))
398 }
399}
400
401impl Deref for DebugType {
402 type Target = syn::Type;
403
404 fn deref(&self) -> &Self::Target {
405 &self.0
406 }
407}
408
409impl ToTokens for DebugType {
410 fn to_tokens(&self, tokens: &mut TokenStream) {
411 self.0.to_tokens(tokens);
412 }
413}
414
415impl Debug for DebugType {
416 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
417 write!(f, "{}", self.0.to_token_stream())
418 }
419}
420
421impl serde::Serialize for DebugType {
422 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
423 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
424 }
425}
426
427fn serialize_backtrace_as_span<S: serde::Serializer>(
428 backtrace: &Backtrace,
429 serializer: S,
430) -> Result<S::Ok, S::Error> {
431 match backtrace.format_span() {
432 Some(span) => serializer.serialize_some(&span),
433 None => serializer.serialize_none(),
434 }
435}
436
437fn serialize_ident<S: serde::Serializer>(
438 ident: &syn::Ident,
439 serializer: S,
440) -> Result<S::Ok, S::Error> {
441 serializer.serialize_str(&ident.to_string())
442}
443
/// Instantiation state carried by network-related IR nodes.
///
/// `Clone` and `Serialize` are only supported for `Building`; both panic on
/// `Finalized`.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; holds the sink/source expressions and the pending
    /// connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
448
449impl serde::Serialize for DebugInstantiate {
450 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
451 match self {
452 DebugInstantiate::Building => {
453 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
454 }
455 DebugInstantiate::Finalized(_) => {
456 panic!(
457 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
458 )
459 }
460 }
461 }
462}
463
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot callback; `HydroRoot::connect_network` takes it out of the
    /// `Option` and calls it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
476
477impl From<DebugInstantiateFinalized> for DebugInstantiate {
478 fn from(f: DebugInstantiateFinalized) -> Self {
479 Self::Finalized(Box::new(f))
480 }
481}
482
483impl Debug for DebugInstantiate {
484 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485 write!(f, "<network instantiate>")
486 }
487}
488
impl Hash for DebugInstantiate {
    /// Intentionally writes nothing: the instantiation state does not
    /// contribute to the hash of the node that contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
494
495impl Clone for DebugInstantiate {
496 fn clone(&self) -> Self {
497 match self {
498 DebugInstantiate::Building => DebugInstantiate::Building,
499 DebugInstantiate::Finalized(_) => {
500 panic!("DebugInstantiate::Finalized should not be cloned")
501 }
502 }
503 }
504}
505
/// Resolution state of a [`HydroSource::ClusterMembers`] source.
///
/// `HydroRoot::compile_network` moves a source from `Uninit` to `Stream` or
/// `Tee` and panics if it is already in one of those two states.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved.
    Uninit,
    /// Resolved to the membership stream expression; set the first time a
    /// given (location, cluster) pair is seen during network compilation.
    Stream(DebugExpr),
    /// Set when the same (location, cluster) pair was already seen; carries
    /// the location and the cluster id.
    Tee(LocationId, LocationId),
}
526
/// The kinds of source a `HydroNode::Source` can read from.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression (presumably evaluating to a stream
    /// — confirm against the code generator).
    Stream(DebugExpr),
    /// Source backed by an external network. NOTE(review): details are not
    /// visible in this chunk.
    ExternalNetwork(),
    /// Source described by an expression (presumably evaluating to an
    /// iterator — confirm against the code generator).
    Iter(DebugExpr),
    /// NOTE(review): semantics are not visible in this chunk.
    Spin(),
    /// Membership of the cluster identified by the `LocationId`, together
    /// with its resolution state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded stream input named by the identifier; network compilation
    /// requires it to have a `Stream` collection kind.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded singleton input named by the identifier; network compilation
    /// requires it to have a `Singleton` collection kind.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
538
/// Backend interface used to emit DFIR for individual IR operations.
///
/// Identifiers name DFIR variables: `in_ident`/`input_ident` refer to values
/// that already exist, `out_ident` names the value a method defines.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// NOTE(review): exact semantics are not visible in this chunk; the
    /// `SecondaryMap` implementation returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the operation that connects `in_ident` at `in_location` to
    /// `out_ident` at `out_location` for a batch step.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the operation that yields `in_ident` out of a tick as
    /// `out_ident`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the operation that begins an atomic section, defining
    /// `out_ident` from `in_ident`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the operation that ends an atomic section, defining
    /// `out_ident` from `in_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the operation for a non-deterministic observation of `in_ident`
    /// at `location`, defining `out_ident`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the operation that merges `first_ident` and `second_ident` into
    /// `out_ident` at `location`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: the sending side at `from`
    /// (feeding `input_ident` into `sink`, optionally through `serialize`)
    /// and the receiving side at `to` (defining `out_ident` from `source`,
    /// optionally through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source at `on` that defines `out_ident` from `source_expr`,
    /// optionally through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a sink at `on` that feeds `input_ident` into `sink_expr`,
    /// optionally through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Optionally emits a hook for a fold over `in_ident` and returns an
    /// identifier for it. NOTE(review): the meaning of the returned
    /// identifier is not visible in this chunk; the `SecondaryMap`
    /// implementation always returns `None`.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;
}
656
/// Plain DFIR emission: one [`FlatGraphBuilder`] per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This backend always answers `false`.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up the builder stored under the root key of `location`, creating
    /// a default one if the slot is empty.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" when the map's `entry` yields `None`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Forwards the input to the output. A bounded singleton, optional or
    /// keyed singleton is additionally passed through `persist::<'static>()`,
    /// which asserts that `in_location` is top-level.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let graph = self.get_dfir_mut(in_location.root());

        let is_bounded_single_value = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !is_bounded_single_value {
            graph.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Aliases the output to the input.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases the output to the input.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases the output to the input.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases the output to the input.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(location);
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a `union()` with the first input on port 0 and the second on
    /// port 1, tagged with `operator_tag`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let graph = self.get_dfir_mut(location);
        graph.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the `dest_sink` half on `from` (tag `send<tag_id>`) and the
    /// `source_stream` half on `to` (tag `recv<tag_id>`), inserting a `map`
    /// stage on either side when a (de)serializer is supplied.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_graph = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_graph.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_graph.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_graph = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_graph.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_graph.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a `source_stream` on `on` (tag `recv<tag_id>`), followed by a
    /// `map` stage when a deserializer is supplied.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_graph = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_graph.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_graph.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a `dest_sink` on `on` (tag `send<tag_id>`), preceded by a `map`
    /// stage when a serializer is supplied.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_graph = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_graph.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_graph.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This backend never emits a fold hook.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }
}
923
/// Either a DFIR backend to emit into, or a pair of callbacks to invoke
/// instead: `L` receives each [`HydroRoot`] and `N` each [`HydroNode`], both
/// together with a mutable `usize`. NOTE(review): the meaning of that counter
/// is not visible in this chunk.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback and the node callback instead of emitting.
    Callback(L, N),
}
933
/// A root of the IR graph. Every variant consumes exactly one `input` node
/// and carries operator metadata.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Applies the expression `f` to the input.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port of an external location.
    /// `instantiate_fn` starts as `Building` and is finalized by
    /// `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the sink described by the expression `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Exposes the input as an embedded output named `ident`;
    /// `compile_network` requires the input to be a `Stream`.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// A root with no payload beyond its input. NOTE(review): how the input
    /// is consumed is not visible in this chunk.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
975
976impl HydroRoot {
977 #[cfg(feature = "build")]
978 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
979 pub fn compile_network<'a, D>(
980 &mut self,
981 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
982 seen_tees: &mut SeenSharedNodes,
983 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
984 processes: &SparseSecondaryMap<LocationKey, D::Process>,
985 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
986 externals: &SparseSecondaryMap<LocationKey, D::External>,
987 env: &mut D::InstantiateEnv,
988 ) where
989 D: Deploy<'a>,
990 {
991 let refcell_extra_stmts = RefCell::new(extra_stmts);
992 let refcell_env = RefCell::new(env);
993 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
994 self.transform_bottom_up(
995 &mut |l| {
996 if let HydroRoot::SendExternal {
997 input,
998 to_external_key,
999 to_port_id,
1000 to_many,
1001 unpaired,
1002 instantiate_fn,
1003 ..
1004 } = l
1005 {
1006 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1007 DebugInstantiate::Building => {
1008 let to_node = externals
1009 .get(*to_external_key)
1010 .unwrap_or_else(|| {
1011 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1012 })
1013 .clone();
1014
1015 match input.metadata().location_id.root() {
1016 &LocationId::Process(process_key) => {
1017 if *to_many {
1018 (
1019 (
1020 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1021 parse_quote!(DUMMY),
1022 ),
1023 Box::new(|| {}) as Box<dyn FnOnce()>,
1024 )
1025 } else {
1026 let from_node = processes
1027 .get(process_key)
1028 .unwrap_or_else(|| {
1029 panic!("A process used in the graph was not instantiated: {}", process_key)
1030 })
1031 .clone();
1032
1033 let sink_port = from_node.next_port();
1034 let source_port = to_node.next_port();
1035
1036 if *unpaired {
1037 use stageleft::quote_type;
1038 use tokio_util::codec::LengthDelimitedCodec;
1039
1040 to_node.register(*to_port_id, source_port.clone());
1041
1042 let _ = D::e2o_source(
1043 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1044 &to_node, &source_port,
1045 &from_node, &sink_port,
1046 "e_type::<LengthDelimitedCodec>(),
1047 format!("{}_{}", *to_external_key, *to_port_id)
1048 );
1049 }
1050
1051 (
1052 (
1053 D::o2e_sink(
1054 &from_node,
1055 &sink_port,
1056 &to_node,
1057 &source_port,
1058 format!("{}_{}", *to_external_key, *to_port_id)
1059 ),
1060 parse_quote!(DUMMY),
1061 ),
1062 if *unpaired {
1063 D::e2o_connect(
1064 &to_node,
1065 &source_port,
1066 &from_node,
1067 &sink_port,
1068 *to_many,
1069 NetworkHint::Auto,
1070 )
1071 } else {
1072 Box::new(|| {}) as Box<dyn FnOnce()>
1073 },
1074 )
1075 }
1076 }
1077 LocationId::Cluster(cluster_key) => {
1078 let from_node = clusters
1079 .get(*cluster_key)
1080 .unwrap_or_else(|| {
1081 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1082 })
1083 .clone();
1084
1085 let sink_port = from_node.next_port();
1086 let source_port = to_node.next_port();
1087
1088 if *unpaired {
1089 to_node.register(*to_port_id, source_port.clone());
1090 }
1091
1092 (
1093 (
1094 D::m2e_sink(
1095 &from_node,
1096 &sink_port,
1097 &to_node,
1098 &source_port,
1099 format!("{}_{}", *to_external_key, *to_port_id)
1100 ),
1101 parse_quote!(DUMMY),
1102 ),
1103 Box::new(|| {}) as Box<dyn FnOnce()>,
1104 )
1105 }
1106 _ => panic!()
1107 }
1108 },
1109
1110 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1111 };
1112
1113 *instantiate_fn = DebugInstantiateFinalized {
1114 sink: sink_expr,
1115 source: source_expr,
1116 connect_fn: Some(connect_fn),
1117 }
1118 .into();
1119 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1120 let element_type = match &input.metadata().collection_kind {
1121 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1122 _ => panic!("Embedded output must have Stream collection kind"),
1123 };
1124 let location_key = match input.metadata().location_id.root() {
1125 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1126 _ => panic!("Embedded output must be on a process or cluster"),
1127 };
1128 D::register_embedded_output(
1129 &mut refcell_env.borrow_mut(),
1130 location_key,
1131 ident,
1132 &element_type,
1133 );
1134 }
1135 },
1136 &mut |n| {
1137 if let HydroNode::Network {
1138 name,
1139 networking_info,
1140 input,
1141 instantiate_fn,
1142 metadata,
1143 ..
1144 } = n
1145 {
1146 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1147 DebugInstantiate::Building => instantiate_network::<D>(
1148 &mut refcell_env.borrow_mut(),
1149 input.metadata().location_id.root(),
1150 metadata.location_id.root(),
1151 processes,
1152 clusters,
1153 name.as_deref(),
1154 networking_info,
1155 ),
1156
1157 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1158 };
1159
1160 *instantiate_fn = DebugInstantiateFinalized {
1161 sink: sink_expr,
1162 source: source_expr,
1163 connect_fn: Some(connect_fn),
1164 }
1165 .into();
1166 } else if let HydroNode::ExternalInput {
1167 from_external_key,
1168 from_port_id,
1169 from_many,
1170 codec_type,
1171 port_hint,
1172 instantiate_fn,
1173 metadata,
1174 ..
1175 } = n
1176 {
1177 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1178 DebugInstantiate::Building => {
1179 let from_node = externals
1180 .get(*from_external_key)
1181 .unwrap_or_else(|| {
1182 panic!(
1183 "A external used in the graph was not instantiated: {}",
1184 from_external_key,
1185 )
1186 })
1187 .clone();
1188
1189 match metadata.location_id.root() {
1190 &LocationId::Process(process_key) => {
1191 let to_node = processes
1192 .get(process_key)
1193 .unwrap_or_else(|| {
1194 panic!("A process used in the graph was not instantiated: {}", process_key)
1195 })
1196 .clone();
1197
1198 let sink_port = from_node.next_port();
1199 let source_port = to_node.next_port();
1200
1201 from_node.register(*from_port_id, sink_port.clone());
1202
1203 (
1204 (
1205 parse_quote!(DUMMY),
1206 if *from_many {
1207 D::e2o_many_source(
1208 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1209 &to_node, &source_port,
1210 codec_type.0.as_ref(),
1211 format!("{}_{}", *from_external_key, *from_port_id)
1212 )
1213 } else {
1214 D::e2o_source(
1215 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1216 &from_node, &sink_port,
1217 &to_node, &source_port,
1218 codec_type.0.as_ref(),
1219 format!("{}_{}", *from_external_key, *from_port_id)
1220 )
1221 },
1222 ),
1223 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1224 )
1225 }
1226 LocationId::Cluster(cluster_key) => {
1227 let to_node = clusters
1228 .get(*cluster_key)
1229 .unwrap_or_else(|| {
1230 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1231 })
1232 .clone();
1233
1234 let sink_port = from_node.next_port();
1235 let source_port = to_node.next_port();
1236
1237 from_node.register(*from_port_id, sink_port.clone());
1238
1239 (
1240 (
1241 parse_quote!(DUMMY),
1242 D::e2m_source(
1243 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1244 &from_node, &sink_port,
1245 &to_node, &source_port,
1246 codec_type.0.as_ref(),
1247 format!("{}_{}", *from_external_key, *from_port_id)
1248 ),
1249 ),
1250 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1251 )
1252 }
1253 _ => panic!()
1254 }
1255 },
1256
1257 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1258 };
1259
1260 *instantiate_fn = DebugInstantiateFinalized {
1261 sink: sink_expr,
1262 source: source_expr,
1263 connect_fn: Some(connect_fn),
1264 }
1265 .into();
1266 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1267 let element_type = match &metadata.collection_kind {
1268 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1269 _ => panic!("Embedded source must have Stream collection kind"),
1270 };
1271 let location_key = match metadata.location_id.root() {
1272 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1273 _ => panic!("Embedded source must be on a process or cluster"),
1274 };
1275 D::register_embedded_stream_input(
1276 &mut refcell_env.borrow_mut(),
1277 location_key,
1278 ident,
1279 &element_type,
1280 );
1281 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1282 let element_type = match &metadata.collection_kind {
1283 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1284 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1285 };
1286 let location_key = match metadata.location_id.root() {
1287 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1288 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1289 };
1290 D::register_embedded_singleton_input(
1291 &mut refcell_env.borrow_mut(),
1292 location_key,
1293 ident,
1294 &element_type,
1295 );
1296 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1297 match state {
1298 ClusterMembersState::Uninit => {
1299 let at_location = metadata.location_id.root().clone();
1300 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1301 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1302 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1304 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1305 &(),
1306 );
1307 *state = ClusterMembersState::Stream(expr.into());
1308 } else {
1309 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1311 }
1312 }
1313 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1314 panic!("cluster members already finalized");
1315 }
1316 }
1317 }
1318 },
1319 seen_tees,
1320 false,
1321 );
1322 }
1323
1324 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1325 self.transform_bottom_up(
1326 &mut |l| {
1327 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1328 match instantiate_fn {
1329 DebugInstantiate::Building => panic!("network not built"),
1330
1331 DebugInstantiate::Finalized(finalized) => {
1332 (finalized.connect_fn.take().unwrap())();
1333 }
1334 }
1335 }
1336 },
1337 &mut |n| {
1338 if let HydroNode::Network { instantiate_fn, .. }
1339 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1340 {
1341 match instantiate_fn {
1342 DebugInstantiate::Building => panic!("network not built"),
1343
1344 DebugInstantiate::Finalized(finalized) => {
1345 (finalized.connect_fn.take().unwrap())();
1346 }
1347 }
1348 }
1349 },
1350 seen_tees,
1351 false,
1352 );
1353 }
1354
1355 pub fn transform_bottom_up(
1356 &mut self,
1357 transform_root: &mut impl FnMut(&mut HydroRoot),
1358 transform_node: &mut impl FnMut(&mut HydroNode),
1359 seen_tees: &mut SeenSharedNodes,
1360 check_well_formed: bool,
1361 ) {
1362 self.transform_children(
1363 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1364 seen_tees,
1365 );
1366
1367 transform_root(self);
1368 }
1369
1370 pub fn transform_children(
1371 &mut self,
1372 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1373 seen_tees: &mut SeenSharedNodes,
1374 ) {
1375 match self {
1376 HydroRoot::ForEach { input, .. }
1377 | HydroRoot::SendExternal { input, .. }
1378 | HydroRoot::DestSink { input, .. }
1379 | HydroRoot::CycleSink { input, .. }
1380 | HydroRoot::EmbeddedOutput { input, .. }
1381 | HydroRoot::Null { input, .. } => {
1382 transform(input, seen_tees);
1383 }
1384 }
1385 }
1386
1387 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1388 match self {
1389 HydroRoot::ForEach {
1390 f,
1391 input,
1392 op_metadata,
1393 } => HydroRoot::ForEach {
1394 f: f.clone(),
1395 input: Box::new(input.deep_clone(seen_tees)),
1396 op_metadata: op_metadata.clone(),
1397 },
1398 HydroRoot::SendExternal {
1399 to_external_key,
1400 to_port_id,
1401 to_many,
1402 unpaired,
1403 serialize_fn,
1404 instantiate_fn,
1405 input,
1406 op_metadata,
1407 } => HydroRoot::SendExternal {
1408 to_external_key: *to_external_key,
1409 to_port_id: *to_port_id,
1410 to_many: *to_many,
1411 unpaired: *unpaired,
1412 serialize_fn: serialize_fn.clone(),
1413 instantiate_fn: instantiate_fn.clone(),
1414 input: Box::new(input.deep_clone(seen_tees)),
1415 op_metadata: op_metadata.clone(),
1416 },
1417 HydroRoot::DestSink {
1418 sink,
1419 input,
1420 op_metadata,
1421 } => HydroRoot::DestSink {
1422 sink: sink.clone(),
1423 input: Box::new(input.deep_clone(seen_tees)),
1424 op_metadata: op_metadata.clone(),
1425 },
1426 HydroRoot::CycleSink {
1427 cycle_id,
1428 input,
1429 op_metadata,
1430 } => HydroRoot::CycleSink {
1431 cycle_id: *cycle_id,
1432 input: Box::new(input.deep_clone(seen_tees)),
1433 op_metadata: op_metadata.clone(),
1434 },
1435 HydroRoot::EmbeddedOutput {
1436 ident,
1437 input,
1438 op_metadata,
1439 } => HydroRoot::EmbeddedOutput {
1440 ident: ident.clone(),
1441 input: Box::new(input.deep_clone(seen_tees)),
1442 op_metadata: op_metadata.clone(),
1443 },
1444 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1445 input: Box::new(input.deep_clone(seen_tees)),
1446 op_metadata: op_metadata.clone(),
1447 },
1448 }
1449 }
1450
1451 #[cfg(feature = "build")]
1452 pub fn emit(
1453 &mut self,
1454 graph_builders: &mut dyn DfirBuilder,
1455 seen_tees: &mut SeenSharedNodes,
1456 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1457 next_stmt_id: &mut usize,
1458 fold_hooked_idents: &mut HashSet<String>,
1459 ) {
1460 self.emit_core(
1461 &mut BuildersOrCallback::<
1462 fn(&mut HydroRoot, &mut usize),
1463 fn(&mut HydroNode, &mut usize),
1464 >::Builders(graph_builders),
1465 seen_tees,
1466 built_tees,
1467 next_stmt_id,
1468 fold_hooked_idents,
1469 );
1470 }
1471
    /// Emits this root, either into DFIR graph builders or through traversal callbacks.
    ///
    /// Every arm first emits the root's `input` with `HydroNode::emit_core`, which yields the
    /// identifier of the input's output. What happens next depends on `builders_or_callback`:
    ///
    /// * `Builders`: a DFIR statement consuming that identifier is added to the builder for the
    ///   input's location.
    /// * `Callback`: the root callback is invoked with `self` and the current statement id.
    ///
    /// `next_stmt_id` is incremented after each root, except for `CycleSink`, which neither
    /// invokes the callback nor consumes a statement id.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut usize,
        fold_hooked_idents: &mut HashSet<String>,
    ) {
        match self {
            // Terminal `for_each` applying `f` to the input.
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            // Output towards an external process.
            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is used here; while the network is still being
                        // built, placeholder expressions stand in for the real endpoints.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            // Terminal `dest_sink` writing the input into `sink`.
            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            // Binds the input to the identifier derived from `cycle_id`.
            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; everything else
                        // flows as its plain element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // A cycle sink gets no statement id and triggers no callback.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            // Terminal `for_each` feeding the input into the embedded output `ident`.
            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            // Terminal `for_each` that discards every element of the input.
            HydroRoot::Null { input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1694
1695 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1696 match self {
1697 HydroRoot::ForEach { op_metadata, .. }
1698 | HydroRoot::SendExternal { op_metadata, .. }
1699 | HydroRoot::DestSink { op_metadata, .. }
1700 | HydroRoot::CycleSink { op_metadata, .. }
1701 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1702 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1703 }
1704 }
1705
1706 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1707 match self {
1708 HydroRoot::ForEach { op_metadata, .. }
1709 | HydroRoot::SendExternal { op_metadata, .. }
1710 | HydroRoot::DestSink { op_metadata, .. }
1711 | HydroRoot::CycleSink { op_metadata, .. }
1712 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1713 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1714 }
1715 }
1716
1717 pub fn input(&self) -> &HydroNode {
1718 match self {
1719 HydroRoot::ForEach { input, .. }
1720 | HydroRoot::SendExternal { input, .. }
1721 | HydroRoot::DestSink { input, .. }
1722 | HydroRoot::CycleSink { input, .. }
1723 | HydroRoot::EmbeddedOutput { input, .. }
1724 | HydroRoot::Null { input, .. } => input,
1725 }
1726 }
1727
1728 pub fn input_metadata(&self) -> &HydroIrMetadata {
1729 self.input().metadata()
1730 }
1731
1732 pub fn print_root(&self) -> String {
1733 match self {
1734 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1735 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1736 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1737 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1738 HydroRoot::EmbeddedOutput { ident, .. } => {
1739 format!("EmbeddedOutput({})", ident)
1740 }
1741 HydroRoot::Null { .. } => "Null".to_owned(),
1742 }
1743 }
1744
1745 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1746 match self {
1747 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1748 transform(f);
1749 }
1750 HydroRoot::SendExternal { .. }
1751 | HydroRoot::CycleSink { .. }
1752 | HydroRoot::EmbeddedOutput { .. }
1753 | HydroRoot::Null { .. } => {}
1754 }
1755 }
1756}
1757
/// Returns the clock id of the tick that `loc` belongs to, looking through any number of
/// `Atomic` wrappers; `None` when the location is a process or a cluster.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Peel one atomic layer and keep looking.
            LocationId::Atomic(inner) => current = &**inner,
            LocationId::Process(_) | LocationId::Cluster(_) => return None,
        }
    }
}
1766
/// Rewrites every tick clock id inside `loc` to its union-find representative in `uf`,
/// recursing through `Tick` and `Atomic` layers.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
        LocationId::Atomic(inner) => inner,
        // Processes and clusters carry no clock id and have nothing nested to visit.
        LocationId::Process(_) | LocationId::Cluster(_) => return,
    };

    remap_location(nested, uf);
}
1780
/// Union-find lookup with path compression.
///
/// Returns the representative of `x`. An id without an entry in `parent` (or mapped to
/// itself) is its own representative. Every non-root id walked on the way is re-pointed
/// directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: follow parent links until reaching an id that is its own parent.
    let mut root = x;
    while let Some(&next) = parent.get(&root) {
        if next == root {
            break;
        }
        root = next;
    }

    // Pass 2: re-point every id on the walked path at the representative.
    let mut cursor = x;
    while cursor != root {
        match parent.insert(cursor, root) {
            Some(previous) => cursor = previous,
            None => break,
        }
    }

    root
}
1791
/// Merges the union-find sets containing `a` and `b` by pointing the representative of `a`
/// at the representative of `b`. Does nothing when both are already in the same set.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1800
/// Unifies the tick clock ids that are connected through `Batch` / `YieldConcat` nodes.
///
/// Pass 1 walks the IR and, for every `Batch` or `YieldConcat` node whose inner location and
/// own location both resolve to a tick (see `tick_of`), unions the two clock ids. Pass 2
/// rewrites the location of every node so each clock id is replaced by its representative.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the equivalences.
    let mut collect_unions = |node: &mut HydroNode| {
        let (inner, metadata) = match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                (inner, metadata)
            }
            _ => return,
        };

        let inner_tick = tick_of(&inner.metadata().location_id);
        let outer_tick = tick_of(&metadata.location_id);
        if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
            uf_union(&mut uf, a, b);
        }
    };
    transform_bottom_up(ir, &mut |_| {}, &mut collect_unions, false);

    // Pass 2: rewrite every node location with the representatives.
    let mut apply_remap = |node: &mut HydroNode| {
        remap_location(&mut node.metadata_mut().location_id, &mut uf);
    };
    transform_bottom_up(ir, &mut |_| {}, &mut apply_remap, false);
}
1836
/// Emits every root in `ir` into per-location graph builders and returns those builders.
///
/// The shared-node tables, the statement id counter (starting at 0) and the set of hooked
/// fold identifiers are shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1855
/// Walks `ir` in emission order without building any graph, invoking `transform_root` /
/// `transform_node` together with the running statement id.
///
/// This drives `HydroRoot::emit_core` in callback mode with fresh traversal state.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1877
1878pub fn transform_bottom_up(
1879 ir: &mut [HydroRoot],
1880 transform_root: &mut impl FnMut(&mut HydroRoot),
1881 transform_node: &mut impl FnMut(&mut HydroNode),
1882 check_well_formed: bool,
1883) {
1884 let mut seen_tees = HashMap::new();
1885 ir.iter_mut().for_each(|leaf| {
1886 leaf.transform_bottom_up(
1887 transform_root,
1888 transform_node,
1889 &mut seen_tees,
1890 check_well_formed,
1891 );
1892 });
1893}
1894
1895pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1896 let mut seen_tees = HashMap::new();
1897 ir.iter()
1898 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1899 .collect()
1900}
1901
/// Per-thread deduplication table for shared nodes: `None` while no scope is active,
/// otherwise the next id to hand out plus a map from shared-cell pointer to the id it
/// was assigned.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Table consulted by `Debug for SharedNode`; installed by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // Table consulted by `Serialize for SharedNode`; installed by `serialize_dedup_shared`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1911
1912pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1913 PRINTED_TEES.with(|printed_tees| {
1914 let mut printed_tees_mut = printed_tees.borrow_mut();
1915 *printed_tees_mut = Some((0, HashMap::new()));
1916 drop(printed_tees_mut);
1917
1918 let ret = f();
1919
1920 let mut printed_tees_mut = printed_tees.borrow_mut();
1921 *printed_tees_mut = None;
1922
1923 ret
1924 })
1925}
1926
1927pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1932 let _guard = SerializedSharedGuard::enter();
1933 f()
1934}
1935
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, which installs a fresh table,
/// and restores the earlier value when dropped.
struct SerializedSharedGuard {
    /// Value that `SERIALIZED_SHARED` held before this guard was entered.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1941
1942impl SerializedSharedGuard {
1943 fn enter() -> Self {
1944 let previous = SERIALIZED_SHARED.with(|cell| {
1945 let mut guard = cell.borrow_mut();
1946 guard.replace((0, HashMap::new()))
1947 });
1948 Self { previous }
1949 }
1950}
1951
1952impl Drop for SerializedSharedGuard {
1953 fn drop(&mut self) {
1954 SERIALIZED_SHARED.with(|cell| {
1955 *cell.borrow_mut() = self.previous.take();
1956 });
1957 }
1958}
1959
/// A reference-counted, interior-mutable handle to a [`HydroNode`] that several IR nodes
/// may point at. Identity is the address of the shared cell (see `SharedNode::as_ptr`).
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1961
1962impl serde::Serialize for SharedNode {
1963 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1974 SERIALIZED_SHARED.with(|cell| {
1975 let mut guard = cell.borrow_mut();
1976 let state = guard.as_mut().ok_or_else(|| {
1978 serde::ser::Error::custom(
1979 "SharedNode serialization requires an active serialize_dedup_shared scope",
1980 )
1981 })?;
1982 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1983
1984 if let Some(&id) = state.1.get(&ptr) {
1985 drop(guard);
1986 use serde::ser::SerializeMap;
1987 let mut map = serializer.serialize_map(Some(1))?;
1988 map.serialize_entry("$shared_ref", &id)?;
1989 map.end()
1990 } else {
1991 let id = state.0;
1992 state.0 += 1;
1993 state.1.insert(ptr, id);
1994 drop(guard);
1995
1996 use serde::ser::SerializeMap;
1997 let mut map = serializer.serialize_map(Some(2))?;
1998 map.serialize_entry("$shared", &id)?;
1999 map.serialize_entry("node", &*self.0.borrow())?;
2000 map.end()
2001 }
2002 })
2003 }
2004}
2005
2006impl SharedNode {
2007 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2008 Rc::as_ptr(&self.0)
2009 }
2010}
2011
2012impl Debug for SharedNode {
2013 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2014 PRINTED_TEES.with(|printed_tees| {
2015 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2016 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2017
2018 if let Some(printed_tees_mut) = printed_tees_mut {
2019 if let Some(existing) = printed_tees_mut
2020 .1
2021 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2022 {
2023 write!(f, "<shared {}>", existing)
2024 } else {
2025 let next_id = printed_tees_mut.0;
2026 printed_tees_mut.0 += 1;
2027 printed_tees_mut
2028 .1
2029 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2030 drop(printed_tees_mut_borrow);
2031 write!(f, "<shared {}>: ", next_id)?;
2032 Debug::fmt(&self.0.borrow(), f)
2033 }
2034 } else {
2035 drop(printed_tees_mut_borrow);
2036 write!(f, "<shared>: ")?;
2037 Debug::fmt(&self.0.borrow(), f)
2038 }
2039 })
2040 }
2041}
2042
2043impl Hash for SharedNode {
2044 fn hash<H: Hasher>(&self, state: &mut H) {
2045 self.0.borrow_mut().hash(state);
2046 }
2047}
2048
/// Boundedness tag stored in the `bound` field of the `Stream`, `Optional` and
/// `KeyedStream` variants of [`CollectionKind`].
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The value `CollectionKind::is_bounded` looks for.
    Bounded,
}
2054
/// Ordering tag stored in `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2060
/// Retry tag stored in `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2066
/// Boundedness tag stored in `CollectionKind::KeyedSingleton::bound`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    /// The only value `CollectionKind::is_bounded` treats as bounded.
    Bounded,
}
2074
/// Boundedness tag stored in `CollectionKind::Singleton::bound`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    /// The only value `CollectionKind::is_bounded` treats as bounded.
    Bounded,
}
2081
/// Describes the kind of collection a node produces, together with its element types and
/// its boundedness / ordering / retry tags.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional holding an `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; `HydroRoot::emit_core` treats its elements as
    /// `(key_type, value_type)` tuples.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton; `HydroRoot::emit_core` treats its elements as
    /// `(key_type, value_type)` tuples.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2111
2112impl CollectionKind {
2113 pub fn is_bounded(&self) -> bool {
2114 matches!(
2115 self,
2116 CollectionKind::Stream {
2117 bound: BoundKind::Bounded,
2118 ..
2119 } | CollectionKind::Singleton {
2120 bound: SingletonBoundKind::Bounded,
2121 ..
2122 } | CollectionKind::Optional {
2123 bound: BoundKind::Bounded,
2124 ..
2125 } | CollectionKind::KeyedStream {
2126 bound: BoundKind::Bounded,
2127 ..
2128 } | CollectionKind::KeyedSingleton {
2129 bound: KeyedSingletonBoundKind::Bounded,
2130 ..
2131 }
2132 )
2133 }
2134}
2135
/// Metadata attached to every IR node.
///
/// Note that the `Hash` and `PartialEq` implementations of this type ignore all fields.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location the node belongs to.
    pub location_id: LocationId,
    /// Kind and element types of the collection the node produces.
    pub collection_kind: CollectionKind,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
2144
2145impl Hash for HydroIrMetadata {
2147 fn hash<H: Hasher>(&self, _: &mut H) {}
2148}
2149
2150impl PartialEq for HydroIrMetadata {
2151 fn eq(&self, _: &Self) -> bool {
2152 true
2153 }
2154}
2155
// Marker impl; consistent with `PartialEq`, which treats all metadata values as equal.
impl Eq for HydroIrMetadata {}
2157
2158impl Debug for HydroIrMetadata {
2159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2160 f.debug_struct("HydroIrMetadata")
2161 .field("location_id", &self.location_id)
2162 .field("collection_kind", &self.collection_kind)
2163 .finish()
2164 }
2165}
2166
/// Operator-level metadata stored on roots and inside [`HydroIrMetadata`].
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the name `span`
    /// through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// Initialised to `None` by the constructors in this file.
    pub cpu_usage: Option<f64>,
    /// Initialised to `None` by the constructors in this file.
    pub network_recv_cpu_usage: Option<f64>,
    /// Initialised to `None` by the constructors in this file.
    pub id: Option<usize>,
}
2177
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields unset.
    ///
    /// Delegates to `new_with_skip(1)`.
    // NOTE(review): the extra skipped frame presumably accounts for this wrapper itself —
    // confirm against `Backtrace::get_backtrace`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is captured with `2 + skip_count` passed to
    /// `Backtrace::get_backtrace`; `cpu_usage`, `network_recv_cpu_usage` and `id` start
    /// as `None`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // The skip count depends on the exact call depth of this constructor chain, so
            // keep the call structure intact when editing.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2196
2197impl Debug for HydroIrOpMetadata {
2198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2199 f.debug_struct("HydroIrOpMetadata").finish()
2200 }
2201}
2202
2203impl Hash for HydroIrOpMetadata {
2204 fn hash<H: Hasher>(&self, _: &mut H) {}
2205}
2206
/// A node of the intermediate representation.
///
/// Every variant except `Placeholder` carries a `metadata` field. Upstream nodes are held
/// either as owned boxes (`inner`, `input`, `first`/`second`, `left`/`right`, `pos`/`neg`)
/// or, for `Tee`, `Singleton` and `Partition`, as a [`SharedNode`] that several nodes may
/// point at.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in left inside a shared cell while its contents are being
    /// transformed; `transform_children` panics when asked to traverse one.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node; identified by `cycle_id`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Reads from a shared upstream node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// Reads from a shared upstream node; this is the only variant allowed inside
    /// `ClosureExpr::singleton_refs`.
    Singleton {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// Reads from a shared upstream node through the closure `f`; `is_true` selects which
    /// side of the partition this node represents.
    // NOTE(review): the exact meaning of `is_true` is inferred from its name — confirm.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` unions the tick of `inner` with the tick of this node.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` unions the tick of `inner` with the tick of this node.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like the other reductions, but with a second upstream node, `watermark`.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the same-location check performed by
    /// `transform_bottom_up` when `check_well_formed` is set.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node: has no upstream node. `port_hint` is skipped during serialization.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2488
/// Table used by traversals to visit each shared node once: maps the address of an original
/// shared cell to the cell that replaces it.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a location id.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2491
2492impl HydroNode {
2493 pub fn transform_bottom_up(
2494 &mut self,
2495 transform: &mut impl FnMut(&mut HydroNode),
2496 seen_tees: &mut SeenSharedNodes,
2497 check_well_formed: bool,
2498 ) {
2499 self.transform_children(
2500 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2501 seen_tees,
2502 );
2503
2504 transform(self);
2505
2506 let self_location = self.metadata().location_id.root();
2507
2508 if check_well_formed {
2509 match &*self {
2510 HydroNode::Network { .. } => {}
2511 _ => {
2512 self.input_metadata().iter().for_each(|i| {
2513 if i.location_id.root() != self_location {
2514 panic!(
2515 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2516 i,
2517 i.location_id.root(),
2518 self,
2519 self_location
2520 )
2521 }
2522 });
2523 }
2524 }
2525 }
2526 }
2527
2528 #[inline(always)]
2529 pub fn transform_children(
2530 &mut self,
2531 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2532 seen_tees: &mut SeenSharedNodes,
2533 ) {
2534 match self {
2535 HydroNode::Placeholder => {
2536 panic!();
2537 }
2538
2539 HydroNode::Source { .. }
2540 | HydroNode::SingletonSource { .. }
2541 | HydroNode::CycleSource { .. }
2542 | HydroNode::ExternalInput { .. } => {}
2543
2544 HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2545 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2546 *inner = SharedNode(transformed.clone());
2547 } else {
2548 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2549 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2550 let mut orig = inner.0.replace(HydroNode::Placeholder);
2551 transform(&mut orig, seen_tees);
2552 *transformed_cell.borrow_mut() = orig;
2553 *inner = SharedNode(transformed_cell);
2554 }
2555 }
2556
2557 HydroNode::Partition { inner, f, .. } => {
2558 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2559 *inner = SharedNode(transformed.clone());
2560 } else {
2561 f.transform_children(&mut transform, seen_tees);
2562 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2563 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2564 let mut orig = inner.0.replace(HydroNode::Placeholder);
2565 transform(&mut orig, seen_tees);
2566 *transformed_cell.borrow_mut() = orig;
2567 *inner = SharedNode(transformed_cell);
2568 }
2569 }
2570
2571 HydroNode::Cast { inner, .. }
2572 | HydroNode::ObserveNonDet { inner, .. }
2573 | HydroNode::BeginAtomic { inner, .. }
2574 | HydroNode::EndAtomic { inner, .. }
2575 | HydroNode::Batch { inner, .. }
2576 | HydroNode::YieldConcat { inner, .. } => {
2577 transform(inner.as_mut(), seen_tees);
2578 }
2579
2580 HydroNode::Chain { first, second, .. } => {
2581 transform(first.as_mut(), seen_tees);
2582 transform(second.as_mut(), seen_tees);
2583 }
2584
2585 HydroNode::MergeOrdered { first, second, .. } => {
2586 transform(first.as_mut(), seen_tees);
2587 transform(second.as_mut(), seen_tees);
2588 }
2589
2590 HydroNode::ChainFirst { first, second, .. } => {
2591 transform(first.as_mut(), seen_tees);
2592 transform(second.as_mut(), seen_tees);
2593 }
2594
2595 HydroNode::CrossSingleton { left, right, .. }
2596 | HydroNode::CrossProduct { left, right, .. }
2597 | HydroNode::Join { left, right, .. }
2598 | HydroNode::JoinHalf { left, right, .. } => {
2599 transform(left.as_mut(), seen_tees);
2600 transform(right.as_mut(), seen_tees);
2601 }
2602
2603 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2604 transform(pos.as_mut(), seen_tees);
2605 transform(neg.as_mut(), seen_tees);
2606 }
2607
2608 HydroNode::Map { f, input, .. } => {
2609 f.transform_children(&mut transform, seen_tees);
2610 transform(input.as_mut(), seen_tees);
2611 }
2612 HydroNode::FlatMap { f, input, .. }
2613 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2614 | HydroNode::Filter { f, input, .. }
2615 | HydroNode::FilterMap { f, input, .. }
2616 | HydroNode::Inspect { f, input, .. }
2617 | HydroNode::Reduce { f, input, .. }
2618 | HydroNode::ReduceKeyed { f, input, .. } => {
2619 f.transform_children(&mut transform, seen_tees);
2620 transform(input.as_mut(), seen_tees);
2621 }
2622 HydroNode::ReduceKeyedWatermark {
2623 f,
2624 input,
2625 watermark,
2626 ..
2627 } => {
2628 f.transform_children(&mut transform, seen_tees);
2629 transform(input.as_mut(), seen_tees);
2630 transform(watermark.as_mut(), seen_tees);
2631 }
2632 HydroNode::Fold {
2633 init, acc, input, ..
2634 }
2635 | HydroNode::Scan {
2636 init, acc, input, ..
2637 }
2638 | HydroNode::ScanAsyncBlocking {
2639 init, acc, input, ..
2640 }
2641 | HydroNode::FoldKeyed {
2642 init, acc, input, ..
2643 } => {
2644 init.transform_children(&mut transform, seen_tees);
2645 acc.transform_children(&mut transform, seen_tees);
2646 transform(input.as_mut(), seen_tees);
2647 }
2648 HydroNode::ResolveFutures { input, .. }
2649 | HydroNode::ResolveFuturesBlocking { input, .. }
2650 | HydroNode::ResolveFuturesOrdered { input, .. }
2651 | HydroNode::Sort { input, .. }
2652 | HydroNode::DeferTick { input, .. }
2653 | HydroNode::Enumerate { input, .. }
2654 | HydroNode::Unique { input, .. }
2655 | HydroNode::Network { input, .. }
2656 | HydroNode::Counter { input, .. } => {
2657 transform(input.as_mut(), seen_tees);
2658 }
2659 }
2660 }
2661
2662 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2663 match self {
2664 HydroNode::Placeholder => HydroNode::Placeholder,
2665 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2666 inner: Box::new(inner.deep_clone(seen_tees)),
2667 metadata: metadata.clone(),
2668 },
2669 HydroNode::ObserveNonDet {
2670 inner,
2671 trusted,
2672 metadata,
2673 } => HydroNode::ObserveNonDet {
2674 inner: Box::new(inner.deep_clone(seen_tees)),
2675 trusted: *trusted,
2676 metadata: metadata.clone(),
2677 },
2678 HydroNode::Source { source, metadata } => HydroNode::Source {
2679 source: source.clone(),
2680 metadata: metadata.clone(),
2681 },
2682 HydroNode::SingletonSource {
2683 value,
2684 first_tick_only,
2685 metadata,
2686 } => HydroNode::SingletonSource {
2687 value: value.clone(),
2688 first_tick_only: *first_tick_only,
2689 metadata: metadata.clone(),
2690 },
2691 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2692 cycle_id: *cycle_id,
2693 metadata: metadata.clone(),
2694 },
2695 HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2696 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2697 SharedNode(transformed.clone())
2698 } else {
2699 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2700 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2701 let cloned = inner.0.borrow().deep_clone(seen_tees);
2702 *new_rc.borrow_mut() = cloned;
2703 SharedNode(new_rc)
2704 };
2705 if matches!(self, HydroNode::Singleton { .. }) {
2706 HydroNode::Singleton {
2707 inner: cloned_inner,
2708 metadata: metadata.clone(),
2709 }
2710 } else {
2711 HydroNode::Tee {
2712 inner: cloned_inner,
2713 metadata: metadata.clone(),
2714 }
2715 }
2716 }
2717 HydroNode::Partition {
2718 inner,
2719 f,
2720 is_true,
2721 metadata,
2722 } => {
2723 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2724 HydroNode::Partition {
2725 inner: SharedNode(transformed.clone()),
2726 f: f.deep_clone(seen_tees),
2727 is_true: *is_true,
2728 metadata: metadata.clone(),
2729 }
2730 } else {
2731 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2732 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2733 let cloned = inner.0.borrow().deep_clone(seen_tees);
2734 *new_rc.borrow_mut() = cloned;
2735 HydroNode::Partition {
2736 inner: SharedNode(new_rc),
2737 f: f.deep_clone(seen_tees),
2738 is_true: *is_true,
2739 metadata: metadata.clone(),
2740 }
2741 }
2742 }
2743 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2744 inner: Box::new(inner.deep_clone(seen_tees)),
2745 metadata: metadata.clone(),
2746 },
2747 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2748 inner: Box::new(inner.deep_clone(seen_tees)),
2749 metadata: metadata.clone(),
2750 },
2751 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2752 inner: Box::new(inner.deep_clone(seen_tees)),
2753 metadata: metadata.clone(),
2754 },
2755 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2756 inner: Box::new(inner.deep_clone(seen_tees)),
2757 metadata: metadata.clone(),
2758 },
2759 HydroNode::Chain {
2760 first,
2761 second,
2762 metadata,
2763 } => HydroNode::Chain {
2764 first: Box::new(first.deep_clone(seen_tees)),
2765 second: Box::new(second.deep_clone(seen_tees)),
2766 metadata: metadata.clone(),
2767 },
2768 HydroNode::MergeOrdered {
2769 first,
2770 second,
2771 metadata,
2772 } => HydroNode::MergeOrdered {
2773 first: Box::new(first.deep_clone(seen_tees)),
2774 second: Box::new(second.deep_clone(seen_tees)),
2775 metadata: metadata.clone(),
2776 },
2777 HydroNode::ChainFirst {
2778 first,
2779 second,
2780 metadata,
2781 } => HydroNode::ChainFirst {
2782 first: Box::new(first.deep_clone(seen_tees)),
2783 second: Box::new(second.deep_clone(seen_tees)),
2784 metadata: metadata.clone(),
2785 },
2786 HydroNode::CrossProduct {
2787 left,
2788 right,
2789 metadata,
2790 } => HydroNode::CrossProduct {
2791 left: Box::new(left.deep_clone(seen_tees)),
2792 right: Box::new(right.deep_clone(seen_tees)),
2793 metadata: metadata.clone(),
2794 },
2795 HydroNode::CrossSingleton {
2796 left,
2797 right,
2798 metadata,
2799 } => HydroNode::CrossSingleton {
2800 left: Box::new(left.deep_clone(seen_tees)),
2801 right: Box::new(right.deep_clone(seen_tees)),
2802 metadata: metadata.clone(),
2803 },
2804 HydroNode::Join {
2805 left,
2806 right,
2807 metadata,
2808 } => HydroNode::Join {
2809 left: Box::new(left.deep_clone(seen_tees)),
2810 right: Box::new(right.deep_clone(seen_tees)),
2811 metadata: metadata.clone(),
2812 },
2813 HydroNode::JoinHalf {
2814 left,
2815 right,
2816 metadata,
2817 } => HydroNode::JoinHalf {
2818 left: Box::new(left.deep_clone(seen_tees)),
2819 right: Box::new(right.deep_clone(seen_tees)),
2820 metadata: metadata.clone(),
2821 },
2822 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2823 pos: Box::new(pos.deep_clone(seen_tees)),
2824 neg: Box::new(neg.deep_clone(seen_tees)),
2825 metadata: metadata.clone(),
2826 },
2827 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2828 pos: Box::new(pos.deep_clone(seen_tees)),
2829 neg: Box::new(neg.deep_clone(seen_tees)),
2830 metadata: metadata.clone(),
2831 },
2832 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2833 input: Box::new(input.deep_clone(seen_tees)),
2834 metadata: metadata.clone(),
2835 },
2836 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2837 HydroNode::ResolveFuturesBlocking {
2838 input: Box::new(input.deep_clone(seen_tees)),
2839 metadata: metadata.clone(),
2840 }
2841 }
2842 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2843 HydroNode::ResolveFuturesOrdered {
2844 input: Box::new(input.deep_clone(seen_tees)),
2845 metadata: metadata.clone(),
2846 }
2847 }
2848 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2849 f: f.deep_clone(seen_tees),
2850 input: Box::new(input.deep_clone(seen_tees)),
2851 metadata: metadata.clone(),
2852 },
2853 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2854 f: f.deep_clone(seen_tees),
2855 input: Box::new(input.deep_clone(seen_tees)),
2856 metadata: metadata.clone(),
2857 },
2858 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2859 HydroNode::FlatMapStreamBlocking {
2860 f: f.deep_clone(seen_tees),
2861 input: Box::new(input.deep_clone(seen_tees)),
2862 metadata: metadata.clone(),
2863 }
2864 }
2865 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2866 f: f.deep_clone(seen_tees),
2867 input: Box::new(input.deep_clone(seen_tees)),
2868 metadata: metadata.clone(),
2869 },
2870 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2871 f: f.deep_clone(seen_tees),
2872 input: Box::new(input.deep_clone(seen_tees)),
2873 metadata: metadata.clone(),
2874 },
2875 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2876 input: Box::new(input.deep_clone(seen_tees)),
2877 metadata: metadata.clone(),
2878 },
2879 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2880 input: Box::new(input.deep_clone(seen_tees)),
2881 metadata: metadata.clone(),
2882 },
2883 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2884 f: f.deep_clone(seen_tees),
2885 input: Box::new(input.deep_clone(seen_tees)),
2886 metadata: metadata.clone(),
2887 },
2888 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2889 input: Box::new(input.deep_clone(seen_tees)),
2890 metadata: metadata.clone(),
2891 },
2892 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2893 input: Box::new(input.deep_clone(seen_tees)),
2894 metadata: metadata.clone(),
2895 },
2896 HydroNode::Fold {
2897 init,
2898 acc,
2899 input,
2900 metadata,
2901 } => HydroNode::Fold {
2902 init: init.deep_clone(seen_tees),
2903 acc: acc.deep_clone(seen_tees),
2904 input: Box::new(input.deep_clone(seen_tees)),
2905 metadata: metadata.clone(),
2906 },
2907 HydroNode::Scan {
2908 init,
2909 acc,
2910 input,
2911 metadata,
2912 } => HydroNode::Scan {
2913 init: init.deep_clone(seen_tees),
2914 acc: acc.deep_clone(seen_tees),
2915 input: Box::new(input.deep_clone(seen_tees)),
2916 metadata: metadata.clone(),
2917 },
2918 HydroNode::ScanAsyncBlocking {
2919 init,
2920 acc,
2921 input,
2922 metadata,
2923 } => HydroNode::ScanAsyncBlocking {
2924 init: init.deep_clone(seen_tees),
2925 acc: acc.deep_clone(seen_tees),
2926 input: Box::new(input.deep_clone(seen_tees)),
2927 metadata: metadata.clone(),
2928 },
2929 HydroNode::FoldKeyed {
2930 init,
2931 acc,
2932 input,
2933 metadata,
2934 } => HydroNode::FoldKeyed {
2935 init: init.deep_clone(seen_tees),
2936 acc: acc.deep_clone(seen_tees),
2937 input: Box::new(input.deep_clone(seen_tees)),
2938 metadata: metadata.clone(),
2939 },
2940 HydroNode::ReduceKeyedWatermark {
2941 f,
2942 input,
2943 watermark,
2944 metadata,
2945 } => HydroNode::ReduceKeyedWatermark {
2946 f: f.deep_clone(seen_tees),
2947 input: Box::new(input.deep_clone(seen_tees)),
2948 watermark: Box::new(watermark.deep_clone(seen_tees)),
2949 metadata: metadata.clone(),
2950 },
2951 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2952 f: f.deep_clone(seen_tees),
2953 input: Box::new(input.deep_clone(seen_tees)),
2954 metadata: metadata.clone(),
2955 },
2956 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2957 f: f.deep_clone(seen_tees),
2958 input: Box::new(input.deep_clone(seen_tees)),
2959 metadata: metadata.clone(),
2960 },
2961 HydroNode::Network {
2962 name,
2963 networking_info,
2964 serialize_fn,
2965 instantiate_fn,
2966 deserialize_fn,
2967 input,
2968 metadata,
2969 } => HydroNode::Network {
2970 name: name.clone(),
2971 networking_info: networking_info.clone(),
2972 serialize_fn: serialize_fn.clone(),
2973 instantiate_fn: instantiate_fn.clone(),
2974 deserialize_fn: deserialize_fn.clone(),
2975 input: Box::new(input.deep_clone(seen_tees)),
2976 metadata: metadata.clone(),
2977 },
2978 HydroNode::ExternalInput {
2979 from_external_key,
2980 from_port_id,
2981 from_many,
2982 codec_type,
2983 port_hint,
2984 instantiate_fn,
2985 deserialize_fn,
2986 metadata,
2987 } => HydroNode::ExternalInput {
2988 from_external_key: *from_external_key,
2989 from_port_id: *from_port_id,
2990 from_many: *from_many,
2991 codec_type: codec_type.clone(),
2992 port_hint: *port_hint,
2993 instantiate_fn: instantiate_fn.clone(),
2994 deserialize_fn: deserialize_fn.clone(),
2995 metadata: metadata.clone(),
2996 },
2997 HydroNode::Counter {
2998 tag,
2999 duration,
3000 prefix,
3001 input,
3002 metadata,
3003 } => HydroNode::Counter {
3004 tag: tag.clone(),
3005 duration: duration.clone(),
3006 prefix: prefix.clone(),
3007 input: Box::new(input.deep_clone(seen_tees)),
3008 metadata: metadata.clone(),
3009 },
3010 }
3011 }
3012
3013 #[cfg(feature = "build")]
3014 pub fn emit_core(
3015 &mut self,
3016 builders_or_callback: &mut BuildersOrCallback<
3017 impl FnMut(&mut HydroRoot, &mut usize),
3018 impl FnMut(&mut HydroNode, &mut usize),
3019 >,
3020 seen_tees: &mut SeenSharedNodes,
3021 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3022 next_stmt_id: &mut usize,
3023 fold_hooked_idents: &mut HashSet<String>,
3024 ) -> syn::Ident {
3025 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3026
3027 self.transform_bottom_up(
3028 &mut |node: &mut HydroNode| {
3029 let out_location = node.metadata().location_id.clone();
3030 match node {
3031 HydroNode::Placeholder => {
3032 panic!()
3033 }
3034
3035 HydroNode::Cast { .. } => {
3036 match builders_or_callback {
3039 BuildersOrCallback::Builders(_) => {}
3040 BuildersOrCallback::Callback(_, node_callback) => {
3041 node_callback(node, next_stmt_id);
3042 }
3043 }
3044
3045 *next_stmt_id += 1;
3046 }
3048
3049 HydroNode::ObserveNonDet {
3050 inner,
3051 trusted,
3052 metadata,
3053 ..
3054 } => {
3055 let inner_ident = ident_stack.pop().unwrap();
3056
3057 let observe_ident =
3058 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3059
3060 match builders_or_callback {
3061 BuildersOrCallback::Builders(graph_builders) => {
3062 graph_builders.observe_nondet(
3063 *trusted,
3064 &inner.metadata().location_id,
3065 inner_ident,
3066 &inner.metadata().collection_kind,
3067 &observe_ident,
3068 &metadata.collection_kind,
3069 &metadata.op,
3070 );
3071 }
3072 BuildersOrCallback::Callback(_, node_callback) => {
3073 node_callback(node, next_stmt_id);
3074 }
3075 }
3076
3077 *next_stmt_id += 1;
3078
3079 ident_stack.push(observe_ident);
3080 }
3081
3082 HydroNode::Batch {
3083 inner, metadata, ..
3084 } => {
3085 let inner_ident = ident_stack.pop().unwrap();
3086
3087 let batch_ident =
3088 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3089
3090 match builders_or_callback {
3091 BuildersOrCallback::Builders(graph_builders) => {
3092 graph_builders.batch(
3093 inner_ident,
3094 &inner.metadata().location_id,
3095 &inner.metadata().collection_kind,
3096 &batch_ident,
3097 &out_location,
3098 &metadata.op,
3099 fold_hooked_idents,
3100 );
3101 }
3102 BuildersOrCallback::Callback(_, node_callback) => {
3103 node_callback(node, next_stmt_id);
3104 }
3105 }
3106
3107 *next_stmt_id += 1;
3108
3109 ident_stack.push(batch_ident);
3110 }
3111
3112 HydroNode::YieldConcat { inner, .. } => {
3113 let inner_ident = ident_stack.pop().unwrap();
3114
3115 let yield_ident =
3116 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3117
3118 match builders_or_callback {
3119 BuildersOrCallback::Builders(graph_builders) => {
3120 graph_builders.yield_from_tick(
3121 inner_ident,
3122 &inner.metadata().location_id,
3123 &inner.metadata().collection_kind,
3124 &yield_ident,
3125 &out_location,
3126 );
3127 }
3128 BuildersOrCallback::Callback(_, node_callback) => {
3129 node_callback(node, next_stmt_id);
3130 }
3131 }
3132
3133 *next_stmt_id += 1;
3134
3135 ident_stack.push(yield_ident);
3136 }
3137
3138 HydroNode::BeginAtomic { inner, metadata } => {
3139 let inner_ident = ident_stack.pop().unwrap();
3140
3141 let begin_ident =
3142 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3143
3144 match builders_or_callback {
3145 BuildersOrCallback::Builders(graph_builders) => {
3146 graph_builders.begin_atomic(
3147 inner_ident,
3148 &inner.metadata().location_id,
3149 &inner.metadata().collection_kind,
3150 &begin_ident,
3151 &out_location,
3152 &metadata.op,
3153 );
3154 }
3155 BuildersOrCallback::Callback(_, node_callback) => {
3156 node_callback(node, next_stmt_id);
3157 }
3158 }
3159
3160 *next_stmt_id += 1;
3161
3162 ident_stack.push(begin_ident);
3163 }
3164
3165 HydroNode::EndAtomic { inner, .. } => {
3166 let inner_ident = ident_stack.pop().unwrap();
3167
3168 let end_ident =
3169 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3170
3171 match builders_or_callback {
3172 BuildersOrCallback::Builders(graph_builders) => {
3173 graph_builders.end_atomic(
3174 inner_ident,
3175 &inner.metadata().location_id,
3176 &inner.metadata().collection_kind,
3177 &end_ident,
3178 );
3179 }
3180 BuildersOrCallback::Callback(_, node_callback) => {
3181 node_callback(node, next_stmt_id);
3182 }
3183 }
3184
3185 *next_stmt_id += 1;
3186
3187 ident_stack.push(end_ident);
3188 }
3189
3190 HydroNode::Source {
3191 source, metadata, ..
3192 } => {
3193 if let HydroSource::ExternalNetwork() = source {
3194 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3195 } else {
3196 let source_ident =
3197 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3198
3199 let source_stmt = match source {
3200 HydroSource::Stream(expr) => {
3201 debug_assert!(metadata.location_id.is_top_level());
3202 parse_quote! {
3203 #source_ident = source_stream(#expr);
3204 }
3205 }
3206
3207 HydroSource::ExternalNetwork() => {
3208 unreachable!()
3209 }
3210
3211 HydroSource::Iter(expr) => {
3212 if metadata.location_id.is_top_level() {
3213 parse_quote! {
3214 #source_ident = source_iter(#expr);
3215 }
3216 } else {
3217 parse_quote! {
3219 #source_ident = source_iter(#expr) -> persist::<'static>();
3220 }
3221 }
3222 }
3223
3224 HydroSource::Spin() => {
3225 debug_assert!(metadata.location_id.is_top_level());
3226 parse_quote! {
3227 #source_ident = spin();
3228 }
3229 }
3230
3231 HydroSource::ClusterMembers(target_loc, state) => {
3232 debug_assert!(metadata.location_id.is_top_level());
3233
3234 let members_tee_ident = syn::Ident::new(
3235 &format!(
3236 "__cluster_members_tee_{}_{}",
3237 metadata.location_id.root().key(),
3238 target_loc.key(),
3239 ),
3240 Span::call_site(),
3241 );
3242
3243 match state {
3244 ClusterMembersState::Stream(d) => {
3245 parse_quote! {
3246 #members_tee_ident = source_stream(#d) -> tee();
3247 #source_ident = #members_tee_ident;
3248 }
3249 },
3250 ClusterMembersState::Uninit => syn::parse_quote! {
3251 #source_ident = source_stream(DUMMY);
3252 },
3253 ClusterMembersState::Tee(..) => parse_quote! {
3254 #source_ident = #members_tee_ident;
3255 },
3256 }
3257 }
3258
3259 HydroSource::Embedded(ident) => {
3260 parse_quote! {
3261 #source_ident = source_stream(#ident);
3262 }
3263 }
3264
3265 HydroSource::EmbeddedSingleton(ident) => {
3266 parse_quote! {
3267 #source_ident = source_iter([#ident]);
3268 }
3269 }
3270 };
3271
3272 match builders_or_callback {
3273 BuildersOrCallback::Builders(graph_builders) => {
3274 let builder = graph_builders.get_dfir_mut(&out_location);
3275 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3276 }
3277 BuildersOrCallback::Callback(_, node_callback) => {
3278 node_callback(node, next_stmt_id);
3279 }
3280 }
3281
3282 *next_stmt_id += 1;
3283
3284 ident_stack.push(source_ident);
3285 }
3286 }
3287
3288 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3289 let source_ident =
3290 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3291
3292 match builders_or_callback {
3293 BuildersOrCallback::Builders(graph_builders) => {
3294 let builder = graph_builders.get_dfir_mut(&out_location);
3295
3296 if *first_tick_only {
3297 assert!(
3298 !metadata.location_id.is_top_level(),
3299 "first_tick_only SingletonSource must be inside a tick"
3300 );
3301 }
3302
3303 if *first_tick_only
3304 || (metadata.location_id.is_top_level()
3305 && metadata.collection_kind.is_bounded())
3306 {
3307 builder.add_dfir(
3308 parse_quote! {
3309 #source_ident = source_iter([#value]);
3310 },
3311 None,
3312 Some(&next_stmt_id.to_string()),
3313 );
3314 } else {
3315 builder.add_dfir(
3316 parse_quote! {
3317 #source_ident = source_iter([#value]) -> persist::<'static>();
3318 },
3319 None,
3320 Some(&next_stmt_id.to_string()),
3321 );
3322 }
3323 }
3324 BuildersOrCallback::Callback(_, node_callback) => {
3325 node_callback(node, next_stmt_id);
3326 }
3327 }
3328
3329 *next_stmt_id += 1;
3330
3331 ident_stack.push(source_ident);
3332 }
3333
3334 HydroNode::CycleSource { cycle_id, .. } => {
3335 let ident = cycle_id.as_ident();
3336
3337 match builders_or_callback {
3338 BuildersOrCallback::Builders(_) => {}
3339 BuildersOrCallback::Callback(_, node_callback) => {
3340 node_callback(node, next_stmt_id);
3341 }
3342 }
3343
3344 *next_stmt_id += 1;
3346
3347 ident_stack.push(ident);
3348 }
3349
3350 HydroNode::Tee { inner, .. } => {
3351 let ret_ident = if let Some(built_idents) =
3352 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3353 {
3354 match builders_or_callback {
3355 BuildersOrCallback::Builders(_) => {}
3356 BuildersOrCallback::Callback(_, node_callback) => {
3357 node_callback(node, next_stmt_id);
3358 }
3359 }
3360
3361 built_idents[0].clone()
3362 } else {
3363 let inner_ident = ident_stack.pop().unwrap();
3366
3367 let tee_ident =
3368 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3369
3370 built_tees.insert(
3371 inner.0.as_ref() as *const RefCell<HydroNode>,
3372 vec![tee_ident.clone()],
3373 );
3374
3375 match builders_or_callback {
3376 BuildersOrCallback::Builders(graph_builders) => {
3377 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3389 fold_hooked_idents.insert(tee_ident.to_string());
3390 }
3391 let builder = graph_builders.get_dfir_mut(&out_location);
3392 builder.add_dfir(
3393 parse_quote! {
3394 #tee_ident = #inner_ident -> tee();
3395 },
3396 None,
3397 Some(&next_stmt_id.to_string()),
3398 );
3399 }
3400 BuildersOrCallback::Callback(_, node_callback) => {
3401 node_callback(node, next_stmt_id);
3402 }
3403 }
3404
3405 tee_ident
3406 };
3407
3408 *next_stmt_id += 1;
3412 ident_stack.push(ret_ident);
3413 }
3414
3415 HydroNode::Singleton { inner, .. } => {
3416 let ret_ident = if let Some(built_idents) =
3417 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3418 {
3419 built_idents[0].clone()
3420 } else {
3421 let inner_ident = ident_stack.pop().unwrap();
3422
3423 let singleton_ident =
3424 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3425
3426 built_tees.insert(
3427 inner.0.as_ref() as *const RefCell<HydroNode>,
3428 vec![singleton_ident.clone()],
3429 );
3430
3431 match builders_or_callback {
3432 BuildersOrCallback::Builders(graph_builders) => {
3433 let builder = graph_builders.get_dfir_mut(&out_location);
3434 builder.add_dfir(
3435 parse_quote! {
3436 #singleton_ident = #inner_ident -> singleton();
3437 },
3438 None,
3439 Some(&next_stmt_id.to_string()),
3440 );
3441 }
3442 BuildersOrCallback::Callback(_, node_callback) => {
3443 node_callback(node, next_stmt_id);
3444 }
3445 }
3446
3447 singleton_ident
3448 };
3449
3450 *next_stmt_id += 1;
3453 ident_stack.push(ret_ident);
3454 }
3455
3456 HydroNode::Partition {
3457 inner, f, is_true, ..
3458 } => {
3459 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3461 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3462 match builders_or_callback {
3463 BuildersOrCallback::Builders(_) => {}
3464 BuildersOrCallback::Callback(_, node_callback) => {
3465 node_callback(node, next_stmt_id);
3466 }
3467 }
3468
3469 let idx = if is_true { 0 } else { 1 };
3470 built_idents[idx].clone()
3471 } else {
3472 let inner_ident = ident_stack.pop().unwrap();
3475 let f_tokens = f.emit_tokens(&mut ident_stack);
3476
3477 let partition_ident = syn::Ident::new(
3478 &format!("stream_{}_partition", *next_stmt_id),
3479 Span::call_site(),
3480 );
3481 let true_ident = syn::Ident::new(
3482 &format!("stream_{}_true", *next_stmt_id),
3483 Span::call_site(),
3484 );
3485 let false_ident = syn::Ident::new(
3486 &format!("stream_{}_false", *next_stmt_id),
3487 Span::call_site(),
3488 );
3489
3490 built_tees.insert(
3491 ptr,
3492 vec![true_ident.clone(), false_ident.clone()],
3493 );
3494
3495 match builders_or_callback {
3496 BuildersOrCallback::Builders(graph_builders) => {
3497 let builder = graph_builders.get_dfir_mut(&out_location);
3498 builder.add_dfir(
3499 parse_quote! {
3500 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3501 #true_ident = #partition_ident[0];
3502 #false_ident = #partition_ident[1];
3503 },
3504 None,
3505 Some(&next_stmt_id.to_string()),
3506 );
3507 }
3508 BuildersOrCallback::Callback(_, node_callback) => {
3509 node_callback(node, next_stmt_id);
3510 }
3511 }
3512
3513 if is_true { true_ident } else { false_ident }
3514 };
3515
3516 *next_stmt_id += 1;
3517 ident_stack.push(ret_ident);
3518 }
3519
3520 HydroNode::Chain { .. } => {
3521 let second_ident = ident_stack.pop().unwrap();
3523 let first_ident = ident_stack.pop().unwrap();
3524
3525 let chain_ident =
3526 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3527
3528 match builders_or_callback {
3529 BuildersOrCallback::Builders(graph_builders) => {
3530 let builder = graph_builders.get_dfir_mut(&out_location);
3531 builder.add_dfir(
3532 parse_quote! {
3533 #chain_ident = chain();
3534 #first_ident -> [0]#chain_ident;
3535 #second_ident -> [1]#chain_ident;
3536 },
3537 None,
3538 Some(&next_stmt_id.to_string()),
3539 );
3540 }
3541 BuildersOrCallback::Callback(_, node_callback) => {
3542 node_callback(node, next_stmt_id);
3543 }
3544 }
3545
3546 *next_stmt_id += 1;
3547
3548 ident_stack.push(chain_ident);
3549 }
3550
3551 HydroNode::MergeOrdered { first, metadata, .. } => {
3552 let second_ident = ident_stack.pop().unwrap();
3553 let first_ident = ident_stack.pop().unwrap();
3554
3555 let merge_ident =
3556 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3557
3558 match builders_or_callback {
3559 BuildersOrCallback::Builders(graph_builders) => {
3560 graph_builders.merge_ordered(
3561 &first.metadata().location_id,
3562 first_ident,
3563 second_ident,
3564 &merge_ident,
3565 &first.metadata().collection_kind,
3566 &metadata.op,
3567 Some(&next_stmt_id.to_string()),
3568 );
3569 }
3570 BuildersOrCallback::Callback(_, node_callback) => {
3571 node_callback(node, next_stmt_id);
3572 }
3573 }
3574
3575 *next_stmt_id += 1;
3576
3577 ident_stack.push(merge_ident);
3578 }
3579
3580 HydroNode::ChainFirst { .. } => {
3581 let second_ident = ident_stack.pop().unwrap();
3582 let first_ident = ident_stack.pop().unwrap();
3583
3584 let chain_ident =
3585 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3586
3587 match builders_or_callback {
3588 BuildersOrCallback::Builders(graph_builders) => {
3589 let builder = graph_builders.get_dfir_mut(&out_location);
3590 builder.add_dfir(
3591 parse_quote! {
3592 #chain_ident = chain_first_n(1);
3593 #first_ident -> [0]#chain_ident;
3594 #second_ident -> [1]#chain_ident;
3595 },
3596 None,
3597 Some(&next_stmt_id.to_string()),
3598 );
3599 }
3600 BuildersOrCallback::Callback(_, node_callback) => {
3601 node_callback(node, next_stmt_id);
3602 }
3603 }
3604
3605 *next_stmt_id += 1;
3606
3607 ident_stack.push(chain_ident);
3608 }
3609
3610 HydroNode::CrossSingleton { right, .. } => {
3611 let right_ident = ident_stack.pop().unwrap();
3612 let left_ident = ident_stack.pop().unwrap();
3613
3614 let cross_ident =
3615 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3616
3617 match builders_or_callback {
3618 BuildersOrCallback::Builders(graph_builders) => {
3619 let builder = graph_builders.get_dfir_mut(&out_location);
3620
3621 if right.metadata().location_id.is_top_level()
3622 && right.metadata().collection_kind.is_bounded()
3623 {
3624 builder.add_dfir(
3625 parse_quote! {
3626 #cross_ident = cross_singleton();
3627 #left_ident -> [input]#cross_ident;
3628 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3629 },
3630 None,
3631 Some(&next_stmt_id.to_string()),
3632 );
3633 } else {
3634 builder.add_dfir(
3635 parse_quote! {
3636 #cross_ident = cross_singleton();
3637 #left_ident -> [input]#cross_ident;
3638 #right_ident -> [single]#cross_ident;
3639 },
3640 None,
3641 Some(&next_stmt_id.to_string()),
3642 );
3643 }
3644 }
3645 BuildersOrCallback::Callback(_, node_callback) => {
3646 node_callback(node, next_stmt_id);
3647 }
3648 }
3649
3650 *next_stmt_id += 1;
3651
3652 ident_stack.push(cross_ident);
3653 }
3654
3655 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3656 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3657 parse_quote!(cross_join_multiset)
3658 } else {
3659 parse_quote!(join_multiset)
3660 };
3661
3662 let (HydroNode::CrossProduct { left, right, .. }
3663 | HydroNode::Join { left, right, .. }) = node
3664 else {
3665 unreachable!()
3666 };
3667
3668 let is_top_level = left.metadata().location_id.is_top_level()
3669 && right.metadata().location_id.is_top_level();
3670 let left_lifetime = if left.metadata().location_id.is_top_level() {
3671 quote!('static)
3672 } else {
3673 quote!('tick)
3674 };
3675
3676 let right_lifetime = if right.metadata().location_id.is_top_level() {
3677 quote!('static)
3678 } else {
3679 quote!('tick)
3680 };
3681
3682 let right_ident = ident_stack.pop().unwrap();
3683 let left_ident = ident_stack.pop().unwrap();
3684
3685 let stream_ident =
3686 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3687
3688 match builders_or_callback {
3689 BuildersOrCallback::Builders(graph_builders) => {
3690 let builder = graph_builders.get_dfir_mut(&out_location);
3691 builder.add_dfir(
3692 if is_top_level {
3693 parse_quote! {
3696 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3697 #left_ident -> [0]#stream_ident;
3698 #right_ident -> [1]#stream_ident;
3699 }
3700 } else {
3701 parse_quote! {
3702 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3703 #left_ident -> [0]#stream_ident;
3704 #right_ident -> [1]#stream_ident;
3705 }
3706 }
3707 ,
3708 None,
3709 Some(&next_stmt_id.to_string()),
3710 );
3711 }
3712 BuildersOrCallback::Callback(_, node_callback) => {
3713 node_callback(node, next_stmt_id);
3714 }
3715 }
3716
3717 *next_stmt_id += 1;
3718
3719 ident_stack.push(stream_ident);
3720 }
3721
3722 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3723 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3724 parse_quote!(difference)
3725 } else {
3726 parse_quote!(anti_join)
3727 };
3728
3729 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3730 node
3731 else {
3732 unreachable!()
3733 };
3734
3735 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3736 quote!('static)
3737 } else {
3738 quote!('tick)
3739 };
3740
3741 let neg_ident = ident_stack.pop().unwrap();
3742 let pos_ident = ident_stack.pop().unwrap();
3743
3744 let stream_ident =
3745 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3746
3747 match builders_or_callback {
3748 BuildersOrCallback::Builders(graph_builders) => {
3749 let builder = graph_builders.get_dfir_mut(&out_location);
3750 builder.add_dfir(
3751 parse_quote! {
3752 #stream_ident = #operator::<'tick, #neg_lifetime>();
3753 #pos_ident -> [pos]#stream_ident;
3754 #neg_ident -> [neg]#stream_ident;
3755 },
3756 None,
3757 Some(&next_stmt_id.to_string()),
3758 );
3759 }
3760 BuildersOrCallback::Callback(_, node_callback) => {
3761 node_callback(node, next_stmt_id);
3762 }
3763 }
3764
3765 *next_stmt_id += 1;
3766
3767 ident_stack.push(stream_ident);
3768 }
3769
3770 HydroNode::JoinHalf { .. } => {
3771 let HydroNode::JoinHalf { right, .. } = node else {
3772 unreachable!()
3773 };
3774
3775 assert!(
3776 right.metadata().collection_kind.is_bounded(),
3777 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3778 right.metadata().collection_kind
3779 );
3780
3781 let build_lifetime = if right.metadata().location_id.is_top_level() {
3782 quote!('static)
3783 } else {
3784 quote!('tick)
3785 };
3786
3787 let build_ident = ident_stack.pop().unwrap();
3788 let probe_ident = ident_stack.pop().unwrap();
3789
3790 let stream_ident =
3791 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3792
3793 match builders_or_callback {
3794 BuildersOrCallback::Builders(graph_builders) => {
3795 let builder = graph_builders.get_dfir_mut(&out_location);
3796 builder.add_dfir(
3797 parse_quote! {
3798 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3799 #probe_ident -> [probe]#stream_ident;
3800 #build_ident -> [build]#stream_ident;
3801 },
3802 None,
3803 Some(&next_stmt_id.to_string()),
3804 );
3805 }
3806 BuildersOrCallback::Callback(_, node_callback) => {
3807 node_callback(node, next_stmt_id);
3808 }
3809 }
3810
3811 *next_stmt_id += 1;
3812
3813 ident_stack.push(stream_ident);
3814 }
3815
3816 HydroNode::ResolveFutures { .. } => {
3817 let input_ident = ident_stack.pop().unwrap();
3818
3819 let futures_ident =
3820 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3821
3822 match builders_or_callback {
3823 BuildersOrCallback::Builders(graph_builders) => {
3824 let builder = graph_builders.get_dfir_mut(&out_location);
3825 builder.add_dfir(
3826 parse_quote! {
3827 #futures_ident = #input_ident -> resolve_futures();
3828 },
3829 None,
3830 Some(&next_stmt_id.to_string()),
3831 );
3832 }
3833 BuildersOrCallback::Callback(_, node_callback) => {
3834 node_callback(node, next_stmt_id);
3835 }
3836 }
3837
3838 *next_stmt_id += 1;
3839
3840 ident_stack.push(futures_ident);
3841 }
3842
3843 HydroNode::ResolveFuturesBlocking { .. } => {
3844 let input_ident = ident_stack.pop().unwrap();
3845
3846 let futures_ident =
3847 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3848
3849 match builders_or_callback {
3850 BuildersOrCallback::Builders(graph_builders) => {
3851 let builder = graph_builders.get_dfir_mut(&out_location);
3852 builder.add_dfir(
3853 parse_quote! {
3854 #futures_ident = #input_ident -> resolve_futures_blocking();
3855 },
3856 None,
3857 Some(&next_stmt_id.to_string()),
3858 );
3859 }
3860 BuildersOrCallback::Callback(_, node_callback) => {
3861 node_callback(node, next_stmt_id);
3862 }
3863 }
3864
3865 *next_stmt_id += 1;
3866
3867 ident_stack.push(futures_ident);
3868 }
3869
3870 HydroNode::ResolveFuturesOrdered { .. } => {
3871 let input_ident = ident_stack.pop().unwrap();
3872
3873 let futures_ident =
3874 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3875
3876 match builders_or_callback {
3877 BuildersOrCallback::Builders(graph_builders) => {
3878 let builder = graph_builders.get_dfir_mut(&out_location);
3879 builder.add_dfir(
3880 parse_quote! {
3881 #futures_ident = #input_ident -> resolve_futures_ordered();
3882 },
3883 None,
3884 Some(&next_stmt_id.to_string()),
3885 );
3886 }
3887 BuildersOrCallback::Callback(_, node_callback) => {
3888 node_callback(node, next_stmt_id);
3889 }
3890 }
3891
3892 *next_stmt_id += 1;
3893
3894 ident_stack.push(futures_ident);
3895 }
3896
3897 HydroNode::Map { f, .. } => {
3898 let input_ident = ident_stack.pop().unwrap();
3900 let f_tokens = f.emit_tokens(&mut ident_stack);
3901
3902 let map_ident =
3903 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3904
3905 match builders_or_callback {
3906 BuildersOrCallback::Builders(graph_builders) => {
3907 let builder = graph_builders.get_dfir_mut(&out_location);
3908 builder.add_dfir(
3909 parse_quote! {
3910 #map_ident = #input_ident -> map(#f_tokens);
3911 },
3912 None,
3913 Some(&next_stmt_id.to_string()),
3914 );
3915 }
3916 BuildersOrCallback::Callback(_, node_callback) => {
3917 node_callback(node, next_stmt_id);
3918 }
3919 }
3920
3921 *next_stmt_id += 1;
3922
3923 ident_stack.push(map_ident);
3924 }
3925
3926 HydroNode::FlatMap { f, .. } => {
3927 let input_ident = ident_stack.pop().unwrap();
3928 let f_tokens = f.emit_tokens(&mut ident_stack);
3929
3930 let flat_map_ident =
3931 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3932
3933 match builders_or_callback {
3934 BuildersOrCallback::Builders(graph_builders) => {
3935 let builder = graph_builders.get_dfir_mut(&out_location);
3936 builder.add_dfir(
3937 parse_quote! {
3938 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
3939 },
3940 None,
3941 Some(&next_stmt_id.to_string()),
3942 );
3943 }
3944 BuildersOrCallback::Callback(_, node_callback) => {
3945 node_callback(node, next_stmt_id);
3946 }
3947 }
3948
3949 *next_stmt_id += 1;
3950
3951 ident_stack.push(flat_map_ident);
3952 }
3953
3954 HydroNode::FlatMapStreamBlocking { f, .. } => {
3955 let input_ident = ident_stack.pop().unwrap();
3956 let f_tokens = f.emit_tokens(&mut ident_stack);
3957
3958 let flat_map_stream_blocking_ident =
3959 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3960
3961 match builders_or_callback {
3962 BuildersOrCallback::Builders(graph_builders) => {
3963 let builder = graph_builders.get_dfir_mut(&out_location);
3964 builder.add_dfir(
3965 parse_quote! {
3966 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
3967 },
3968 None,
3969 Some(&next_stmt_id.to_string()),
3970 );
3971 }
3972 BuildersOrCallback::Callback(_, node_callback) => {
3973 node_callback(node, next_stmt_id);
3974 }
3975 }
3976
3977 *next_stmt_id += 1;
3978
3979 ident_stack.push(flat_map_stream_blocking_ident);
3980 }
3981
3982 HydroNode::Filter { f, .. } => {
3983 let input_ident = ident_stack.pop().unwrap();
3984 let f_tokens = f.emit_tokens(&mut ident_stack);
3985
3986 let filter_ident =
3987 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3988
3989 match builders_or_callback {
3990 BuildersOrCallback::Builders(graph_builders) => {
3991 let builder = graph_builders.get_dfir_mut(&out_location);
3992 builder.add_dfir(
3993 parse_quote! {
3994 #filter_ident = #input_ident -> filter(#f_tokens);
3995 },
3996 None,
3997 Some(&next_stmt_id.to_string()),
3998 );
3999 }
4000 BuildersOrCallback::Callback(_, node_callback) => {
4001 node_callback(node, next_stmt_id);
4002 }
4003 }
4004
4005 *next_stmt_id += 1;
4006
4007 ident_stack.push(filter_ident);
4008 }
4009
4010 HydroNode::FilterMap { f, .. } => {
4011 let input_ident = ident_stack.pop().unwrap();
4012 let f_tokens = f.emit_tokens(&mut ident_stack);
4013
4014 let filter_map_ident =
4015 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4016
4017 match builders_or_callback {
4018 BuildersOrCallback::Builders(graph_builders) => {
4019 let builder = graph_builders.get_dfir_mut(&out_location);
4020 builder.add_dfir(
4021 parse_quote! {
4022 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4023 },
4024 None,
4025 Some(&next_stmt_id.to_string()),
4026 );
4027 }
4028 BuildersOrCallback::Callback(_, node_callback) => {
4029 node_callback(node, next_stmt_id);
4030 }
4031 }
4032
4033 *next_stmt_id += 1;
4034
4035 ident_stack.push(filter_map_ident);
4036 }
4037
4038 HydroNode::Sort { .. } => {
4039 let input_ident = ident_stack.pop().unwrap();
4040
4041 let sort_ident =
4042 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4043
4044 match builders_or_callback {
4045 BuildersOrCallback::Builders(graph_builders) => {
4046 let builder = graph_builders.get_dfir_mut(&out_location);
4047 builder.add_dfir(
4048 parse_quote! {
4049 #sort_ident = #input_ident -> sort();
4050 },
4051 None,
4052 Some(&next_stmt_id.to_string()),
4053 );
4054 }
4055 BuildersOrCallback::Callback(_, node_callback) => {
4056 node_callback(node, next_stmt_id);
4057 }
4058 }
4059
4060 *next_stmt_id += 1;
4061
4062 ident_stack.push(sort_ident);
4063 }
4064
4065 HydroNode::DeferTick { .. } => {
4066 let input_ident = ident_stack.pop().unwrap();
4067
4068 let defer_tick_ident =
4069 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4070
4071 match builders_or_callback {
4072 BuildersOrCallback::Builders(graph_builders) => {
4073 let builder = graph_builders.get_dfir_mut(&out_location);
4074 builder.add_dfir(
4075 parse_quote! {
4076 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4077 },
4078 None,
4079 Some(&next_stmt_id.to_string()),
4080 );
4081 }
4082 BuildersOrCallback::Callback(_, node_callback) => {
4083 node_callback(node, next_stmt_id);
4084 }
4085 }
4086
4087 *next_stmt_id += 1;
4088
4089 ident_stack.push(defer_tick_ident);
4090 }
4091
4092 HydroNode::Enumerate { input, .. } => {
4093 let input_ident = ident_stack.pop().unwrap();
4094
4095 let enumerate_ident =
4096 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4097
4098 match builders_or_callback {
4099 BuildersOrCallback::Builders(graph_builders) => {
4100 let builder = graph_builders.get_dfir_mut(&out_location);
4101 let lifetime = if input.metadata().location_id.is_top_level() {
4102 quote!('static)
4103 } else {
4104 quote!('tick)
4105 };
4106 builder.add_dfir(
4107 parse_quote! {
4108 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4109 },
4110 None,
4111 Some(&next_stmt_id.to_string()),
4112 );
4113 }
4114 BuildersOrCallback::Callback(_, node_callback) => {
4115 node_callback(node, next_stmt_id);
4116 }
4117 }
4118
4119 *next_stmt_id += 1;
4120
4121 ident_stack.push(enumerate_ident);
4122 }
4123
4124 HydroNode::Inspect { f, .. } => {
4125 let input_ident = ident_stack.pop().unwrap();
4126 let f_tokens = f.emit_tokens(&mut ident_stack);
4127
4128 let inspect_ident =
4129 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4130
4131 match builders_or_callback {
4132 BuildersOrCallback::Builders(graph_builders) => {
4133 let builder = graph_builders.get_dfir_mut(&out_location);
4134 builder.add_dfir(
4135 parse_quote! {
4136 #inspect_ident = #input_ident -> inspect(#f_tokens);
4137 },
4138 None,
4139 Some(&next_stmt_id.to_string()),
4140 );
4141 }
4142 BuildersOrCallback::Callback(_, node_callback) => {
4143 node_callback(node, next_stmt_id);
4144 }
4145 }
4146
4147 *next_stmt_id += 1;
4148
4149 ident_stack.push(inspect_ident);
4150 }
4151
4152 HydroNode::Unique { input, .. } => {
4153 let input_ident = ident_stack.pop().unwrap();
4154
4155 let unique_ident =
4156 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4157
4158 match builders_or_callback {
4159 BuildersOrCallback::Builders(graph_builders) => {
4160 let builder = graph_builders.get_dfir_mut(&out_location);
4161 let lifetime = if input.metadata().location_id.is_top_level() {
4162 quote!('static)
4163 } else {
4164 quote!('tick)
4165 };
4166
4167 builder.add_dfir(
4168 parse_quote! {
4169 #unique_ident = #input_ident -> unique::<#lifetime>();
4170 },
4171 None,
4172 Some(&next_stmt_id.to_string()),
4173 );
4174 }
4175 BuildersOrCallback::Callback(_, node_callback) => {
4176 node_callback(node, next_stmt_id);
4177 }
4178 }
4179
4180 *next_stmt_id += 1;
4181
4182 ident_stack.push(unique_ident);
4183 }
4184
4185 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4186 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4187 if input.metadata().location_id.is_top_level()
4188 && input.metadata().collection_kind.is_bounded()
4189 {
4190 parse_quote!(fold_no_replay)
4191 } else {
4192 parse_quote!(fold)
4193 }
4194 } else if matches!(node, HydroNode::Scan { .. }) {
4195 parse_quote!(scan)
4196 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4197 parse_quote!(scan_async_blocking)
4198 } else if let HydroNode::FoldKeyed { input, .. } = node {
4199 if input.metadata().location_id.is_top_level()
4200 && input.metadata().collection_kind.is_bounded()
4201 {
4202 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4203 } else {
4204 parse_quote!(fold_keyed)
4205 }
4206 } else {
4207 unreachable!()
4208 };
4209
4210 let (HydroNode::Fold { input, .. }
4211 | HydroNode::FoldKeyed { input, .. }
4212 | HydroNode::Scan { input, .. }
4213 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4214 else {
4215 unreachable!()
4216 };
4217
4218 let lifetime = if input.metadata().location_id.is_top_level() {
4219 quote!('static)
4220 } else {
4221 quote!('tick)
4222 };
4223
4224 let input_ident = ident_stack.pop().unwrap();
4225
4226 let (HydroNode::Fold { init, acc, .. }
4227 | HydroNode::FoldKeyed { init, acc, .. }
4228 | HydroNode::Scan { init, acc, .. }
4229 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4230 else {
4231 unreachable!()
4232 };
4233
4234 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4235 let init_tokens = init.emit_tokens(&mut ident_stack);
4236
4237 let fold_ident =
4238 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4239
4240 match builders_or_callback {
4241 BuildersOrCallback::Builders(graph_builders) => {
4242 if matches!(node, HydroNode::Fold { .. })
4243 && node.metadata().location_id.is_top_level()
4244 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4245 && graph_builders.singleton_intermediates()
4246 && !node.metadata().collection_kind.is_bounded()
4247 {
4248 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4249 let hooked_input_ident = graph_builders.emit_fold_hook(
4250 &input.metadata().location_id,
4251 &input_ident,
4252 &input.metadata().collection_kind,
4253 &node.metadata().op,
4254 );
4255
4256 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4257 let acc: syn::Expr = parse_quote!({
4258 let mut __inner = #acc_tokens;
4259 move |__state, __batch: Vec<_>| {
4260 if __batch.is_empty() {
4261 return None;
4262 }
4263 for __value in __batch {
4264 __inner(__state, __value);
4265 }
4266 Some(__state.clone())
4267 }
4268 });
4269 (hooked, acc)
4270 } else {
4271 let acc: syn::Expr = parse_quote!({
4272 let mut __inner = #acc_tokens;
4273 move |__state, __value| {
4274 __inner(__state, __value);
4275 Some(__state.clone())
4276 }
4277 });
4278 (&input_ident, acc)
4279 };
4280
4281 let builder = graph_builders.get_dfir_mut(&out_location);
4282 builder.add_dfir(
4283 parse_quote! {
4284 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4285 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4286 #fold_ident = chain();
4287 },
4288 None,
4289 Some(&next_stmt_id.to_string()),
4290 );
4291
4292 if hooked_input_ident.is_some() {
4293 fold_hooked_idents.insert(fold_ident.to_string());
4294 }
4295 } else if matches!(node, HydroNode::FoldKeyed { .. })
4296 && node.metadata().location_id.is_top_level()
4297 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4298 && graph_builders.singleton_intermediates()
4299 && !node.metadata().collection_kind.is_bounded()
4300 {
4301 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4302 let hooked_input_ident = graph_builders.emit_fold_hook(
4303 &input.metadata().location_id,
4304 &input_ident,
4305 &input.metadata().collection_kind,
4306 &node.metadata().op,
4307 );
4308 let builder = graph_builders.get_dfir_mut(&out_location);
4309
4310 let wrapped_acc: syn::Expr = parse_quote!({
4311 let mut __init = #init_tokens;
4312 let mut __inner = #acc_tokens;
4313 move |__state, __kv: (_, _)| {
4314 let __state = __state
4316 .entry(::std::clone::Clone::clone(&__kv.0))
4317 .or_insert_with(|| (__init)());
4318 __inner(__state, __kv.1);
4319 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4320 }
4321 });
4322
4323 if let Some(hooked_input_ident) = hooked_input_ident {
4324 builder.add_dfir(
4325 parse_quote! {
4326 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4327 },
4328 None,
4329 Some(&next_stmt_id.to_string()),
4330 );
4331
4332 fold_hooked_idents.insert(fold_ident.to_string());
4333 } else {
4334 builder.add_dfir(
4335 parse_quote! {
4336 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4337 },
4338 None,
4339 Some(&next_stmt_id.to_string()),
4340 );
4341 }
4342 } else if (matches!(node, HydroNode::Fold { .. })
4343 || matches!(node, HydroNode::FoldKeyed { .. }))
4344 && !node.metadata().location_id.is_top_level()
4345 && graph_builders.singleton_intermediates()
4346 {
4347 let input_ref = match &*node {
4348 HydroNode::Fold { input, .. } => input,
4349 HydroNode::FoldKeyed { input, .. } => input,
4350 _ => unreachable!(),
4351 };
4352 let hooked_input_ident = graph_builders.emit_fold_hook(
4353 &input_ref.metadata().location_id,
4354 &input_ident,
4355 &input_ref.metadata().collection_kind,
4356 &node.metadata().op,
4357 );
4358
4359 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4360 let builder = graph_builders.get_dfir_mut(&out_location);
4361 builder.add_dfir(
4362 parse_quote! {
4363 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4364 },
4365 None,
4366 Some(&next_stmt_id.to_string()),
4367 );
4368 } else {
4369 let builder = graph_builders.get_dfir_mut(&out_location);
4370 builder.add_dfir(
4371 parse_quote! {
4372 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4373 },
4374 None,
4375 Some(&next_stmt_id.to_string()),
4376 );
4377 }
4378 }
4379 BuildersOrCallback::Callback(_, node_callback) => {
4380 node_callback(node, next_stmt_id);
4381 }
4382 }
4383
4384 *next_stmt_id += 1;
4385
4386 ident_stack.push(fold_ident);
4387 }
4388
4389 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4390 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4391 if input.metadata().location_id.is_top_level()
4392 && input.metadata().collection_kind.is_bounded()
4393 {
4394 parse_quote!(reduce_no_replay)
4395 } else {
4396 parse_quote!(reduce)
4397 }
4398 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4399 if input.metadata().location_id.is_top_level()
4400 && input.metadata().collection_kind.is_bounded()
4401 {
4402 todo!(
4403 "Calling keyed reduce on a top-level bounded collection is not supported"
4404 )
4405 } else {
4406 parse_quote!(reduce_keyed)
4407 }
4408 } else {
4409 unreachable!()
4410 };
4411
4412 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4413 else {
4414 unreachable!()
4415 };
4416
4417 let lifetime = if input.metadata().location_id.is_top_level() {
4418 quote!('static)
4419 } else {
4420 quote!('tick)
4421 };
4422
4423 let input_ident = ident_stack.pop().unwrap();
4424
4425 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4426 else {
4427 unreachable!()
4428 };
4429
4430 let f_tokens = f.emit_tokens(&mut ident_stack);
4431
4432 let reduce_ident =
4433 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4434
4435 match builders_or_callback {
4436 BuildersOrCallback::Builders(graph_builders) => {
4437 if matches!(node, HydroNode::Reduce { .. })
4438 && node.metadata().location_id.is_top_level()
4439 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4440 && graph_builders.singleton_intermediates()
4441 && !node.metadata().collection_kind.is_bounded()
4442 {
4443 todo!(
4444 "Reduce with optional intermediates is not yet supported in simulator"
4445 );
4446 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4447 && node.metadata().location_id.is_top_level()
4448 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4449 && graph_builders.singleton_intermediates()
4450 && !node.metadata().collection_kind.is_bounded()
4451 {
4452 todo!(
4453 "Reduce keyed with optional intermediates is not yet supported in simulator"
4454 );
4455 } else {
4456 let builder = graph_builders.get_dfir_mut(&out_location);
4457 builder.add_dfir(
4458 parse_quote! {
4459 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4460 },
4461 None,
4462 Some(&next_stmt_id.to_string()),
4463 );
4464 }
4465 }
4466 BuildersOrCallback::Callback(_, node_callback) => {
4467 node_callback(node, next_stmt_id);
4468 }
4469 }
4470
4471 *next_stmt_id += 1;
4472
4473 ident_stack.push(reduce_ident);
4474 }
4475
4476 HydroNode::ReduceKeyedWatermark {
4477 f,
4478 input,
4479 metadata,
4480 ..
4481 } => {
4482 let lifetime = if input.metadata().location_id.is_top_level() {
4483 quote!('static)
4484 } else {
4485 quote!('tick)
4486 };
4487
4488 let watermark_ident = ident_stack.pop().unwrap();
4490 let input_ident = ident_stack.pop().unwrap();
4491 let f_tokens = f.emit_tokens(&mut ident_stack);
4492
4493 let chain_ident = syn::Ident::new(
4494 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4495 Span::call_site(),
4496 );
4497
4498 let fold_ident =
4499 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4500
4501 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4502 && input.metadata().collection_kind.is_bounded()
4503 {
4504 parse_quote!(fold_no_replay)
4505 } else {
4506 parse_quote!(fold)
4507 };
4508
4509 match builders_or_callback {
4510 BuildersOrCallback::Builders(graph_builders) => {
4511 if metadata.location_id.is_top_level()
4512 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4513 && graph_builders.singleton_intermediates()
4514 && !metadata.collection_kind.is_bounded()
4515 {
4516 todo!(
4517 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4518 )
4519 } else {
4520 let builder = graph_builders.get_dfir_mut(&out_location);
4521 builder.add_dfir(
4522 parse_quote! {
4523 #chain_ident = chain();
4524 #input_ident
4525 -> map(|x| (Some(x), None))
4526 -> [0]#chain_ident;
4527 #watermark_ident
4528 -> map(|watermark| (None, Some(watermark)))
4529 -> [1]#chain_ident;
4530
4531 #fold_ident = #chain_ident
4532 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4533 let __reduce_keyed_fn = #f_tokens;
4534 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4535 if let Some((k, v)) = opt_payload {
4536 if let Some(curr_watermark) = *opt_curr_watermark {
4537 if k < curr_watermark {
4538 return;
4539 }
4540 }
4541 match map.entry(k) {
4542 ::std::collections::hash_map::Entry::Vacant(e) => {
4543 e.insert(v);
4544 }
4545 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4546 __reduce_keyed_fn(e.get_mut(), v);
4547 }
4548 }
4549 } else {
4550 let watermark = opt_watermark.unwrap();
4551 if let Some(curr_watermark) = *opt_curr_watermark {
4552 if watermark <= curr_watermark {
4553 return;
4554 }
4555 }
4556 map.retain(|k, _| *k >= watermark);
4557 *opt_curr_watermark = Some(watermark);
4558 }
4559 }
4560 })
4561 -> flat_map(|(map, _curr_watermark)| map);
4562 },
4563 None,
4564 Some(&next_stmt_id.to_string()),
4565 );
4566 }
4567 }
4568 BuildersOrCallback::Callback(_, node_callback) => {
4569 node_callback(node, next_stmt_id);
4570 }
4571 }
4572
4573 *next_stmt_id += 1;
4574
4575 ident_stack.push(fold_ident);
4576 }
4577
4578 HydroNode::Network {
4579 networking_info,
4580 serialize_fn: serialize_pipeline,
4581 instantiate_fn,
4582 deserialize_fn: deserialize_pipeline,
4583 input,
4584 ..
4585 } => {
4586 let input_ident = ident_stack.pop().unwrap();
4587
4588 let receiver_stream_ident =
4589 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4590
4591 match builders_or_callback {
4592 BuildersOrCallback::Builders(graph_builders) => {
4593 let (sink_expr, source_expr) = match instantiate_fn {
4594 DebugInstantiate::Building => (
4595 syn::parse_quote!(DUMMY_SINK),
4596 syn::parse_quote!(DUMMY_SOURCE),
4597 ),
4598
4599 DebugInstantiate::Finalized(finalized) => {
4600 (finalized.sink.clone(), finalized.source.clone())
4601 }
4602 };
4603
4604 graph_builders.create_network(
4605 &input.metadata().location_id,
4606 &out_location,
4607 input_ident,
4608 &receiver_stream_ident,
4609 serialize_pipeline.as_ref(),
4610 sink_expr,
4611 source_expr,
4612 deserialize_pipeline.as_ref(),
4613 *next_stmt_id,
4614 networking_info,
4615 );
4616 }
4617 BuildersOrCallback::Callback(_, node_callback) => {
4618 node_callback(node, next_stmt_id);
4619 }
4620 }
4621
4622 *next_stmt_id += 1;
4623
4624 ident_stack.push(receiver_stream_ident);
4625 }
4626
4627 HydroNode::ExternalInput {
4628 instantiate_fn,
4629 deserialize_fn: deserialize_pipeline,
4630 ..
4631 } => {
4632 let receiver_stream_ident =
4633 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4634
4635 match builders_or_callback {
4636 BuildersOrCallback::Builders(graph_builders) => {
4637 let (_, source_expr) = match instantiate_fn {
4638 DebugInstantiate::Building => (
4639 syn::parse_quote!(DUMMY_SINK),
4640 syn::parse_quote!(DUMMY_SOURCE),
4641 ),
4642
4643 DebugInstantiate::Finalized(finalized) => {
4644 (finalized.sink.clone(), finalized.source.clone())
4645 }
4646 };
4647
4648 graph_builders.create_external_source(
4649 &out_location,
4650 source_expr,
4651 &receiver_stream_ident,
4652 deserialize_pipeline.as_ref(),
4653 *next_stmt_id,
4654 );
4655 }
4656 BuildersOrCallback::Callback(_, node_callback) => {
4657 node_callback(node, next_stmt_id);
4658 }
4659 }
4660
4661 *next_stmt_id += 1;
4662
4663 ident_stack.push(receiver_stream_ident);
4664 }
4665
4666 HydroNode::Counter {
4667 tag,
4668 duration,
4669 prefix,
4670 ..
4671 } => {
4672 let input_ident = ident_stack.pop().unwrap();
4673
4674 let counter_ident =
4675 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4676
4677 match builders_or_callback {
4678 BuildersOrCallback::Builders(graph_builders) => {
4679 let arg = format!("{}({})", prefix, tag);
4680 let builder = graph_builders.get_dfir_mut(&out_location);
4681 builder.add_dfir(
4682 parse_quote! {
4683 #counter_ident = #input_ident -> _counter(#arg, #duration);
4684 },
4685 None,
4686 Some(&next_stmt_id.to_string()),
4687 );
4688 }
4689 BuildersOrCallback::Callback(_, node_callback) => {
4690 node_callback(node, next_stmt_id);
4691 }
4692 }
4693
4694 *next_stmt_id += 1;
4695
4696 ident_stack.push(counter_ident);
4697 }
4698 }
4699 },
4700 seen_tees,
4701 false,
4702 );
4703
4704 let ret = ident_stack
4705 .pop()
4706 .expect("ident_stack should have exactly one element after traversal");
4707 assert!(
4708 ident_stack.is_empty(),
4709 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4710 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4711 ident_stack.len()
4712 );
4713 ret
4714 }
4715
4716 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4717 match self {
4718 HydroNode::Placeholder => {
4719 panic!()
4720 }
4721 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4722 HydroNode::Source { source, .. } => match source {
4723 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4724 HydroSource::ExternalNetwork()
4725 | HydroSource::Spin()
4726 | HydroSource::ClusterMembers(_, _)
4727 | HydroSource::Embedded(_)
4728 | HydroSource::EmbeddedSingleton(_) => {} },
4730 HydroNode::SingletonSource { value, .. } => {
4731 transform(value);
4732 }
4733 HydroNode::CycleSource { .. }
4734 | HydroNode::Tee { .. }
4735 | HydroNode::Singleton { .. }
4736 | HydroNode::YieldConcat { .. }
4737 | HydroNode::BeginAtomic { .. }
4738 | HydroNode::EndAtomic { .. }
4739 | HydroNode::Batch { .. }
4740 | HydroNode::Chain { .. }
4741 | HydroNode::MergeOrdered { .. }
4742 | HydroNode::ChainFirst { .. }
4743 | HydroNode::CrossProduct { .. }
4744 | HydroNode::CrossSingleton { .. }
4745 | HydroNode::ResolveFutures { .. }
4746 | HydroNode::ResolveFuturesBlocking { .. }
4747 | HydroNode::ResolveFuturesOrdered { .. }
4748 | HydroNode::Join { .. }
4749 | HydroNode::JoinHalf { .. }
4750 | HydroNode::Difference { .. }
4751 | HydroNode::AntiJoin { .. }
4752 | HydroNode::DeferTick { .. }
4753 | HydroNode::Enumerate { .. }
4754 | HydroNode::Unique { .. }
4755 | HydroNode::Sort { .. } => {}
4756 HydroNode::Map { f, .. }
4757 | HydroNode::FlatMap { f, .. }
4758 | HydroNode::FlatMapStreamBlocking { f, .. }
4759 | HydroNode::Filter { f, .. }
4760 | HydroNode::FilterMap { f, .. }
4761 | HydroNode::Inspect { f, .. }
4762 | HydroNode::Partition { f, .. }
4763 | HydroNode::Reduce { f, .. }
4764 | HydroNode::ReduceKeyed { f, .. }
4765 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4766 transform(&mut f.expr);
4767 }
4768 HydroNode::Fold { init, acc, .. }
4769 | HydroNode::Scan { init, acc, .. }
4770 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4771 | HydroNode::FoldKeyed { init, acc, .. } => {
4772 transform(&mut init.expr);
4773 transform(&mut acc.expr);
4774 }
4775 HydroNode::Network {
4776 serialize_fn,
4777 deserialize_fn,
4778 ..
4779 } => {
4780 if let Some(serialize_fn) = serialize_fn {
4781 transform(serialize_fn);
4782 }
4783 if let Some(deserialize_fn) = deserialize_fn {
4784 transform(deserialize_fn);
4785 }
4786 }
4787 HydroNode::ExternalInput { deserialize_fn, .. } => {
4788 if let Some(deserialize_fn) = deserialize_fn {
4789 transform(deserialize_fn);
4790 }
4791 }
4792 HydroNode::Counter { duration, .. } => {
4793 transform(duration);
4794 }
4795 }
4796 }
4797
4798 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4799 &self.metadata().op
4800 }
4801
4802 pub fn metadata(&self) -> &HydroIrMetadata {
4803 match self {
4804 HydroNode::Placeholder => {
4805 panic!()
4806 }
4807 HydroNode::Cast { metadata, .. }
4808 | HydroNode::ObserveNonDet { metadata, .. }
4809 | HydroNode::Source { metadata, .. }
4810 | HydroNode::SingletonSource { metadata, .. }
4811 | HydroNode::CycleSource { metadata, .. }
4812 | HydroNode::Tee { metadata, .. }
4813 | HydroNode::Singleton { metadata, .. }
4814 | HydroNode::Partition { metadata, .. }
4815 | HydroNode::YieldConcat { metadata, .. }
4816 | HydroNode::BeginAtomic { metadata, .. }
4817 | HydroNode::EndAtomic { metadata, .. }
4818 | HydroNode::Batch { metadata, .. }
4819 | HydroNode::Chain { metadata, .. }
4820 | HydroNode::MergeOrdered { metadata, .. }
4821 | HydroNode::ChainFirst { metadata, .. }
4822 | HydroNode::CrossProduct { metadata, .. }
4823 | HydroNode::CrossSingleton { metadata, .. }
4824 | HydroNode::Join { metadata, .. }
4825 | HydroNode::JoinHalf { metadata, .. }
4826 | HydroNode::Difference { metadata, .. }
4827 | HydroNode::AntiJoin { metadata, .. }
4828 | HydroNode::ResolveFutures { metadata, .. }
4829 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4830 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4831 | HydroNode::Map { metadata, .. }
4832 | HydroNode::FlatMap { metadata, .. }
4833 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4834 | HydroNode::Filter { metadata, .. }
4835 | HydroNode::FilterMap { metadata, .. }
4836 | HydroNode::DeferTick { metadata, .. }
4837 | HydroNode::Enumerate { metadata, .. }
4838 | HydroNode::Inspect { metadata, .. }
4839 | HydroNode::Unique { metadata, .. }
4840 | HydroNode::Sort { metadata, .. }
4841 | HydroNode::Scan { metadata, .. }
4842 | HydroNode::ScanAsyncBlocking { metadata, .. }
4843 | HydroNode::Fold { metadata, .. }
4844 | HydroNode::FoldKeyed { metadata, .. }
4845 | HydroNode::Reduce { metadata, .. }
4846 | HydroNode::ReduceKeyed { metadata, .. }
4847 | HydroNode::ReduceKeyedWatermark { metadata, .. }
4848 | HydroNode::ExternalInput { metadata, .. }
4849 | HydroNode::Network { metadata, .. }
4850 | HydroNode::Counter { metadata, .. } => metadata,
4851 }
4852 }
4853
4854 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4855 &mut self.metadata_mut().op
4856 }
4857
4858 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4859 match self {
4860 HydroNode::Placeholder => {
4861 panic!()
4862 }
4863 HydroNode::Cast { metadata, .. }
4864 | HydroNode::ObserveNonDet { metadata, .. }
4865 | HydroNode::Source { metadata, .. }
4866 | HydroNode::SingletonSource { metadata, .. }
4867 | HydroNode::CycleSource { metadata, .. }
4868 | HydroNode::Tee { metadata, .. }
4869 | HydroNode::Singleton { metadata, .. }
4870 | HydroNode::Partition { metadata, .. }
4871 | HydroNode::YieldConcat { metadata, .. }
4872 | HydroNode::BeginAtomic { metadata, .. }
4873 | HydroNode::EndAtomic { metadata, .. }
4874 | HydroNode::Batch { metadata, .. }
4875 | HydroNode::Chain { metadata, .. }
4876 | HydroNode::MergeOrdered { metadata, .. }
4877 | HydroNode::ChainFirst { metadata, .. }
4878 | HydroNode::CrossProduct { metadata, .. }
4879 | HydroNode::CrossSingleton { metadata, .. }
4880 | HydroNode::Join { metadata, .. }
4881 | HydroNode::JoinHalf { metadata, .. }
4882 | HydroNode::Difference { metadata, .. }
4883 | HydroNode::AntiJoin { metadata, .. }
4884 | HydroNode::ResolveFutures { metadata, .. }
4885 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4886 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4887 | HydroNode::Map { metadata, .. }
4888 | HydroNode::FlatMap { metadata, .. }
4889 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4890 | HydroNode::Filter { metadata, .. }
4891 | HydroNode::FilterMap { metadata, .. }
4892 | HydroNode::DeferTick { metadata, .. }
4893 | HydroNode::Enumerate { metadata, .. }
4894 | HydroNode::Inspect { metadata, .. }
4895 | HydroNode::Unique { metadata, .. }
4896 | HydroNode::Sort { metadata, .. }
4897 | HydroNode::Scan { metadata, .. }
4898 | HydroNode::ScanAsyncBlocking { metadata, .. }
4899 | HydroNode::Fold { metadata, .. }
4900 | HydroNode::FoldKeyed { metadata, .. }
4901 | HydroNode::Reduce { metadata, .. }
4902 | HydroNode::ReduceKeyed { metadata, .. }
4903 | HydroNode::ReduceKeyedWatermark { metadata, .. }
4904 | HydroNode::ExternalInput { metadata, .. }
4905 | HydroNode::Network { metadata, .. }
4906 | HydroNode::Counter { metadata, .. } => metadata,
4907 }
4908 }
4909
4910 pub fn input(&self) -> Vec<&HydroNode> {
4911 match self {
4912 HydroNode::Placeholder => {
4913 panic!()
4914 }
4915 HydroNode::Source { .. }
4916 | HydroNode::SingletonSource { .. }
4917 | HydroNode::ExternalInput { .. }
4918 | HydroNode::CycleSource { .. }
4919 | HydroNode::Tee { .. }
4920 | HydroNode::Singleton { .. }
4921 | HydroNode::Partition { .. } => {
4922 vec![]
4924 }
4925 HydroNode::Cast { inner, .. }
4926 | HydroNode::ObserveNonDet { inner, .. }
4927 | HydroNode::YieldConcat { inner, .. }
4928 | HydroNode::BeginAtomic { inner, .. }
4929 | HydroNode::EndAtomic { inner, .. }
4930 | HydroNode::Batch { inner, .. } => {
4931 vec![inner]
4932 }
4933 HydroNode::Chain { first, second, .. } => {
4934 vec![first, second]
4935 }
4936 HydroNode::MergeOrdered { first, second, .. } => {
4937 vec![first, second]
4938 }
4939 HydroNode::ChainFirst { first, second, .. } => {
4940 vec![first, second]
4941 }
4942 HydroNode::CrossProduct { left, right, .. }
4943 | HydroNode::CrossSingleton { left, right, .. }
4944 | HydroNode::Join { left, right, .. }
4945 | HydroNode::JoinHalf { left, right, .. } => {
4946 vec![left, right]
4947 }
4948 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4949 vec![pos, neg]
4950 }
4951 HydroNode::Map { input, .. }
4952 | HydroNode::FlatMap { input, .. }
4953 | HydroNode::FlatMapStreamBlocking { input, .. }
4954 | HydroNode::Filter { input, .. }
4955 | HydroNode::FilterMap { input, .. }
4956 | HydroNode::Sort { input, .. }
4957 | HydroNode::DeferTick { input, .. }
4958 | HydroNode::Enumerate { input, .. }
4959 | HydroNode::Inspect { input, .. }
4960 | HydroNode::Unique { input, .. }
4961 | HydroNode::Network { input, .. }
4962 | HydroNode::Counter { input, .. }
4963 | HydroNode::ResolveFutures { input, .. }
4964 | HydroNode::ResolveFuturesBlocking { input, .. }
4965 | HydroNode::ResolveFuturesOrdered { input, .. }
4966 | HydroNode::Fold { input, .. }
4967 | HydroNode::FoldKeyed { input, .. }
4968 | HydroNode::Reduce { input, .. }
4969 | HydroNode::ReduceKeyed { input, .. }
4970 | HydroNode::Scan { input, .. }
4971 | HydroNode::ScanAsyncBlocking { input, .. } => {
4972 vec![input]
4973 }
4974 HydroNode::ReduceKeyedWatermark {
4975 input, watermark, ..
4976 } => {
4977 vec![input, watermark]
4978 }
4979 }
4980 }
4981
4982 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4983 self.input()
4984 .iter()
4985 .map(|input_node| input_node.metadata())
4986 .collect()
4987 }
4988
4989 pub fn is_shared_with_others(&self) -> bool {
4993 match self {
4994 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4995 Rc::strong_count(&inner.0) > 1
4996 }
4997 HydroNode::Singleton { .. } => false,
5000 _ => false,
5001 }
5002 }
5003
5004 pub fn print_root(&self) -> String {
5005 match self {
5006 HydroNode::Placeholder => {
5007 panic!()
5008 }
5009 HydroNode::Cast { .. } => "Cast()".to_owned(),
5010 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5011 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5012 HydroNode::SingletonSource {
5013 value,
5014 first_tick_only,
5015 ..
5016 } => format!(
5017 "SingletonSource({:?}, first_tick_only={})",
5018 value, first_tick_only
5019 ),
5020 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5021 HydroNode::Tee { inner, .. } => {
5022 format!("Tee({})", inner.0.borrow().print_root())
5023 }
5024 HydroNode::Singleton { inner, .. } => {
5025 format!("Singleton({})", inner.0.borrow().print_root())
5026 }
5027 HydroNode::Partition { f, is_true, .. } => {
5028 format!("Partition({:?}, is_true={})", f, is_true)
5029 }
5030 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5031 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5032 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5033 HydroNode::Batch { .. } => "Batch()".to_owned(),
5034 HydroNode::Chain { first, second, .. } => {
5035 format!("Chain({}, {})", first.print_root(), second.print_root())
5036 }
5037 HydroNode::MergeOrdered { first, second, .. } => {
5038 format!(
5039 "MergeOrdered({}, {})",
5040 first.print_root(),
5041 second.print_root()
5042 )
5043 }
5044 HydroNode::ChainFirst { first, second, .. } => {
5045 format!(
5046 "ChainFirst({}, {})",
5047 first.print_root(),
5048 second.print_root()
5049 )
5050 }
5051 HydroNode::CrossProduct { left, right, .. } => {
5052 format!(
5053 "CrossProduct({}, {})",
5054 left.print_root(),
5055 right.print_root()
5056 )
5057 }
5058 HydroNode::CrossSingleton { left, right, .. } => {
5059 format!(
5060 "CrossSingleton({}, {})",
5061 left.print_root(),
5062 right.print_root()
5063 )
5064 }
5065 HydroNode::Join { left, right, .. } => {
5066 format!("Join({}, {})", left.print_root(), right.print_root())
5067 }
5068 HydroNode::JoinHalf { left, right, .. } => {
5069 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5070 }
5071 HydroNode::Difference { pos, neg, .. } => {
5072 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5073 }
5074 HydroNode::AntiJoin { pos, neg, .. } => {
5075 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5076 }
5077 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5078 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5079 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5080 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5081 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5082 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5083 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5084 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5085 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5086 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5087 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5088 HydroNode::Unique { .. } => "Unique()".to_owned(),
5089 HydroNode::Sort { .. } => "Sort()".to_owned(),
5090 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5091 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5092 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5093 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5094 }
5095 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5096 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5097 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5098 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5099 HydroNode::Network { .. } => "Network()".to_owned(),
5100 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5101 HydroNode::Counter { tag, duration, .. } => {
5102 format!("Counter({:?}, {:?})", tag, duration)
5103 }
5104 }
5105 }
5106}
5107
/// Produces the sink expression, source expression and connect callback for a
/// network edge between two locations.
///
/// The pair of location kinds selects which `Deploy` hooks are used:
///
/// * process -> process: `o2o_sink_source` / `o2o_connect`
/// * process -> cluster: `o2m_sink_source` / `o2m_connect`
/// * cluster -> process: `m2o_sink_source` / `m2o_connect`
/// * cluster -> cluster: `m2m_sink_source` / `m2m_connect`
///
/// In every case the sender node is looked up first, then the receiver; a port
/// is then requested from the sender (`next_port`) before one is requested from
/// the receiver. The `*_sink_source` hook runs before the `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from `processes` /
/// `clusters`, or if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches a clone of the process registered under `key`, or panics.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches a clone of the cluster registered under `key`, or panics.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and Atomic locations are rejected on either side of the edge.
        (LocationId::Tick(_, _), _)
        | (_, LocationId::Tick(_, _))
        | (LocationId::Atomic(_), _)
        | (_, LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
5249
5250#[cfg(test)]
5251mod serde_test;
5252
// Unit tests for the IR types and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` at 256 bytes.
    ///
    /// Marked `ignore` unless the `build` feature is enabled, because the
    /// expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 256);
    }

    /// Pins `size_of::<HydroRoot>()` at 136 bytes.
    ///
    /// Marked `ignore` unless the `build` feature is enabled, because the
    /// expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    /// An expression parsed from `x + y` must come back from
    /// `simplify_q_macro` equal to the input.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        // Clone so the original stays available for the comparison below.
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Feeds the expression spliced from a real `q!` closure through
    /// `simplify_q_macro` and checks the rendered token string with
    /// `assert_snapshot!`.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same snapshot check for a `q!` body that is a block ending in a `move`
    /// closure, so the quoted code does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}